You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/07 14:15:33 UTC

[GitHub] [flink] alpreu commented on a change in pull request #17374: [FLINK-24327][connectors/elasticsearch] Add Elasticsearch 7 sink for table API

alpreu commented on a change in pull request #17374:
URL: https://github.com/apache/flink/pull/17374#discussion_r724221489



##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java
##########
@@ -20,20 +20,91 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.elasticsearch.sink.FlushBackoffType;
 import org.apache.flink.table.api.ValidationException;
 
 import org.apache.http.HttpHost;
 
+import java.time.Duration;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.DELIVERY_GUARANTEE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.PASSWORD_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.USERNAME_OPTION;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Elasticsearch 7 specific configuration. */
 @Internal
-final class Elasticsearch7Configuration extends ElasticsearchConfiguration {
+final class Elasticsearch7Configuration {
+    protected final ReadableConfig config;
+    private final ClassLoader classLoader;
+
     Elasticsearch7Configuration(ReadableConfig config, ClassLoader classLoader) {
-        super(config, classLoader);
+        this.config = checkNotNull(config);
+        this.classLoader = checkNotNull(classLoader);
+    }
+
+    public int getBulkFlushMaxActions() {
+        int maxActions = config.get(ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+        // convert 0 to -1, because Elasticsearch client use -1 to disable this configuration.
+        return maxActions == 0 ? -1 : maxActions;
+    }
+
+    public long getBulkFlushMaxByteSize() {
+        long maxSize =
+                config.get(ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+        // convert 0 to -1, because Elasticsearch client use -1 to disable this configuration.
+        return maxSize == 0 ? -1 : maxSize;
+    }
+
+    public long getBulkFlushInterval() {
+        long interval = config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+        // convert 0 to -1, because Elasticsearch client use -1 to disable this configuration.
+        return interval == 0 ? -1 : interval;

Review comment:
       Good catch, they are also checked and documented in the ElasticsearchSinkBuilder




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org