Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/14 16:14:37 UTC

[GitHub] [flink] AHeise commented on a change in pull request #17374: [FLINK-24327][connectors/elasticsearch] Add Elasticsearch 7 sink for table API

AHeise commented on a change in pull request #17374:
URL: https://github.com/apache/flink/pull/17374#discussion_r729115471



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
##########
@@ -95,11 +95,11 @@
          *
          * <pre>{@code
          * // Project some fields into a new data type
-         * DataType projectedDataType = DataType.projectRow(
+         * DataType projectedDataType = DataType.projectFields(

Review comment:
       Just FYI, it's more common to see hotfixes before the main commits.

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java
##########
@@ -93,5 +95,39 @@ public static void validatePrimaryKey(TableSchema schema) {
                         });
     }
 
+    /**
+     * Checks that the table does not have a primary key defined on illegal types. In Elasticsearch
+     * the primary key is used to calculate the Elasticsearch document id, which is a string of up
+     * to 512 bytes that must not contain whitespace. As of now the id is calculated by
+     * concatenating the key fields. Certain types do not have a good string representation for
+     * this purpose. The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and
+     * the {@link LogicalTypeRoot#RAW} type.
+     */
+    public static void validatePrimaryKey(DataType primaryKeyDataType) {
+        List<DataType> fieldDataTypes = DataType.getFieldDataTypes(primaryKeyDataType);
+        List<LogicalTypeRoot> illegalTypes =
+                fieldDataTypes.stream()
+                        .map(DataType::getLogicalType)
+                        .map(
+                                logicalType -> {
+                                    if (hasRoot(logicalType, LogicalTypeRoot.DISTINCT_TYPE)) {
+                                        return ((DistinctType) logicalType)
+                                                .getSourceType()
+                                                .getTypeRoot();
+                                    } else {
+                                        return logicalType.getTypeRoot();
+                                    }
+                                })
+                        .filter(ILLEGAL_PRIMARY_KEY_TYPES::contains)

Review comment:
       Wouldn't it make more sense to use a whitelist instead of a blacklist?
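
       For illustration, a minimal allowlist-style sketch of the validation; the concrete set of permitted type roots below is hypothetical, and the DISTINCT-type unwrapping from the diff is omitted for brevity:
```
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical allowlist: only roots with a stable, whitespace-free string
// representation may participate in the Elasticsearch document id.
private static final Set<LogicalTypeRoot> ALLOWED_PRIMARY_KEY_TYPES =
        EnumSet.of(
                LogicalTypeRoot.CHAR,
                LogicalTypeRoot.VARCHAR,
                LogicalTypeRoot.BOOLEAN,
                LogicalTypeRoot.DECIMAL,
                LogicalTypeRoot.TINYINT,
                LogicalTypeRoot.SMALLINT,
                LogicalTypeRoot.INTEGER,
                LogicalTypeRoot.BIGINT,
                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);

public static void validatePrimaryKey(DataType primaryKeyDataType) {
    // Reject every key field whose type root is not explicitly allowed.
    List<LogicalTypeRoot> disallowedTypes =
            DataType.getFieldDataTypes(primaryKeyDataType).stream()
                    .map(fieldType -> fieldType.getLogicalType().getTypeRoot())
                    .filter(typeRoot -> !ALLOWED_PRIMARY_KEY_TYPES.contains(typeRoot))
                    .collect(Collectors.toList());
    if (!disallowedTypes.isEmpty()) {
        throw new ValidationException(
                "Fields of the following types cannot be part of the primary key: "
                        + disallowedTypes);
    }
}
```
       With an allowlist, newly added types are rejected by default until someone consciously vets them, which is usually the safer failure mode here.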

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java
##########
@@ -109,6 +110,23 @@ public int getIndex() {
                 .orElseGet(() -> (Function<RowData, String> & Serializable) (row) -> null);
     }
 
+    public static Function<RowData, String> createKeyExtractor(
+            List<LogicalTypeWithIndex> primaryKeyTypesWithIndex, String keyDelimiter) {
+        if (!primaryKeyTypesWithIndex.isEmpty()) {
+            FieldFormatter[] formatters =
+                    primaryKeyTypesWithIndex.stream()
+                            .map(
+                                    logicalTypeWithIndex ->
+                                            toFormatter(
+                                                    logicalTypeWithIndex.index,
+                                                    logicalTypeWithIndex.logicalType))
+                            .toArray(FieldFormatter[]::new);
+            return new KeyExtractor(formatters, keyDelimiter);
+        } else {
+            return (Function<RowData, String> & Serializable) (row) -> null;

Review comment:
       This is a bit awkward. I'd either define a SerializableFunction interface, have a no-op KeyExtractor subclass, or even go towards constructing the function at runtime and only ship the primary key.
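
       A minimal sketch of the first option; the interface name is hypothetical, `LogicalTypeWithIndex` is taken from this PR, and `KeyExtractor` would implement the new interface:
```
import org.apache.flink.table.data.RowData;

import java.io.Serializable;
import java.util.List;
import java.util.function.Function;

/** Hypothetical helper: a Function that is explicitly Serializable. */
@FunctionalInterface
public interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

// With KeyExtractor implementing SerializableFunction<RowData, String>,
// the empty-key branch becomes a plain lambda without intersection casts:
public static SerializableFunction<RowData, String> createKeyExtractor(
        List<LogicalTypeWithIndex> primaryKeyTypesWithIndex, String keyDelimiter) {
    if (primaryKeyTypesWithIndex.isEmpty()) {
        return row -> null; // no primary key -> no document id
    }
    // ... build the FieldFormatter-based KeyExtractor as in the diff above ...
    throw new UnsupportedOperationException("sketch only");
}
```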

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7ConnectorOptions.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+
+import java.time.Duration;
+import java.util.List;
+
+/** Options for the Elasticsearch connector. */
+public class Elasticsearch7ConnectorOptions {

Review comment:
       Is this really resolved?

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
##########
@@ -19,89 +19,50 @@
 package org.apache.flink.streaming.connectors.elasticsearch.table;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBuilder;
+import org.apache.flink.connector.elasticsearch.sink.FlushBackoffType;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.SinkProvider;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 
-import javax.annotation.Nullable;
-
 import java.util.List;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link DynamicTableSink} that describes how to create a {@link ElasticsearchSink} from a
  * logical description.
  */
 @Internal
 final class Elasticsearch7DynamicSink implements DynamicTableSink {
-    @VisibleForTesting
-    static final Elasticsearch7RequestFactory REQUEST_FACTORY =
-            new Elasticsearch7DynamicSink.Elasticsearch7RequestFactory();
 
     private final EncodingFormat<SerializationSchema<RowData>> format;
-    private final TableSchema schema;
+    private final DataType physicalRowDataType;
+    private final List<LogicalTypeWithIndex> primaryKeyLogicalTypesWithIndex;
     private final Elasticsearch7Configuration config;
 
-    public Elasticsearch7DynamicSink(
-            EncodingFormat<SerializationSchema<RowData>> format,
-            Elasticsearch7Configuration config,
-            TableSchema schema) {
-        this(format, config, schema, (ElasticsearchSink.Builder::new));
-    }
-
-    // --------------------------------------------------------------
-    // Hack to make configuration testing possible.
-    //
-    // The code in this block should never be used outside of tests.
-    // Having a way to inject a builder we can assert the builder in
-    // the test. We can not assert everything though, e.g. it is not
-    // possible to assert flushing on checkpoint, as it is configured
-    // on the sink itself.
-    // --------------------------------------------------------------
-
-    private final ElasticSearchBuilderProvider builderProvider;
-
-    @FunctionalInterface
-    interface ElasticSearchBuilderProvider {
-        ElasticsearchSink.Builder<RowData> createBuilder(
-                List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
-    }
-
     Elasticsearch7DynamicSink(
             EncodingFormat<SerializationSchema<RowData>> format,
-            Elasticsearch7Configuration config,
-            TableSchema schema,
-            ElasticSearchBuilderProvider builderProvider) {
-        this.format = format;
-        this.schema = schema;
-        this.config = config;
-        this.builderProvider = builderProvider;
+            DataType physicalRowDataType,
+            List<LogicalTypeWithIndex> primaryKeyLogicalTypesWithIndex,
+            Elasticsearch7Configuration config) {

Review comment:
       Move up to align with old signature?

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java
##########
@@ -93,5 +95,39 @@ public static void validatePrimaryKey(TableSchema schema) {
                         });
     }
 
+    /**
+     * Checks that the table does not have a primary key defined on illegal types. In Elasticsearch
+     * the primary key is used to calculate the Elasticsearch document id, which is a string of up
+     * to 512 bytes that must not contain whitespace. As of now the id is calculated by
+     * concatenating the key fields. Certain types do not have a good string representation for
+     * this purpose. The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and
+     * the {@link LogicalTypeRoot#RAW} type.
+     */
+    public static void validatePrimaryKey(DataType primaryKeyDataType) {
+        List<DataType> fieldDataTypes = DataType.getFieldDataTypes(primaryKeyDataType);
+        List<LogicalTypeRoot> illegalTypes =
+                fieldDataTypes.stream()
+                        .map(DataType::getLogicalType)
+                        .map(
+                                logicalType -> {
+                                    if (hasRoot(logicalType, LogicalTypeRoot.DISTINCT_TYPE)) {
+                                        return ((DistinctType) logicalType)
+                                                .getSourceType()
+                                                .getTypeRoot();
+                                    } else {
+                                        return logicalType.getTypeRoot();
+                                    }

Review comment:
       This looks awkward to me, and I see quite a few instances of
```
public static boolean hasWellDefinedString(LogicalType logicalType) {
    if (logicalType instanceof DistinctType) {
        return hasWellDefinedString(((DistinctType) logicalType).getSourceType());
    }
```

       Should `DataType::getLogicalType` maybe resolve DistinctType implicitly? @twalthr
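
       If `DataType::getLogicalType` keeps returning the DISTINCT type unresolved, a small shared helper could at least centralize the unwrapping (name and placement are hypothetical):
```
import org.apache.flink.table.types.logical.DistinctType;
import org.apache.flink.table.types.logical.LogicalType;

/** Unwraps DISTINCT types down to their source type. */
public static LogicalType unwrapDistinct(LogicalType logicalType) {
    // Defensive loop in case the source type is itself DISTINCT.
    while (logicalType instanceof DistinctType) {
        logicalType = ((DistinctType) logicalType).getSourceType();
    }
    return logicalType;
}
```
       The stream in the diff above would then shrink to `.map(type -> unwrapDistinct(type).getTypeRoot())`.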

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
##########
@@ -114,188 +75,64 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
     }
 
     @Override
-    public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
-        return () -> {
-            SerializationSchema<RowData> format =
-                    this.format.createRuntimeEncoder(context, schema.toRowDataType());
-
-            final RowElasticsearchSinkFunction upsertFunction =
-                    new RowElasticsearchSinkFunction(
-                            IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
-                            null, // this is deprecated in es 7+
-                            format,
-                            XContentType.JSON,
-                            REQUEST_FACTORY,
-                            KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));
-
-            final ElasticsearchSink.Builder<RowData> builder =
-                    builderProvider.createBuilder(config.getHosts(), upsertFunction);
-
-            builder.setFailureHandler(config.getFailureHandler());
-            builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
-            builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
-            builder.setBulkFlushInterval(config.getBulkFlushInterval());
-            builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
-            config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
-            config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
-            config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
-
-            // we must overwrite the default factory which is defined with a lambda because of a bug
-            // in shading lambda serialization shading see FLINK-18006
-            if (config.getUsername().isPresent()
-                    && config.getPassword().isPresent()
-                    && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
-                    && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
-                builder.setRestClientFactory(
-                        new AuthRestClientFactory(
-                                config.getPathPrefix().orElse(null),
-                                config.getUsername().get(),
-                                config.getPassword().get()));
-            } else {
-                builder.setRestClientFactory(
-                        new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
-            }
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        SerializationSchema<RowData> format =
+                this.format.createRuntimeEncoder(context, physicalRowDataType);
+
+        final RowElasticsearchEmitter rowElasticsearchEmitter =
+                new RowElasticsearchEmitter(
+                        IndexGeneratorFactory.createIndexGenerator(
+                                config.getIndex(),
+                                DataType.getFieldNames(physicalRowDataType),
+                                DataType.getFieldDataTypes(physicalRowDataType)),
+                        format,
+                        XContentType.JSON,
+                        KeyExtractor.createKeyExtractor(
+                                primaryKeyLogicalTypesWithIndex, config.getKeyDelimiter()));
+
+        final ElasticsearchSinkBuilder<RowData> builder = ElasticsearchSink.builder();
+        builder.setEmitter(rowElasticsearchEmitter);
+        builder.setHosts(config.getHosts().toArray(new HttpHost[0]));
+        builder.setDeliveryGuarantee(config.getDeliveryGuarantee());
+        builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+        builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
+        builder.setBulkFlushInterval(config.getBulkFlushInterval());
+
+        if (config.getBulkFlushBackoffType().isPresent()) {
+            FlushBackoffType backoffType = config.getBulkFlushBackoffType().get();
+            int backoffMaxRetries = config.getBulkFlushBackoffRetries().get();
+            long backoffDelayMs = config.getBulkFlushBackoffDelay().get();
+
+            builder.setBulkFlushBackoffStrategy(backoffType, backoffMaxRetries, backoffDelayMs);
+        }
 
-            final ElasticsearchSink<RowData> sink = builder.build();
+        if (config.getUsername().isPresent()
+                && config.getPassword().isPresent()
+                && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
+                && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
+            builder.setConnectionPassword(config.getPassword().get());
+            builder.setConnectionUsername(config.getUsername().get());
+        }

Review comment:
       I'd set username and password independently. AFAIK, in the ES docker image you can set only the password.
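
       For illustration, each credential could be applied on its own (assuming `getUsername()`/`getPassword()` return `Optional<String>`, as the diff suggests):
```
// Apply each credential independently so that a password-only setup
// (e.g. the ES docker image) still reaches the builder.
config.getUsername()
        .filter(user -> !StringUtils.isNullOrWhitespaceOnly(user))
        .ifPresent(builder::setConnectionUsername);
config.getPassword()
        .filter(pass -> !StringUtils.isNullOrWhitespaceOnly(pass))
        .ifPresent(builder::setConnectionPassword);
```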



