Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/15 18:10:28 UTC

[GitHub] [flink] dawidwys opened a new pull request #12184: Flink 17027

dawidwys opened a new pull request #12184:
URL: https://github.com/apache/flink/pull/12184


   ## What is the purpose of the change
   
   It adds a new implementation of the Elasticsearch table connectors that works with `RowData` and uses the new factories.
   
   It still requires some polishing, but I am opening the PR already to speed up the review.
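   
   For illustration, here is a rough sketch of how a table might be declared against the new connector. The connector identifier and every option value below are assumptions for the sketch, not taken from this PR:
   
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class Es7SinkSketch {
	public static void main(String[] args) {
		// Hypothetical usage of the new-style property keys
		// ('hosts', 'index', 'sink.bulk-flush.*', 'format').
		TableEnvironment tableEnv =
			TableEnvironment.create(EnvironmentSettings.newInstance().build());
		tableEnv.executeSql(
			"CREATE TABLE es_sink (" +
			"  user_id STRING," +
			"  visits BIGINT" +
			") WITH (" +
			"  'connector' = 'elasticsearch-7'," +
			"  'hosts' = 'http://localhost:9200'," +
			"  'index' = 'visits'," +
			"  'sink.bulk-flush.max-actions' = '1000'," +
			"  'format' = 'json'" +
			")");
	}
}
```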
   
   
   ## Verifying this change
   
   Check the newly added tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? Documentation still needs to be added.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 99bd627fe8cffe7f4f4417be049b0eb94ec4db79 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] dawidwys commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426165174



##########
File path: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}.
+ */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
+	private static final Set<ConfigOption<?>> requiredOptions = Stream.of(
+		HOSTS_OPTION,
+		INDEX_OPTION,
+		DOCUMENT_TYPE_OPTION
+	).collect(Collectors.toSet());
+	private static final Set<ConfigOption<?>> optionalOptions = Stream.of(
+		KEY_DELIMITER_OPTION,
+		FAILURE_HANDLER_OPTION,
+		FLUSH_ON_CHECKPOINT_OPTION,
+		BULK_FLASH_MAX_SIZE_OPTION,
+		BULK_FLUSH_MAX_ACTIONS_OPTION,
+		BULK_FLUSH_INTERVAL_OPTION,
+		BULK_FLUSH_BACKOFF_TYPE_OPTION,
+		BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+		BULK_FLUSH_BACKOFF_DELAY_OPTION,
+		CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+		CONNECTION_PATH_PREFIX,
+		FORMAT_OPTION
+	).collect(Collectors.toSet());
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		ElasticsearchValidationUtils.validatePrimaryKey(context.getCatalogTable().getSchema());
+		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+			SerializationFormatFactory.class,
+			FORMAT_OPTION);
+
+		helper.validate();
+		Configuration configuration = new Configuration();
+		context.getCatalogTable()
+			.getOptions()
+			.forEach(configuration::setString);
+		Elasticsearch6Configuration config = new Elasticsearch6Configuration(configuration, context.getClassLoader());
+
+		validate(config, configuration);
+
+		return new Elasticsearch6DynamicSink(
+			format,
+			config,
+			context.getCatalogTable().getSchema());

Review comment:
       I admit I missed that bit. 
   
   The schema is used for two purposes:
   * generating dynamic index -> it should work on the computed columns as well imo
   * computing id based on the primary key -> primary key cannot be defined on a computed column so it is not a problem
   
   For the primary keys I think it's fine, because primary keys cannot be defined on computed columns. It is a problem for the dynamic index, though. I will do as you suggested.
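   
   To make the dynamic-index case concrete, a rough sketch (the `{field|date-format}` index pattern and all values are illustrative assumptions, and a `TableEnvironment` named `tableEnv` is assumed to be in scope):
   
```java
// Hedged sketch: the index pattern references the log_ts column, so the sink
// resolves the index per record. If log_ts were a computed column, the sink
// would need the full schema (not just the physical one) to resolve it.
tableEnv.executeSql(
	"CREATE TABLE page_views (" +
	"  user_id STRING," +
	"  log_ts TIMESTAMP(3)" +
	") WITH (" +
	"  'connector' = 'elasticsearch-7'," +
	"  'hosts' = 'http://localhost:9200'," +
	"  'index' = 'page_views-{log_ts|yyyy-MM-dd}'," +
	"  'format' = 'json'" +
	")");
```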




[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629405311


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 531466e3091df9d6633e2aabc78ed4eb6d111e47 (Fri Oct 16 10:36:16 UTC 2020)
   
   **Warnings:**
    * **2 pom.xml files were touched**: Check for build and licensing issues.
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519",
       "triggerID" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 93f46cc0d8572f70b238640b6dbfe0b207f8c1aa Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 99bd627fe8cffe7f4f4417be049b0eb94ec4db79 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] dawidwys commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426164192



##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch7DynamicSink}.
+ */
+@Internal
+public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory {
+	private static final Set<ConfigOption<?>> requiredOptions = Stream.of(
+		HOSTS_OPTION,
+		INDEX_OPTION
+	).collect(Collectors.toSet());
+	private static final Set<ConfigOption<?>> optionalOptions = Stream.of(

Review comment:
       The `documentType` option is deprecated starting from ES7. In our old implementation the option was required, yet we ignored the user-provided value.
   
   I made this explicit and removed `docType` from ES7 altogether.
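   
   To illustrate the difference (connector identifiers and values are assumptions for the sketch, with a `TableEnvironment` named `tableEnv` assumed in scope):
   
```java
// ES6: 'document-type' remains a required option.
tableEnv.executeSql(
	"CREATE TABLE users_es6 (user_id STRING) WITH (" +
	"  'connector' = 'elasticsearch-6'," +
	"  'hosts' = 'http://localhost:9200'," +
	"  'index' = 'users'," +
	"  'document-type' = 'user'," +
	"  'format' = 'json')");

// ES7: mapping types are deprecated upstream, so the option is gone entirely.
tableEnv.executeSql(
	"CREATE TABLE users_es7 (user_id STRING) WITH (" +
	"  'connector' = 'elasticsearch-7'," +
	"  'hosts' = 'http://localhost:9200'," +
	"  'index' = 'users'," +
	"  'format' = 'json')");
```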




[GitHub] [flink] dawidwys commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426164273



##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+
+/**
+ * Elasticsearch 7 specific configuration.
+ */
+@Internal
+final class Elasticsearch7Configuration extends ElasticsearchConfiguration {
+	Elasticsearch7Configuration(ReadableConfig config, ClassLoader classLoader) {
+		super(config, classLoader);
+	}
+
+	public List<HttpHost> getHosts() {
+		return config.get(HOSTS_OPTION).stream()
+			.map(Elasticsearch7Configuration::validateAndParseHostsString)
+			.collect(Collectors.toList());
+	}
+
+	/**
+	 * Parse Hosts String to list.
+	 *
+	 * <p>Hosts String format was given as following:
+	 *
+	 * <pre>
+	 *     connector.hosts = http://host_name:9092;http://host_name:9093

Review comment:
       This comment is actually unnecessary. I will simply remove it.
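   
   For reference, a minimal sketch of what `validateAndParseHostsString` might do; this is an assumption for illustration, not this PR's actual implementation:
   
```java
import org.apache.flink.table.api.ValidationException;

import org.apache.http.HttpHost;

final class HostParsing {
	// Hedged sketch: parse "http://host:9200" into an HttpHost and reject
	// entries without an explicit port.
	static HttpHost validateAndParseHostsString(String host) {
		try {
			HttpHost httpHost = HttpHost.create(host);
			if (httpHost.getPort() < 0) {
				throw new ValidationException(
					"Could not parse host '" + host + "'; expected format 'http://host:port'.");
			}
			return httpHost;
		} catch (IllegalArgumentException e) {
			throw new ValidationException("Could not parse host '" + host + "'.", e);
		}
	}
}
```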




[GitHub] [flink] leonardBang commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
leonardBang commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426404152



##########
File path: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}.
+ */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
+	private static final Set<ConfigOption<?>> requiredOptions = Stream.of(
+		HOSTS_OPTION,
+		INDEX_OPTION,
+		DOCUMENT_TYPE_OPTION
+	).collect(Collectors.toSet());
+	private static final Set<ConfigOption<?>> optionalOptions = Stream.of(
+		KEY_DELIMITER_OPTION,
+		FAILURE_HANDLER_OPTION,
+		FLUSH_ON_CHECKPOINT_OPTION,
+		BULK_FLASH_MAX_SIZE_OPTION,
+		BULK_FLUSH_MAX_ACTIONS_OPTION,
+		BULK_FLUSH_INTERVAL_OPTION,
+		BULK_FLUSH_BACKOFF_TYPE_OPTION,
+		BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+		BULK_FLUSH_BACKOFF_DELAY_OPTION,
+		CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+		CONNECTION_PATH_PREFIX,
+		FORMAT_OPTION
+	).collect(Collectors.toSet());
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		ElasticsearchValidationUtils.validatePrimaryKey(context.getCatalogTable().getSchema());
+		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+			SerializationFormatFactory.class,
+			FORMAT_OPTION);
+
+		helper.validate();
+		Configuration configuration = new Configuration();
+		context.getCatalogTable()
+			.getOptions()
+			.forEach(configuration::setString);
+		Elasticsearch6Configuration config = new Elasticsearch6Configuration(configuration, context.getClassLoader());
+
+		validate(config, configuration);
+
+		return new Elasticsearch6DynamicSink(
+			format,
+			config,
+			context.getCatalogTable().getSchema());

Review comment:
       nice discussion




[GitHub] [flink] dawidwys closed pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
dawidwys closed pull request #12184:
URL: https://github.com/apache/flink/pull/12184


   


[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 99bd627fe8cffe7f4f4417be049b0eb94ec4db79 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473) 
   * 93f46cc0d8572f70b238640b6dbfe0b207f8c1aa UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519",
       "triggerID" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1536",
       "triggerID" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "triggerType" : "PUSH"
     }, {
       "hash" : "25a6ad18a9f957a5f2081e9e9282aa967343987b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1573",
       "triggerID" : "25a6ad18a9f957a5f2081e9e9282aa967343987b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cd36d3abe76854bdabb47fd498e51786fee69239",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "cd36d3abe76854bdabb47fd498e51786fee69239",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 80bbdd72c2aeb3c802deef71436882603590d147 UNKNOWN
   * fba63e0698c8c60a04efb10a8914ea548bacc653 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1536) 
   * 25a6ad18a9f957a5f2081e9e9282aa967343987b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1573) 
   * cd36d3abe76854bdabb47fd498e51786fee69239 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] dawidwys commented on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629748324


   Thanks for the review. Updated.


[GitHub] [flink] dawidwys commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426165174



##########
File path: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}.
+ */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
+	private static final Set<ConfigOption<?>> requiredOptions = Stream.of(
+		HOSTS_OPTION,
+		INDEX_OPTION,
+		DOCUMENT_TYPE_OPTION
+	).collect(Collectors.toSet());
+	private static final Set<ConfigOption<?>> optionalOptions = Stream.of(
+		KEY_DELIMITER_OPTION,
+		FAILURE_HANDLER_OPTION,
+		FLUSH_ON_CHECKPOINT_OPTION,
+		BULK_FLASH_MAX_SIZE_OPTION,
+		BULK_FLUSH_MAX_ACTIONS_OPTION,
+		BULK_FLUSH_INTERVAL_OPTION,
+		BULK_FLUSH_BACKOFF_TYPE_OPTION,
+		BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+		BULK_FLUSH_BACKOFF_DELAY_OPTION,
+		CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+		CONNECTION_PATH_PREFIX,
+		FORMAT_OPTION
+	).collect(Collectors.toSet());
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		ElasticsearchValidationUtils.validatePrimaryKey(context.getCatalogTable().getSchema());
+		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+			SerializationFormatFactory.class,
+			FORMAT_OPTION);
+
+		helper.validate();
+		Configuration configuration = new Configuration();
+		context.getCatalogTable()
+			.getOptions()
+			.forEach(configuration::setString);
+		Elasticsearch6Configuration config = new Elasticsearch6Configuration(configuration, context.getClassLoader());
+
+		validate(config, configuration);
+
+		return new Elasticsearch6DynamicSink(
+			format,
+			config,
+			context.getCatalogTable().getSchema());

Review comment:
       I admit I missed that bit. 
   
   The schema is used for two purposes:
   * generating dynamic index -> it should work on the computed columns as well imo
   * computing id based on the primary key -> primary key cannot be defined on a computed column so it is not a problem
   
   For the primary keys I think it's fine, because primary keys cannot be defined on computed columns. It is a problem for the dynamic index, though. ~~I will pass the physical schema and keys separately for now.~~ I will do as you suggested.




[GitHub] [flink] dawidwys commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426164011



##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
+ */
+public class ElasticsearchOptions {
+	/**
+	 * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with
+	 * {@code DISABLED} option.
+	 */
+	public enum BackOffType {
+		DISABLED,
+		CONSTANT,
+		EXPONENTIAL
+	}
+
+	public static final ConfigOption<List<String>> HOSTS_OPTION =
+		ConfigOptions.key("hosts")
+			.stringType()
+			.asList()
+			.noDefaultValue()
+			.withDescription("Elasticsearch hosts to connect to.");
+	public static final ConfigOption<String> INDEX_OPTION =
+		ConfigOptions.key("index")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch index for every record.");
+	public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+		ConfigOptions.key("document-type")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch document type.");
+	public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+		ConfigOptions.key("document-id.key-delimiter")
+			.stringType()
+			.defaultValue("_")
+			.withDescription("Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+	public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+		ConfigOptions.key("failure-handler")
+			.stringType()
+			.defaultValue("fail")
+			.withDescription(Description.builder()
+				.text("Failure handling strategy in case a request to Elasticsearch fails")
+				.list(
+					text("\"fail\" (throws an exception if a request fails and thus causes a job failure),"),
+					text("\"ignore\" (ignores failures and drops the request),"),
+					text("\"retry_rejected\" (re-adds requests that have failed due to queue capacity saturation),"),
+					text("\"class name\" for failure handling with an ActionRequestFailureHandler subclass"))
+				.build());
+	public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION =
+		ConfigOptions.key("sink.flush-on-checkpoint")
+			.booleanType()
+			.defaultValue(true)
+			.withDescription("Disables flushing on checkpoint");
+	public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-actions")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of actions to buffer for each bulk request.");
+	public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-size")
+			.memoryType()
+			.noDefaultValue()
+			.withDescription("Maximum size of buffered actions per bulk request");
+	public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+		ConfigOptions.key("sink.bulk-flush.interval")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Bulk flush interval");
+	public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.type")
+			.enumType(BackOffType.class)
+			.defaultValue(BackOffType.DISABLED)
+			.withDescription("Backoff strategy");
+	public static final ConfigOption<Integer> BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.max-retries")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of retries.");
+	public static final ConfigOption<Duration> BULK_FLUSH_BACKOFF_DELAY_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.delay")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Delay between each backoff attempt.");
+	public static final ConfigOption<Duration> CONNECTION_MAX_RETRY_TIMEOUT_OPTION =
+		ConfigOptions.key("connection.max-retry-timeout")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Maximum timeout between retries.");
+	public static final ConfigOption<String> CONNECTION_PATH_PREFIX =
+		ConfigOptions.key("connection.path-prefix")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Prefix string to be added to every REST communication.");
+	public static final ConfigOption<String> FORMAT_OPTION =
+		ConfigOptions.key("format")
+			.stringType()
+			.defaultValue("json")

Review comment:
       I thought about it but decided against it. It would be a weak check: we can only validate the `factoryIdentifier` of the format, which still does not guarantee that it is our `json` format.
   
   Moreover, by not adding the validation we give users more flexibility to override the format with their own version. The only requirement on the format is that it produces a valid JSON document; it does not have to be our format.
   
   Nevertheless, I can add that validation if you want.
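   
   As an illustration of that flexibility, a minimal sketch of a user-supplied format (the class name and two-column schema are hypothetical); any `SerializationSchema<RowData>` that emits a valid JSON document would do:
   
```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.data.RowData;

import java.nio.charset.StandardCharsets;

// Hypothetical custom format that hand-rolls a JSON document instead of using
// the built-in 'json' format. Assumes a (user STRING, visits INT) schema and
// does no escaping -- purely illustrative.
public class NaiveJsonRowDataSerializer implements SerializationSchema<RowData> {
	@Override
	public byte[] serialize(RowData row) {
		String json = "{\"user\":\"" + row.getString(0) + "\",\"visits\":" + row.getInt(1) + "}";
		return json.getBytes(StandardCharsets.UTF_8);
	}
}
```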




[GitHub] [flink] flinkbot commented on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 99bd627fe8cffe7f4f4417be049b0eb94ec4db79 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519",
       "triggerID" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1536",
       "triggerID" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "triggerType" : "PUSH"
     }, {
       "hash" : "25a6ad18a9f957a5f2081e9e9282aa967343987b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1573",
       "triggerID" : "25a6ad18a9f957a5f2081e9e9282aa967343987b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cd36d3abe76854bdabb47fd498e51786fee69239",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1576",
       "triggerID" : "cd36d3abe76854bdabb47fd498e51786fee69239",
       "triggerType" : "PUSH"
     }, {
       "hash" : "513b2b6030c6dc4cbe4e56ab433dd57e838ed0a9",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1591",
       "triggerID" : "513b2b6030c6dc4cbe4e56ab433dd57e838ed0a9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 80bbdd72c2aeb3c802deef71436882603590d147 UNKNOWN
   * cd36d3abe76854bdabb47fd498e51786fee69239 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1576) 
   * 513b2b6030c6dc4cbe4e56ab433dd57e838ed0a9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1591) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519",
       "triggerID" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1536",
       "triggerID" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "triggerType" : "PUSH"
     }, {
       "hash" : "25a6ad18a9f957a5f2081e9e9282aa967343987b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1573",
       "triggerID" : "25a6ad18a9f957a5f2081e9e9282aa967343987b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cd36d3abe76854bdabb47fd498e51786fee69239",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1576",
       "triggerID" : "cd36d3abe76854bdabb47fd498e51786fee69239",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 80bbdd72c2aeb3c802deef71436882603590d147 UNKNOWN
   * fba63e0698c8c60a04efb10a8914ea548bacc653 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1536) 
   * 25a6ad18a9f957a5f2081e9e9282aa967343987b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1573) 
   * cd36d3abe76854bdabb47fd498e51786fee69239 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1576) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519",
       "triggerID" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1536",
       "triggerID" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "triggerType" : "PUSH"
     }, {
       "hash" : "25a6ad18a9f957a5f2081e9e9282aa967343987b",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1573",
       "triggerID" : "25a6ad18a9f957a5f2081e9e9282aa967343987b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cd36d3abe76854bdabb47fd498e51786fee69239",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1576",
       "triggerID" : "cd36d3abe76854bdabb47fd498e51786fee69239",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 80bbdd72c2aeb3c802deef71436882603590d147 UNKNOWN
   * 25a6ad18a9f957a5f2081e9e9282aa967343987b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1573) 
   * cd36d3abe76854bdabb47fd498e51786fee69239 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1576) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519",
       "triggerID" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 93f46cc0d8572f70b238640b6dbfe0b207f8c1aa Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519) 
   * 80bbdd72c2aeb3c802deef71436882603590d147 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426162683



##########
File path: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.ElasticsearchResource;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.client.Client;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT tests for {@link Elasticsearch6DynamicSink}.
+ */
+public class Elasticsearch6DynamicSinkITCase {
+
+	@ClassRule
+	public static ElasticsearchResource elasticsearchResource = new ElasticsearchResource("es-6-dynamic-sink-tests");
+
+	@Test
+	public void testWritingDocuments() throws Exception {

Review comment:
       Could you use DDL to verify the Elasticsearch sink for better coverage? I think we can use `TableEnvironment#fromValues` to generate the source data, along the lines of the sketch below.
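       A minimal sketch of such a DDL-based test, assuming illustrative table,
       index, and connector option values (this is not the PR's actual code):

   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   import static org.apache.flink.table.api.Expressions.row;

   @Test
   public void testWritingDocumentsFromTableApi() throws Exception {
       // Registering the sink via DDL exercises the factory and all option parsing.
       TableEnvironment tEnv = TableEnvironment.create(
           EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

       tEnv.executeSql(
           "CREATE TABLE esTable (" +
           "  a BIGINT NOT NULL," +
           "  b STRING," +
           "  PRIMARY KEY (a) NOT ENFORCED" +
           ") WITH (" +
           "  'connector' = 'elasticsearch-6'," +
           "  'hosts' = 'http://127.0.0.1:9200'," +
           "  'index' = 'writing-documents'," +
           "  'document-type' = 'MyType'" +
           ")");

       // fromValues generates the source data in-line; executeInsert submits the job.
       // A real test would wait for the job to finish before asserting on the index.
       tEnv.fromValues(row(1L, "Hello"), row(2L, "World"))
           .executeInsert("esTable");
   }
   ```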




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519",
       "triggerID" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1536",
       "triggerID" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "triggerType" : "PUSH"
     }, {
       "hash" : "25a6ad18a9f957a5f2081e9e9282aa967343987b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1573",
       "triggerID" : "25a6ad18a9f957a5f2081e9e9282aa967343987b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cd36d3abe76854bdabb47fd498e51786fee69239",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1576",
       "triggerID" : "cd36d3abe76854bdabb47fd498e51786fee69239",
       "triggerType" : "PUSH"
     }, {
       "hash" : "513b2b6030c6dc4cbe4e56ab433dd57e838ed0a9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "513b2b6030c6dc4cbe4e56ab433dd57e838ed0a9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 80bbdd72c2aeb3c802deef71436882603590d147 UNKNOWN
   * cd36d3abe76854bdabb47fd498e51786fee69239 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1576) 
   * 513b2b6030c6dc4cbe4e56ab433dd57e838ed0a9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519",
       "triggerID" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1536",
       "triggerID" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "triggerType" : "PUSH"
     }, {
       "hash" : "25a6ad18a9f957a5f2081e9e9282aa967343987b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "25a6ad18a9f957a5f2081e9e9282aa967343987b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 80bbdd72c2aeb3c802deef71436882603590d147 UNKNOWN
   * fba63e0698c8c60a04efb10a8914ea548bacc653 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1536) 
   * 25a6ad18a9f957a5f2081e9e9282aa967343987b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519",
       "triggerID" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1536",
       "triggerID" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 80bbdd72c2aeb3c802deef71436882603590d147 UNKNOWN
   * fba63e0698c8c60a04efb10a8914ea548bacc653 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1536) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519",
       "triggerID" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1536",
       "triggerID" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 93f46cc0d8572f70b238640b6dbfe0b207f8c1aa Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519) 
   * 80bbdd72c2aeb3c802deef71436882603590d147 UNKNOWN
   * fba63e0698c8c60a04efb10a8914ea548bacc653 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1536) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426156300



##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
+ */
+public class ElasticsearchOptions {
+	/**
+	 * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with
+	 * {@code DISABLED} option.
+	 */
+	public enum BackOffType {
+		DISABLED,
+		CONSTANT,
+		EXPONENTIAL
+	}
+
+	public static final ConfigOption<List<String>> HOSTS_OPTION =
+		ConfigOptions.key("hosts")
+			.stringType()
+			.asList()
+			.noDefaultValue()
+			.withDescription("Elasticsearch hosts to connect to.");
+	public static final ConfigOption<String> INDEX_OPTION =
+		ConfigOptions.key("index")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch index for every record.");
+	public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+		ConfigOptions.key("document-type")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch document type.");
+	public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+		ConfigOptions.key("document-id.key-delimiter")
+			.stringType()
+			.defaultValue("_")
+			.withDescription("Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+	public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+		ConfigOptions.key("failure-handler")
+			.stringType()
+			.defaultValue("fail")
+			.withDescription(Description.builder()
+				.text("Failure handling strategy in case a request to Elasticsearch fails")
+				.list(
+					text("\"fail\" (throws an exception if a request fails and thus causes a job failure),"),
+					text("\"ignore\" (ignores failures and drops the request),"),
+					text("\"retry_rejected\" (re-adds requests that have failed due to queue capacity saturation),"),
+					text("\"class name\" for failure handling with an ActionRequestFailureHandler subclass"))
+				.build());
+	public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION =
+		ConfigOptions.key("sink.flush-on-checkpoint")
+			.booleanType()
+			.defaultValue(true)
+			.withDescription("Disables flushing on checkpoint");
+	public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-actions")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of actions to buffer for each bulk request.");
+	public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-size")
+			.memoryType()
+			.noDefaultValue()
+			.withDescription("Maximum size of buffered actions per bulk request");
+	public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+		ConfigOptions.key("sink.bulk-flush.interval")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Bulk flush interval");
+	public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.type")

Review comment:
       "sink.bulk-flush.back-off.strategy"?

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
+ */
+public class ElasticsearchOptions {
+	/**
+	 * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with
+	 * {@code DISABLED} option.
+	 */
+	public enum BackOffType {
+		DISABLED,
+		CONSTANT,
+		EXPONENTIAL
+	}
+
+	public static final ConfigOption<List<String>> HOSTS_OPTION =
+		ConfigOptions.key("hosts")
+			.stringType()
+			.asList()
+			.noDefaultValue()
+			.withDescription("Elasticsearch hosts to connect to.");
+	public static final ConfigOption<String> INDEX_OPTION =
+		ConfigOptions.key("index")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch index for every record.");
+	public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+		ConfigOptions.key("document-type")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch document type.");
+	public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+		ConfigOptions.key("document-id.key-delimiter")
+			.stringType()
+			.defaultValue("_")
+			.withDescription("Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+	public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+		ConfigOptions.key("failure-handler")
+			.stringType()
+			.defaultValue("fail")
+			.withDescription(Description.builder()
+				.text("Failure handling strategy in case a request to Elasticsearch fails")
+				.list(
+					text("\"fail\" (throws an exception if a request fails and thus causes a job failure),"),
+					text("\"ignore\" (ignores failures and drops the request),"),
+					text("\"retry_rejected\" (re-adds requests that have failed due to queue capacity saturation),"),
+					text("\"class name\" for failure handling with an ActionRequestFailureHandler subclass"))
+				.build());
+	public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION =
+		ConfigOptions.key("sink.flush-on-checkpoint")
+			.booleanType()
+			.defaultValue(true)
+			.withDescription("Disables flushing on checkpoint");
+	public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-actions")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of actions to buffer for each bulk request.");
+	public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-size")
+			.memoryType()
+			.noDefaultValue()
+			.withDescription("Maximum size of buffered actions per bulk request");
+	public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+		ConfigOptions.key("sink.bulk-flush.interval")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Bulk flush interval");
+	public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.type")
+			.enumType(BackOffType.class)
+			.defaultValue(BackOffType.DISABLED)
+			.withDescription("Backoff strategy");
+	public static final ConfigOption<Integer> BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.max-retries")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of retries.");
+	public static final ConfigOption<Duration> BULK_FLUSH_BACKOFF_DELAY_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.delay")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Delay between each backoff attempt.");
+	public static final ConfigOption<Duration> CONNECTION_MAX_RETRY_TIMEOUT_OPTION =
+		ConfigOptions.key("connection.max-retry-timeout")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Maximum timeout between retries.");
+	public static final ConfigOption<String> CONNECTION_PATH_PREFIX =
+		ConfigOptions.key("connection.path-prefix")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Prefix string to be added to every REST communication.");
+	public static final ConfigOption<String> FORMAT_OPTION =
+		ConfigOptions.key("format")
+			.stringType()
+			.defaultValue("json")

Review comment:
       I think we should add validation on the format value and throw an exception telling users that ES only supports json. A sketch of what that could look like is below.
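       A minimal sketch of such a check, assuming it lives next to the other
       validation in the factory (the helper name and error text are illustrative):

   ```java
   // Illustrative: reject any format other than json before creating the sink.
   private static void validateFormat(ReadableConfig config) {
       String format = config.get(FORMAT_OPTION);
       if (!"json".equals(format)) {
           throw new ValidationException(
               String.format(
                   "The Elasticsearch connector only supports the 'json' format, but '%s' was configured.",
                   format));
       }
   }
   ```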

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * Sink function for converting upserts into Elasticsearch {@link ActionRequest}s.
+ */
+@Internal
+class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final IndexGenerator indexGenerator;
+	private final String docType;
+	private final SerializationSchema<RowData> serializationSchema;
+	private final XContentType contentType;
+	private final RequestFactory requestFactory;
+	private final Function<RowData, String> createKey;
+
+	public RowElasticsearchSinkFunction(
+			IndexGenerator indexGenerator,
+			@Nullable String docType, // this is deprecated in es 7+
+			SerializationSchema<RowData> serializationSchema,
+			XContentType contentType,
+			RequestFactory requestFactory,
+			Function<RowData, String> createKey) {
+		this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
+		this.docType = docType;
+		this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+		this.contentType = Preconditions.checkNotNull(contentType);
+		this.requestFactory = Preconditions.checkNotNull(requestFactory);
+		this.createKey = Preconditions.checkNotNull(createKey);
+	}
+
+	@Override
+	public void process(
+			RowData element,
+			RuntimeContext ctx,
+			RequestIndexer indexer) {
+		switch (element.getRowKind()) {
+			case INSERT:
+			case UPDATE_AFTER:
+				processUpsert(element, indexer);
+				break;
+			case DELETE:

Review comment:
       For now, we have to add `case UPDATE_BEFORE` here; ignoring the before message is just an optimization. There are still cases where the before message can't be ignored, e.g. multi-sink (one of the sinks requires the before message) or a filter on an aggregate result (FLINK-9528). See the sketch below.
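       Something like the following sketch (illustrative; `processDelete` stands
       for whatever delete path this class already uses in the DELETE branch):

   ```java
   switch (element.getRowKind()) {
       case INSERT:
       case UPDATE_AFTER:
           processUpsert(element, indexer);
           break;
       case UPDATE_BEFORE:
       case DELETE:
           // Treat the before message like a delete instead of silently dropping it.
           processDelete(element, indexer);
           break;
       default:
           throw new TableException("Unsupported message kind: " + element.getRowKind());
   }
   ```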

##########
File path: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}.
+ */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
+	private static final Set<ConfigOption<?>> requiredOptions = Stream.of(
+		HOSTS_OPTION,
+		INDEX_OPTION,
+		DOCUMENT_TYPE_OPTION
+	).collect(Collectors.toSet());
+	private static final Set<ConfigOption<?>> optionalOptions = Stream.of(
+		KEY_DELIMITER_OPTION,
+		FAILURE_HANDLER_OPTION,
+		FLUSH_ON_CHECKPOINT_OPTION,
+		BULK_FLASH_MAX_SIZE_OPTION,
+		BULK_FLUSH_MAX_ACTIONS_OPTION,
+		BULK_FLUSH_INTERVAL_OPTION,
+		BULK_FLUSH_BACKOFF_TYPE_OPTION,
+		BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+		BULK_FLUSH_BACKOFF_DELAY_OPTION,
+		CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+		CONNECTION_PATH_PREFIX,
+		FORMAT_OPTION
+	).collect(Collectors.toSet());
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		ElasticsearchValidationUtils.validatePrimaryKey(context.getCatalogTable().getSchema());
+		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+			SerializationFormatFactory.class,
+			FORMAT_OPTION);
+
+		helper.validate();
+		Configuration configuration = new Configuration();
+		context.getCatalogTable()
+			.getOptions()
+			.forEach(configuration::setString);
+		Elasticsearch6Configuration config = new Elasticsearch6Configuration(configuration, context.getClassLoader());
+
+		validate(config, configuration);
+
+		return new Elasticsearch6DynamicSink(
+			format,
+			config,
+			context.getCatalogTable().getSchema());

Review comment:
       It's a little tricky here. We should use `TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema())` to get rid of computed columns. However, `TableSchemaUtils.getPhysicalSchema` currently removes the primary key information too. Maybe we should keep the primary key in `TableSchemaUtils.getPhysicalSchema` and then use it here, as in the sketch below.
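       Roughly like this (illustrative, and assuming `TableSchemaUtils.getPhysicalSchema`
       is changed to retain the primary key):

   ```java
   import org.apache.flink.table.utils.TableSchemaUtils;

   // Strip computed columns but keep the primary key for document-id generation.
   TableSchema physicalSchema =
       TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

   return new Elasticsearch6DynamicSink(
       format,
       config,
       physicalSchema);
   ```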

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
+/**
+ * Utility methods for validating Elasticsearch properties.
+ */
+@Internal
+class ElasticsearchValidationUtils {
+
+	private static final Set<LogicalTypeRoot> ILLEGAL_PRIMARY_KEY_TYPES = new LinkedHashSet<>();
+
+	static {
+		ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ARRAY);
+		ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MAP);
+		ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MULTISET);
+		ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.STRUCTURED_TYPE);
+		ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ROW);
+		ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.RAW);
+		ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BINARY);
+		ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.VARBINARY);
+	}
+
+	/**
+	 * Checks that the table does not have primary key defined on illegal types.
+	 * In Elasticsearch the primary key is used to calculate the Elasticsearch document id,
+	 * which is a string of up to 512 bytes. It cannot have whitespaces. As of now it is calculated
+	 * by concatenating the fields. Certain types do not have a good string representation to be used
+	 * in this scenario. The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and
+	 * {@link LogicalTypeRoot#RAW} type.
+	 */
+	public static void validatePrimaryKey(TableSchema schema) {
+		schema.getPrimaryKey().ifPresent(
+			key -> {
+				List<LogicalTypeRoot> illegalTypes = key.getColumns()
+					.stream()
+					.map(fieldName -> {
+						LogicalType logicalType = schema.getFieldDataType(fieldName).get().getLogicalType();
+						if (hasRoot(logicalType, LogicalTypeRoot.DISTINCT_TYPE)) {
+							return ((DistinctType) logicalType).getSourceType().getTypeRoot();
+						} else {
+							return logicalType.getTypeRoot();
+						}
+					})
+					.filter(ILLEGAL_PRIMARY_KEY_TYPES::contains)
+					.collect(Collectors.toList());
+
+				if (!illegalTypes.isEmpty()) {
+					throw new ValidationException(
+						String.format(
+							"The table has a primary key on columns of illegal types: %s.\n" +
+								" Elasticsearch sink does not support primary keys on columns of types: %s.",
+							ILLEGAL_PRIMARY_KEY_TYPES,
+							illegalTypes));

Review comment:
       Switch these two parameters? As written, the message lists the unsupported types where the offending columns' types should be, and vice versa.
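       That is, something like:

   ```java
   throw new ValidationException(
       String.format(
           "The table has a primary key on columns of illegal types: %s.\n" +
               " Elasticsearch sink does not support primary keys on columns of types: %s.",
           illegalTypes,
           ILLEGAL_PRIMARY_KEY_TYPES));
   ```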

##########
File path: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+
+/**
+ * Elasticsearch 6 specific configuration.
+ */
+@Internal
+final class Elasticsearch6Configuration extends ElasticsearchConfiguration {
+	Elasticsearch6Configuration(ReadableConfig config, ClassLoader classLoader) {
+		super(config, classLoader);
+	}
+
+	public List<HttpHost> getHosts() {
+		return config.get(HOSTS_OPTION).stream()
+			.map(Elasticsearch6Configuration::validateAndParseHostsString)
+			.collect(Collectors.toList());
+	}
+
+	/**
+	 * Parse Hosts String to list.
+	 *
+	 * <p>Hosts String format was given as following:
+	 *
+	 * <pre>
+	 *     connector.hosts = http://host_name:9092;http://host_name:9093

Review comment:
       ```suggestion
   	 *     hosts = http://host_name:9092;http://host_name:9093
   ```
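       On top of the key fix in the suggestion, a hedged sketch of what
       `validateAndParseHostsString` might look like (its body is not part of
       this diff; the error wording is illustrative):

   ```java
   private static HttpHost validateAndParseHostsString(String host) {
       try {
           HttpHost httpHost = HttpHost.create(host);
           if (httpHost.getPort() < 0) {
               throw new ValidationException(
                   String.format(
                       "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
                       host,
                       HOSTS_OPTION.key()));
           }
           return httpHost;
       } catch (IllegalArgumentException e) {
           throw new ValidationException(
               String.format("Could not parse host '%s' in option '%s'.", host, HOSTS_OPTION.key()), e);
       }
   }
   ```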

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
+ */
+public class ElasticsearchOptions {
+	/**
+	 * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with
+	 * {@code DISABLED} option.
+	 */
+	public enum BackOffType {
+		DISABLED,
+		CONSTANT,
+		EXPONENTIAL
+	}
+
+	public static final ConfigOption<List<String>> HOSTS_OPTION =
+		ConfigOptions.key("hosts")
+			.stringType()
+			.asList()
+			.noDefaultValue()
+			.withDescription("Elasticsearch hosts to connect to.");
+	public static final ConfigOption<String> INDEX_OPTION =
+		ConfigOptions.key("index")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch index for every record.");
+	public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+		ConfigOptions.key("document-type")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch document type.");
+	public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+		ConfigOptions.key("document-id.key-delimiter")
+			.stringType()
+			.defaultValue("_")
+			.withDescription("Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+	public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+		ConfigOptions.key("failure-handler")
+			.stringType()
+			.defaultValue("fail")
+			.withDescription(Description.builder()
+				.text("Failure handling strategy in case a request to Elasticsearch fails")
+				.list(
+					text("\"fail\" (throws an exception if a request fails and thus causes a job failure),"),
+					text("\"ignore\" (ignores failures and drops the request),"),
+					text("\"retry_rejected\" (re-adds requests that have failed due to queue capacity saturation),"),
+					text("\"class name\" for failure handling with an ActionRequestFailureHandler subclass"))
+				.build());
+	public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION =
+		ConfigOptions.key("sink.flush-on-checkpoint")
+			.booleanType()
+			.defaultValue(true)
+			.withDescription("Disables flushing on checkpoint");
+	public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-actions")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of actions to buffer for each bulk request.");
+	public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-size")
+			.memoryType()
+			.noDefaultValue()
+			.withDescription("Maximum size of buffered actions per bulk request");
+	public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+		ConfigOptions.key("sink.bulk-flush.interval")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Bulk flush interval");
+	public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.type")
+			.enumType(BackOffType.class)
+			.defaultValue(BackOffType.DISABLED)
+			.withDescription("Backoff strategy");
+	public static final ConfigOption<Integer> BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.max-retries")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of retries.");
+	public static final ConfigOption<Duration> BULK_FLUSH_BACKOFF_DELAY_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.delay")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Delay between each backoff attempt.");
+	public static final ConfigOption<Duration> CONNECTION_MAX_RETRY_TIMEOUT_OPTION =
+		ConfigOptions.key("connection.max-retry-timeout")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Maximum timeout between retries.");
+	public static final ConfigOption<String> CONNECTION_PATH_PREFIX =
+		ConfigOptions.key("connection.path-prefix")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Prefix string to be added to every REST communication.");
+	public static final ConfigOption<String> FORMAT_OPTION =
+		ConfigOptions.key("format")
+			.stringType()
+			.defaultValue("json")
+			.withDescription("Prefix string to be added to every REST communication.");

Review comment:
       Wrong description? It looks copied from `CONNECTION_PATH_PREFIX`; it should describe the serialization format instead.

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+
+/**
+ * Elasticsearch 7 specific configuration.
+ */
+@Internal
+final class Elasticsearch7Configuration extends ElasticsearchConfiguration {
+	Elasticsearch7Configuration(ReadableConfig config, ClassLoader classLoader) {
+		super(config, classLoader);
+	}
+
+	public List<HttpHost> getHosts() {
+		return config.get(HOSTS_OPTION).stream()
+			.map(Elasticsearch7Configuration::validateAndParseHostsString)
+			.collect(Collectors.toList());
+	}
+
+	/**
+	 * Parse Hosts String to list.
+	 *
+	 * <p>Hosts String format was given as following:
+	 *
+	 * <pre>
+	 *     connector.hosts = http://host_name:9092;http://host_name:9093

Review comment:
       ```suggestion
   	 *     hosts = http://host_name:9092;http://host_name:9093
   ```

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch7DynamicSink}.
+ */
+@Internal
+public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory {
+	private static final Set<ConfigOption<?>> requiredOptions = Stream.of(
+		HOSTS_OPTION,
+		INDEX_OPTION
+	).collect(Collectors.toSet());
+	private static final Set<ConfigOption<?>> optionalOptions = Stream.of(

Review comment:
       Do we need to add `DOCUMENT_TYPE_OPTION` to the ES7 factory? I think it is still valid there.
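    
    If we do keep it, a sketch of what I have in mind (an assumption on my side: since ES7 deprecates mapping types, it would be an *optional* option here, not a required one):
    
    ```java
    private static final Set<ConfigOption<?>> optionalOptions = Stream.<ConfigOption<?>>of(
    	DOCUMENT_TYPE_OPTION, // deprecated in ES7, but still accepted by the REST API
    	KEY_DELIMITER_OPTION,
    	FAILURE_HANDLER_OPTION
    	// remaining optional options elided
    ).collect(Collectors.toSet());
    ```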




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   ## CI report:
   
   * 99bd627fe8cffe7f4f4417be049b0eb94ec4db79 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473) 
   * 93f46cc0d8572f70b238640b6dbfe0b207f8c1aa Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426209541



##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
+ */
+public class ElasticsearchOptions {
+	/**
+	 * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with
+	 * {@code DISABLED} option.
+	 */
+	public enum BackOffType {
+		DISABLED,
+		CONSTANT,
+		EXPONENTIAL
+	}
+
+	public static final ConfigOption<List<String>> HOSTS_OPTION =
+		ConfigOptions.key("hosts")
+			.stringType()
+			.asList()
+			.noDefaultValue()
+			.withDescription("Elasticsearch hosts to connect to.");
+	public static final ConfigOption<String> INDEX_OPTION =
+		ConfigOptions.key("index")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch index for every record.");
+	public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+		ConfigOptions.key("document-type")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch document type.");
+	public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+		ConfigOptions.key("document-id.key-delimiter")
+			.stringType()
+			.defaultValue("_")
+			.withDescription("Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+	public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+		ConfigOptions.key("failure-handler")
+			.stringType()
+			.defaultValue("fail")
+			.withDescription(Description.builder()
+				.text("Failure handling strategy in case a request to Elasticsearch fails")
+				.list(
+					text("\"fail\" (throws an exception if a request fails and thus causes a job failure),"),
+					text("\"ignore\" (ignores failures and drops the request),"),
+					text("\"retry_rejected\" (re-adds requests that have failed due to queue capacity saturation),"),
+					text("\"class name\" for failure handling with a ActionRequestFailureHandler subclass"))
+				.build());
+	public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION =
+		ConfigOptions.key("sink.flush-on-checkpoint")
+			.booleanType()
+			.defaultValue(true)
+			.withDescription("Disables flushing on checkpoint");
+	public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-actions")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of actions to buffer for each bulk request.");
+	public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-size")
+			.memoryType()
+			.noDefaultValue()
+			.withDescription("Maximum size of buffered actions per bulk request");
+	public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+		ConfigOptions.key("sink.bulk-flush.interval")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Bulk flush interval");
+	public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.back-off.strategy")
+			.enumType(BackOffType.class)
+			.defaultValue(BackOffType.DISABLED)
+			.withDescription("Backoff strategy");
+	public static final ConfigOption<Integer> BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.max-retries")

Review comment:
       sink.bulk-flush.back-off.max-retries

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
+ */
+public class ElasticsearchOptions {
+	/**
+	 * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with
+	 * {@code DISABLED} option.
+	 */
+	public enum BackOffType {
+		DISABLED,
+		CONSTANT,
+		EXPONENTIAL
+	}
+
+	public static final ConfigOption<List<String>> HOSTS_OPTION =
+		ConfigOptions.key("hosts")
+			.stringType()
+			.asList()
+			.noDefaultValue()
+			.withDescription("Elasticsearch hosts to connect to.");
+	public static final ConfigOption<String> INDEX_OPTION =
+		ConfigOptions.key("index")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch index for every record.");
+	public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+		ConfigOptions.key("document-type")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch document type.");
+	public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+		ConfigOptions.key("document-id.key-delimiter")
+			.stringType()
+			.defaultValue("_")
+			.withDescription("Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+	public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+		ConfigOptions.key("failure-handler")
+			.stringType()
+			.defaultValue("fail")
+			.withDescription(Description.builder()
+				.text("Failure handling strategy in case a request to Elasticsearch fails")
+				.list(
+					text("\"fail\" (throws an exception if a request fails and thus causes a job failure),"),
+					text("\"ignore\" (ignores failures and drops the request),"),
+					text("\"retry_rejected\" (re-adds requests that have failed due to queue capacity saturation),"),
+					text("\"class name\" for failure handling with a ActionRequestFailureHandler subclass"))
+				.build());
+	public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION =
+		ConfigOptions.key("sink.flush-on-checkpoint")
+			.booleanType()
+			.defaultValue(true)
+			.withDescription("Disables flushing on checkpoint");
+	public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-actions")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of actions to buffer for each bulk request.");
+	public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-size")
+			.memoryType()
+			.noDefaultValue()
+			.withDescription("Maximum size of buffered actions per bulk request");
+	public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+		ConfigOptions.key("sink.bulk-flush.interval")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Bulk flush interval");
+	public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.back-off.strategy")
+			.enumType(BackOffType.class)
+			.defaultValue(BackOffType.DISABLED)
+			.withDescription("Backoff strategy");
+	public static final ConfigOption<Integer> BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.max-retries")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of retries.");
+	public static final ConfigOption<Duration> BULK_FLUSH_BACKOFF_DELAY_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.delay")

Review comment:
       sink.bulk-flush.back-off.delay

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * Sink function for converting upserts into Elasticsearch {@link ActionRequest}s.
+ */
+@Internal
+class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final IndexGenerator indexGenerator;
+	private final String docType;
+	private final SerializationSchema<RowData> serializationSchema;
+	private final XContentType contentType;
+	private final RequestFactory requestFactory;
+	private final Function<RowData, String> createKey;
+
+	public RowElasticsearchSinkFunction(
+			IndexGenerator indexGenerator,
+			@Nullable String docType, // this is deprecated in es 7+
+			SerializationSchema<RowData> serializationSchema,
+			XContentType contentType,
+			RequestFactory requestFactory,
+			Function<RowData, String> createKey) {
+		this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
+		this.docType = docType;
+		this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+		this.contentType = Preconditions.checkNotNull(contentType);
+		this.requestFactory = Preconditions.checkNotNull(requestFactory);
+		this.createKey = Preconditions.checkNotNull(createKey);
+	}
+
+	@Override
+	public void process(
+			RowData element,
+			RuntimeContext ctx,
+			RequestIndexer indexer) {
+		switch (element.getRowKind()) {
+			case INSERT:
+			case UPDATE_BEFORE:

Review comment:
       An `UPDATE_BEFORE` means it is a retraction of the previous row, so we should call `processDelete` for it.
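    
    For illustration, a minimal sketch of the routing I mean (assuming the class keeps `processUpsert`/`processDelete` helpers; the names are mine and may differ):
    
    ```java
    @Override
    public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) {
    	switch (element.getRowKind()) {
    		case INSERT:
    		case UPDATE_AFTER:
    			processUpsert(element, indexer); // index/upsert the new version of the row
    			break;
    		case UPDATE_BEFORE:
    		case DELETE:
    			processDelete(element, indexer); // retract the old version of the row
    			break;
    		default:
    			throw new TableException("Unsupported message kind: " + element.getRowKind());
    	}
    }
    ```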




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   ## CI report:
   
   * 80bbdd72c2aeb3c802deef71436882603590d147 UNKNOWN
   * 513b2b6030c6dc4cbe4e56ab433dd57e838ed0a9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1591) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426164011



##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
+ */
+public class ElasticsearchOptions {
+	/**
+	 * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with
+	 * {@code DISABLED} option.
+	 */
+	public enum BackOffType {
+		DISABLED,
+		CONSTANT,
+		EXPONENTIAL
+	}
+
+	public static final ConfigOption<List<String>> HOSTS_OPTION =
+		ConfigOptions.key("hosts")
+			.stringType()
+			.asList()
+			.noDefaultValue()
+			.withDescription("Elasticsearch hosts to connect to.");
+	public static final ConfigOption<String> INDEX_OPTION =
+		ConfigOptions.key("index")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch index for every record.");
+	public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+		ConfigOptions.key("document-type")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch document type.");
+	public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+		ConfigOptions.key("document-id.key-delimiter")
+			.stringType()
+			.defaultValue("_")
+			.withDescription("Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+	public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+		ConfigOptions.key("failure-handler")
+			.stringType()
+			.defaultValue("fail")
+			.withDescription(Description.builder()
+				.text("Failure handling strategy in case a request to Elasticsearch fails")
+				.list(
+					text("\"fail\" (throws an exception if a request fails and thus causes a job failure),"),
+					text("\"ignore\" (ignores failures and drops the request),"),
+					text("\"retry_rejected\" (re-adds requests that have failed due to queue capacity saturation),"),
+					text("\"class name\" for failure handling with a ActionRequestFailureHandler subclass"))
+				.build());
+	public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION =
+		ConfigOptions.key("sink.flush-on-checkpoint")
+			.booleanType()
+			.defaultValue(true)
+			.withDescription("Disables flushing on checkpoint");
+	public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-actions")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of actions to buffer for each bulk request.");
+	public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-size")
+			.memoryType()
+			.noDefaultValue()
+			.withDescription("Maximum size of buffered actions per bulk request");
+	public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+		ConfigOptions.key("sink.bulk-flush.interval")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Bulk flush interval");
+	public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.type")
+			.enumType(BackOffType.class)
+			.defaultValue(BackOffType.DISABLED)
+			.withDescription("Backoff strategy");
+	public static final ConfigOption<Integer> BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.max-retries")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of retries.");
+	public static final ConfigOption<Duration> BULK_FLUSH_BACKOFF_DELAY_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.delay")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Delay between each backoff attempt.");
+	public static final ConfigOption<Duration> CONNECTION_MAX_RETRY_TIMEOUT_OPTION =
+		ConfigOptions.key("connection.max-retry-timeout")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Maximum timeout between retries.");
+	public static final ConfigOption<String> CONNECTION_PATH_PREFIX =
+		ConfigOptions.key("connection.path-prefix")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Prefix string to be added to every REST communication.");
+	public static final ConfigOption<String> FORMAT_OPTION =
+		ConfigOptions.key("format")
+			.stringType()
+			.defaultValue("json")

Review comment:
       I thought about it and decided not to, because it would really be a weak check: we can only validate the `factoryIdentifier` of the format, which still does not guarantee it is our `json` format.
   
   Moreover I thought that if we do not add the validation, we give users more flexibility to override the format with their own version. The only requirement on the format is that it produces a valid json document. It does not have to use our format.
   
   Nevertheless I can add that validation, if you want.
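    
    For reference, the weak check I mean would look roughly like this (a sketch only; it validates nothing beyond the configured identifier):
    
    ```java
    String format = configuration.getString(FORMAT_OPTION);
    if (!"json".equals(format)) {
    	throw new ValidationException(String.format(
    		"The Elasticsearch sink requires a format producing valid JSON documents, but '%s' was configured.",
    		format));
    }
    ```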




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   ## CI report:
   
   * 80bbdd72c2aeb3c802deef71436882603590d147 UNKNOWN
   * 513b2b6030c6dc4cbe4e56ab433dd57e838ed0a9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1591) 
   * 531466e3091df9d6633e2aabc78ed4eb6d111e47 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1598) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426176060



##########
File path: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}.
+ */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
+	private static final Set<ConfigOption<?>> requiredOptions = Stream.of(
+		HOSTS_OPTION,
+		INDEX_OPTION,
+		DOCUMENT_TYPE_OPTION
+	).collect(Collectors.toSet());
+	private static final Set<ConfigOption<?>> optionalOptions = Stream.of(
+		KEY_DELIMITER_OPTION,
+		FAILURE_HANDLER_OPTION,
+		FLUSH_ON_CHECKPOINT_OPTION,
+		BULK_FLASH_MAX_SIZE_OPTION,
+		BULK_FLUSH_MAX_ACTIONS_OPTION,
+		BULK_FLUSH_INTERVAL_OPTION,
+		BULK_FLUSH_BACKOFF_TYPE_OPTION,
+		BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+		BULK_FLUSH_BACKOFF_DELAY_OPTION,
+		CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+		CONNECTION_PATH_PREFIX,
+		FORMAT_OPTION
+	).collect(Collectors.toSet());
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		ElasticsearchValidationUtils.validatePrimaryKey(context.getCatalogTable().getSchema());
+		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+			SerializationFormatFactory.class,
+			FORMAT_OPTION);
+
+		helper.validate();
+		Configuration configuration = new Configuration();
+		context.getCatalogTable()
+			.getOptions()
+			.forEach(configuration::setString);
+		Elasticsearch6Configuration config = new Elasticsearch6Configuration(configuration, context.getClassLoader());
+
+		validate(config, configuration);
+
+		return new Elasticsearch6DynamicSink(
+			format,
+			config,
+			context.getCatalogTable().getSchema());

Review comment:
       You are right. BTW I think the `FileSystemTableSink` has the same problem.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426165174



##########
File path: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}.
+ */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
+	private static final Set<ConfigOption<?>> requiredOptions = Stream.of(
+		HOSTS_OPTION,
+		INDEX_OPTION,
+		DOCUMENT_TYPE_OPTION
+	).collect(Collectors.toSet());
+	private static final Set<ConfigOption<?>> optionalOptions = Stream.of(
+		KEY_DELIMITER_OPTION,
+		FAILURE_HANDLER_OPTION,
+		FLUSH_ON_CHECKPOINT_OPTION,
+		BULK_FLASH_MAX_SIZE_OPTION,
+		BULK_FLUSH_MAX_ACTIONS_OPTION,
+		BULK_FLUSH_INTERVAL_OPTION,
+		BULK_FLUSH_BACKOFF_TYPE_OPTION,
+		BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+		BULK_FLUSH_BACKOFF_DELAY_OPTION,
+		CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+		CONNECTION_PATH_PREFIX,
+		FORMAT_OPTION
+	).collect(Collectors.toSet());
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		ElasticsearchValidationUtils.validatePrimaryKey(context.getCatalogTable().getSchema());
+		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+			SerializationFormatFactory.class,
+			FORMAT_OPTION);
+
+		helper.validate();
+		Configuration configuration = new Configuration();
+		context.getCatalogTable()
+			.getOptions()
+			.forEach(configuration::setString);
+		Elasticsearch6Configuration config = new Elasticsearch6Configuration(configuration, context.getClassLoader());
+
+		validate(config, configuration);
+
+		return new Elasticsearch6DynamicSink(
+			format,
+			config,
+			context.getCatalogTable().getSchema());

Review comment:
       I admit I missed that bit. 
   
   The schema is used for two purposes:
   * generating the dynamic index -> this should work on computed columns as well imo
   * computing the document id based on the primary key -> a primary key cannot be defined on a computed column, so it is not a problem
   
   For the primary keys I think it's fine, because primary keys cannot be defined on computed columns. It is a problem for the dynamic index though. I will pass the physical schema and the keys separately for now.
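    
    Concretely, a sketch of the factory change (using `TableSchemaUtils.getPhysicalSchema` to strip computed columns):
    
    ```java
    // Derive the physical schema; the document id / primary key is computed against it.
    TableSchema physicalSchema =
    	TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
    
    return new Elasticsearch6DynamicSink(format, config, physicalSchema);
    ```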




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   ## CI report:
   
   * 80bbdd72c2aeb3c802deef71436882603590d147 UNKNOWN
   * 513b2b6030c6dc4cbe4e56ab433dd57e838ed0a9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1591) 
   * 531466e3091df9d6633e2aabc78ed4eb6d111e47 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426166966



##########
File path: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}.
+ */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
+	private static final Set<ConfigOption<?>> requiredOptions = Stream.of(
+		HOSTS_OPTION,
+		INDEX_OPTION,
+		DOCUMENT_TYPE_OPTION
+	).collect(Collectors.toSet());
+	private static final Set<ConfigOption<?>> optionalOptions = Stream.of(
+		KEY_DELIMITER_OPTION,
+		FAILURE_HANDLER_OPTION,
+		FLUSH_ON_CHECKPOINT_OPTION,
+		BULK_FLASH_MAX_SIZE_OPTION,
+		BULK_FLUSH_MAX_ACTIONS_OPTION,
+		BULK_FLUSH_INTERVAL_OPTION,
+		BULK_FLUSH_BACKOFF_TYPE_OPTION,
+		BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+		BULK_FLUSH_BACKOFF_DELAY_OPTION,
+		CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+		CONNECTION_PATH_PREFIX,
+		FORMAT_OPTION
+	).collect(Collectors.toSet());
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		ElasticsearchValidationUtils.validatePrimaryKey(context.getCatalogTable().getSchema());
+		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+			SerializationFormatFactory.class,
+			FORMAT_OPTION);
+
+		helper.validate();
+		Configuration configuration = new Configuration();
+		context.getCatalogTable()
+			.getOptions()
+			.forEach(configuration::setString);
+		Elasticsearch6Configuration config = new Elasticsearch6Configuration(configuration, context.getClassLoader());
+
+		validate(config, configuration);
+
+		return new Elasticsearch6DynamicSink(
+			format,
+			config,
+			context.getCatalogTable().getSchema());

Review comment:
       I think it's also a problem for the key extraction, because we are using the index to extract the key. The RowData consumed by the sink does not contain computed columns. Maybe we should add a test for this.
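
   For context, a minimal sketch of the key extraction pattern under discussion (an illustration with assumed names, not the PR's code): the field getters have to be created from *physical* column indices, which is exactly what a computed column in the catalog schema would shift.

```java
import org.apache.flink.table.data.RowData;

import java.io.Serializable;
import java.util.function.Function;

/** Joins the primary key fields of a RowData into a document ID. */
class KeyExtractor implements Function<RowData, String>, Serializable {
	// created via RowData.createFieldGetter(logicalType, physicalIndex)
	private final RowData.FieldGetter[] fieldGetters;
	private final String keyDelimiter;

	KeyExtractor(RowData.FieldGetter[] fieldGetters, String keyDelimiter) {
		this.fieldGetters = fieldGetters;
		this.keyDelimiter = keyDelimiter;
	}

	@Override
	public String apply(RowData rowData) {
		final StringBuilder builder = new StringBuilder();
		for (int i = 0; i < fieldGetters.length; i++) {
			if (i > 0) {
				builder.append(keyDelimiter);
			}
			// appending the raw object is a simplification; a real
			// implementation formats each logical type (timestamps,
			// decimals, ...) explicitly
			builder.append(fieldGetters[i].getFieldOrNull(rowData));
		}
		return builder.toString();
	}
}
```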

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
+ */
+public class ElasticsearchOptions {
+	/**
+	 * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with
+	 * {@code DISABLED} option.
+	 */
+	public enum BackOffType {
+		DISABLED,
+		CONSTANT,
+		EXPONENTIAL
+	}
+
+	public static final ConfigOption<List<String>> HOSTS_OPTION =
+		ConfigOptions.key("hosts")
+			.stringType()
+			.asList()
+			.noDefaultValue()
+			.withDescription("Elasticseatch hosts to connect to.");
+	public static final ConfigOption<String> INDEX_OPTION =
+		ConfigOptions.key("index")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch index for every record.");
+	public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+		ConfigOptions.key("document-type")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Elasticsearch document type.");
+	public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+		ConfigOptions.key("document-id.key-delimiter")
+			.stringType()
+			.defaultValue("_")
+			.withDescription("Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+	public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+		ConfigOptions.key("failure-handler")
+			.stringType()
+			.defaultValue("fail")
+			.withDescription(Description.builder()
+				.text("Failure handling strategy in case a request to Elasticsearch fails")
+				.list(
+					text("\"fail\" (throws an exception if a request fails and thus causes a job failure),"),
+					text("\"ignore\" (ignores failures and drops the request),"),
+					text("\"retry_rejected\" (re-adds requests that have failed due to queue capacity saturation),"),
+					text("\"class name\" for failure handling with a ActionRequestFailureHandler subclass"))
+				.build());
+	public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION =
+		ConfigOptions.key("sink.flush-on-checkpoint")
+			.booleanType()
+			.defaultValue(true)
+			.withDescription("Disables flushing on checkpoint");
+	public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-actions")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of actions to buffer for each bulk request.");
+	public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.max-size")
+			.memoryType()
+			.noDefaultValue()
+			.withDescription("Maximum size of buffered actions per bulk request");
+	public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+		ConfigOptions.key("sink.bulk-flush.interval")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Bulk flush interval");
+	public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.type")
+			.enumType(BackOffType.class)
+			.defaultValue(BackOffType.DISABLED)
+			.withDescription("Backoff strategy");
+	public static final ConfigOption<Integer> BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.max-retries")
+			.intType()
+			.noDefaultValue()
+			.withDescription("Maximum number of retries.");
+	public static final ConfigOption<Duration> BULK_FLUSH_BACKOFF_DELAY_OPTION =
+		ConfigOptions.key("sink.bulk-flush.backoff.delay")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Delay between each backoff attempt.");
+	public static final ConfigOption<Duration> CONNECTION_MAX_RETRY_TIMEOUT_OPTION =
+		ConfigOptions.key("connection.max-retry-timeout")
+			.durationType()
+			.noDefaultValue()
+			.withDescription("Maximum timeout between retries.");
+	public static final ConfigOption<String> CONNECTION_PATH_PREFIX =
+		ConfigOptions.key("connection.path-prefix")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Prefix string to be added to every REST communication.");
+	public static final ConfigOption<String> FORMAT_OPTION =
+		ConfigOptions.key("format")
+			.stringType()
+			.defaultValue("json")

Review comment:
       Makes sense to me. 
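
   For reference, a hedged sketch of how these property keys could appear in a DDL, with `format` left at (or set to) its `json` default. The connector identifier `elasticsearch-7`, the table name, and the fields are assumptions made for this example, not taken from the PR.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EsDdlExample {
	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
			EnvironmentSettings.newInstance().build());

		// 'elasticsearch-7' is the assumed connector identifier;
		// 'format' could be omitted entirely since it defaults to 'json'
		tEnv.executeSql(
			"CREATE TABLE es_sink (\n" +
			"  user_id STRING,\n" +
			"  cnt BIGINT,\n" +
			"  PRIMARY KEY (user_id) NOT ENFORCED\n" +
			") WITH (\n" +
			"  'connector' = 'elasticsearch-7',\n" +
			"  'hosts' = 'http://localhost:9200',\n" +
			"  'index' = 'users',\n" +
			"  'sink.bulk-flush.max-actions' = '1000',\n" +
			"  'format' = 'json'\n" +
			")");
	}
}
```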




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426164192



##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch7DynamicSink}.
+ */
+@Internal
+public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory {
+	private static final Set<ConfigOption<?>> requiredOptions = Stream.of(
+		HOSTS_OPTION,
+		INDEX_OPTION
+	).collect(Collectors.toSet());
+	private static final Set<ConfigOption<?>> optionalOptions = Stream.of(

Review comment:
       The `documentType` is deprecated starting from ES7. In our old implementation we ignore the user-provided value, even though it is required.
   
   I made this explicit and removed `docType` altogether from ES7.
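
   For illustration, a sketch of ES7 request creation without any document type. The class and method names are assumptions; the Elasticsearch 7 client request constructors shown do accept an index and id without a type parameter.

```java
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;

/** Hypothetical ES7 request factory: no docType anywhere. */
class Es7RequestFactorySketch {
	IndexRequest createIndexRequest(String index, String key, XContentType contentType, byte[] document) {
		// index a document under the given id
		return new IndexRequest(index)
			.id(key)
			.source(document, contentType);
	}

	UpdateRequest createUpdateRequest(String index, String key, XContentType contentType, byte[] document) {
		// update if present, insert otherwise (upsert semantics)
		return new UpdateRequest(index, key)
			.doc(document, contentType)
			.upsert(document, contentType);
	}

	DeleteRequest createDeleteRequest(String index, String key) {
		return new DeleteRequest(index, key);
	}
}
```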




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426167179



##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * Sink function for converting upserts into Elasticsearch {@link ActionRequest}s.
+ */
+@Internal
+class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final IndexGenerator indexGenerator;
+	private final String docType;
+	private final SerializationSchema<RowData> serializationSchema;
+	private final XContentType contentType;
+	private final RequestFactory requestFactory;
+	private final Function<RowData, String> createKey;
+
+	public RowElasticsearchSinkFunction(
+			IndexGenerator indexGenerator,
+			@Nullable String docType, // this is deprecated in es 7+
+			SerializationSchema<RowData> serializationSchema,
+			XContentType contentType,
+			RequestFactory requestFactory,
+			Function<RowData, String> createKey) {
+		this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
+		this.docType = docType;
+		this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+		this.contentType = Preconditions.checkNotNull(contentType);
+		this.requestFactory = Preconditions.checkNotNull(requestFactory);
+		this.createKey = Preconditions.checkNotNull(createKey);
+	}
+
+	@Override
+	public void process(
+			RowData element,
+			RuntimeContext ctx,
+			RequestIndexer indexer) {
+		switch (element.getRowKind()) {
+			case INSERT:
+			case UPDATE_AFTER:
+				processUpsert(element, indexer);
+				break;
+			case DELETE:

Review comment:
       > I am not sure if I understood your comment.
   > Will we still get the `UPDATE_BEFORE` event even though we say in `org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSink#getChangelogMode` that we do not support it?
   
   Yes. 
   
   Currently, only `UPDATE_BEFORE` can be ignored (though the sink may still receive it). If a sink wants to ignore `UPDATE_AFTER` or `DELETE`, an exception will be thrown.
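
   In code, "not supported" is declared roughly like this (a sketch of the pattern, not the PR's exact method): `UPDATE_BEFORE` is simply not included in the returned mode, so the planner knows the sink does not need it, even though it may still be delivered.

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

class ChangelogModes {
	/** Upsert-style mode: everything except UPDATE_BEFORE. */
	static ChangelogMode upsert() {
		return ChangelogMode.newBuilder()
			.addContainedKind(RowKind.INSERT)
			.addContainedKind(RowKind.UPDATE_AFTER)
			.addContainedKind(RowKind.DELETE)
			.build();
	}
}
```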




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629425634


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1473",
       "triggerID" : "99bd627fe8cffe7f4f4417be049b0eb94ec4db79",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519",
       "triggerID" : "93f46cc0d8572f70b238640b6dbfe0b207f8c1aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "80bbdd72c2aeb3c802deef71436882603590d147",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "fba63e0698c8c60a04efb10a8914ea548bacc653",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 93f46cc0d8572f70b238640b6dbfe0b207f8c1aa Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1519) 
   * 80bbdd72c2aeb3c802deef71436882603590d147 UNKNOWN
   * fba63e0698c8c60a04efb10a8914ea548bacc653 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12184:
URL: https://github.com/apache/flink/pull/12184#issuecomment-629405311


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 71a42dd37a3cd688addba121d393c2ea1fedb938 (Fri May 15 18:12:50 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #12184: [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12184:
URL: https://github.com/apache/flink/pull/12184#discussion_r426165863



##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * Sink function for converting upserts into Elasticsearch {@link ActionRequest}s.
+ */
+@Internal
+class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final IndexGenerator indexGenerator;
+	private final String docType;
+	private final SerializationSchema<RowData> serializationSchema;
+	private final XContentType contentType;
+	private final RequestFactory requestFactory;
+	private final Function<RowData, String> createKey;
+
+	public RowElasticsearchSinkFunction(
+			IndexGenerator indexGenerator,
+			@Nullable String docType, // this is deprecated in es 7+
+			SerializationSchema<RowData> serializationSchema,
+			XContentType contentType,
+			RequestFactory requestFactory,
+			Function<RowData, String> createKey) {
+		this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
+		this.docType = docType;
+		this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+		this.contentType = Preconditions.checkNotNull(contentType);
+		this.requestFactory = Preconditions.checkNotNull(requestFactory);
+		this.createKey = Preconditions.checkNotNull(createKey);
+	}
+
+	@Override
+	public void process(
+			RowData element,
+			RuntimeContext ctx,
+			RequestIndexer indexer) {
+		switch (element.getRowKind()) {
+			case INSERT:
+			case UPDATE_AFTER:
+				processUpsert(element, indexer);
+				break;
+			case DELETE:

Review comment:
       I am not sure if I understood your comment.
   Will we still get the `UPDATE_BEFORE` event even though we say in `org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSink#getChangelogMode` that we do not support it?
   
   If we do, what should I do with it? Shall I treat it the same way as `UPDATE_AFTER`?
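
   One way to resolve this, assuming the answer given above applies (the sink may still receive `UPDATE_BEFORE` and can safely drop it), would be an explicit no-op case, sketched here as an assumption rather than the merged code:

```java
// Hypothetical variant of RowElasticsearchSinkFunction#process, reusing
// the helpers from the diff above (processUpsert/processDelete).
@Override
public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) {
	switch (element.getRowKind()) {
		case INSERT:
		case UPDATE_AFTER:
			processUpsert(element, indexer);
			break;
		case UPDATE_BEFORE:
			// safe to drop: the subsequent UPDATE_AFTER upserts the same document
			break;
		case DELETE:
			processDelete(element, indexer);
			break;
		default:
			throw new TableException("Unsupported message kind: " + element.getRowKind());
	}
}
```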




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org