Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/07 07:13:31 UTC

[GitHub] [flink] fapaul commented on a change in pull request #17374: [FLINK-24327][connectors/elasticsearch] Add Elasticsearch 7 sink for table API

fapaul commented on a change in pull request #17374:
URL: https://github.com/apache/flink/pull/17374#discussion_r723896941



##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java
##########
@@ -109,6 +115,27 @@ public int getIndex() {
                 .orElseGet(() -> (Function<RowData, String> & Serializable) (row) -> null);
     }
 
+    public static Function<RowData, String> createKeyExtractor(
+            ResolvedSchema resolvedSchema, String keyDelimiter) {
+        Optional<UniqueConstraint> primaryKey = resolvedSchema.getPrimaryKey();
+        if (primaryKey.isPresent()) {
+            int[] primaryKeyIndexes = resolvedSchema.getPrimaryKeyIndexes();
+
+            List<FieldFormatter> formatters = new ArrayList<>();
+            for (int index : primaryKeyIndexes) {
+                //noinspection OptionalGetWithoutIsPresent

Review comment:
       To suppress warnings in Flink, we use the `@SuppressWarnings` annotation rather than this type of comment.
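
A minimal sketch of the suggested change. It assumes the IntelliJ inspection ID `OptionalGetWithoutIsPresent` taken from the quoted `//noinspection` comment, and it uses a hypothetical helper `primaryKeyColumnNames` that mimics the loop over primary key indexes (the `List<Optional<String>>` stands in for the `ResolvedSchema` column lookups). Placing the annotation on the local variable keeps the suppression as narrow as the original line comment:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class SuppressWarningsSketch {

    // Hypothetical helper mirroring the loop in createKeyExtractor: looks up the
    // column name for each primary key index.
    static List<String> primaryKeyColumnNames(
            List<Optional<String>> columns, int[] primaryKeyIndexes) {
        List<String> names = new ArrayList<>();
        for (int index : primaryKeyIndexes) {
            // Replaces the "//noinspection OptionalGetWithoutIsPresent" line comment;
            // on a local variable it covers only this one declaration.
            @SuppressWarnings("OptionalGetWithoutIsPresent")
            String name = columns.get(index).get();
            names.add(name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Optional<String>> columns =
                Arrays.asList(Optional.of("id"), Optional.of("name"), Optional.of("ts"));
        System.out.println(primaryKeyColumnNames(columns, new int[] {0, 2})); // [id, ts]
    }
}
```

Annotating the enclosing method instead would also work, but it would silence the inspection for every `Optional#get` call in that method.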

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
##########
@@ -54,54 +44,20 @@
  */
 @Internal
 final class Elasticsearch7DynamicSink implements DynamicTableSink {
-    @VisibleForTesting
-    static final Elasticsearch7RequestFactory REQUEST_FACTORY =
-            new Elasticsearch7DynamicSink.Elasticsearch7RequestFactory();
 
     private final EncodingFormat<SerializationSchema<RowData>> format;
-    private final TableSchema schema;
+    private final DynamicTableFactory.Context factoryContext;
     private final Elasticsearch7Configuration config;
 
-    public Elasticsearch7DynamicSink(
-            EncodingFormat<SerializationSchema<RowData>> format,
-            Elasticsearch7Configuration config,
-            TableSchema schema) {
-        this(format, config, schema, (ElasticsearchSink.Builder::new));
-    }
-
-    // --------------------------------------------------------------
-    // Hack to make configuration testing possible.
-    //
-    // The code in this block should never be used outside of tests.
-    // Having a way to inject a builder we can assert the builder in
-    // the test. We can not assert everything though, e.g. it is not
-    // possible to assert flushing on checkpoint, as it is configured
-    // on the sink itself.
-    // --------------------------------------------------------------
-
-    private final ElasticSearchBuilderProvider builderProvider;
-
-    @FunctionalInterface
-    interface ElasticSearchBuilderProvider {
-        ElasticsearchSink.Builder<RowData> createBuilder(
-                List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
-    }
-
     Elasticsearch7DynamicSink(
             EncodingFormat<SerializationSchema<RowData>> format,
             Elasticsearch7Configuration config,
-            TableSchema schema,
-            ElasticSearchBuilderProvider builderProvider) {
+            DynamicTableFactory.Context factoryContext) {

Review comment:
       If we pass only the primary key information separately, we do not need to pass the whole context, only the extracted physical row data type (`factoryContext#getPhysicalRowDataType`).
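
A hedged sketch of the narrower constructor. `PhysicalRowType`, `FactoryContext`, and `Sink` are hypothetical stand-ins for Flink's `DataType`, `DynamicTableFactory.Context`, and `Elasticsearch7DynamicSink`, so the example runs without Flink on the classpath. The factory pulls the row type out of the context once, and the sink holds only what it uses:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NarrowSinkDependencySketch {

    // Hypothetical stand-in for Flink's DataType: only the physical field names.
    static final class PhysicalRowType {
        private final List<String> fieldNames;

        PhysicalRowType(List<String> fieldNames) {
            this.fieldNames = Collections.unmodifiableList(fieldNames);
        }

        List<String> getFieldNames() {
            return fieldNames;
        }
    }

    // Hypothetical stand-in for DynamicTableFactory.Context, which exposes far
    // more than the sink needs.
    interface FactoryContext {
        PhysicalRowType getPhysicalRowDataType();

        String getObjectIdentifier();
    }

    // The sink depends only on the physical row type, not on the whole context.
    static final class Sink {
        private final PhysicalRowType physicalRowDataType;

        Sink(PhysicalRowType physicalRowDataType) {
            this.physicalRowDataType = physicalRowDataType;
        }

        List<String> fieldNames() {
            return physicalRowDataType.getFieldNames();
        }
    }

    // The factory extracts what the sink needs once, at creation time.
    static Sink createSink(FactoryContext context) {
        return new Sink(context.getPhysicalRowDataType());
    }

    public static void main(String[] args) {
        PhysicalRowType rowType = new PhysicalRowType(Arrays.asList("id", "name"));
        Sink viaFactory = createSink(new FactoryContext() {
            @Override
            public PhysicalRowType getPhysicalRowDataType() {
                return rowType;
            }

            @Override
            public String getObjectIdentifier() {
                return "users"; // hypothetical table name
            }
        });
        // The sink can also be built directly from a row type, without any context.
        Sink direct = new Sink(rowType);
        System.out.println(viaFactory.fieldNames() + " " + direct.fieldNames()); // [id, name] [id, name]
    }
}
```

A side effect of this shape is that a test can construct the sink from a row type alone, without building a factory context.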

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
##########
@@ -114,188 +70,65 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
     }
 
     @Override
-    public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
-        return () -> {
-            SerializationSchema<RowData> format =
-                    this.format.createRuntimeEncoder(context, schema.toRowDataType());
-
-            final RowElasticsearchSinkFunction upsertFunction =
-                    new RowElasticsearchSinkFunction(
-                            IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
-                            null, // this is deprecated in es 7+
-                            format,
-                            XContentType.JSON,
-                            REQUEST_FACTORY,
-                            KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));
-
-            final ElasticsearchSink.Builder<RowData> builder =
-                    builderProvider.createBuilder(config.getHosts(), upsertFunction);
-
-            builder.setFailureHandler(config.getFailureHandler());
-            builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
-            builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
-            builder.setBulkFlushInterval(config.getBulkFlushInterval());
-            builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
-            config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
-            config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
-            config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
-
-            // we must overwrite the default factory which is defined with a lambda because of a bug
-            // in shading lambda serialization shading see FLINK-18006
-            if (config.getUsername().isPresent()
-                    && config.getPassword().isPresent()
-                    && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
-                    && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
-                builder.setRestClientFactory(
-                        new AuthRestClientFactory(
-                                config.getPathPrefix().orElse(null),
-                                config.getUsername().get(),
-                                config.getPassword().get()));
-            } else {
-                builder.setRestClientFactory(
-                        new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
-            }
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        DataType dataType = factoryContext.getPhysicalRowDataType();
+        SerializationSchema<RowData> format = this.format.createRuntimeEncoder(context, dataType);
+
+        final RowElasticsearchEmitter rowElasticsearchEmitter =
+                new RowElasticsearchEmitter(
+                        IndexGeneratorFactory.createIndexGenerator(
+                                config.getIndex(),
+                                DataType.getFieldNames(dataType),
+                                DataType.getFieldDataTypes(dataType)),
+                        format,
+                        XContentType.JSON,
+                        KeyExtractor.createKeyExtractor(

Review comment:
       I think we can reduce the complexity further. We already extract the primary key information in `Elasticsearch7DynamicSinkFactory`, so if a key is present we can pass its `LogicalType` (obtained via `DataType#getLogicalType()`) directly to this class.
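
A minimal sketch of this suggestion under stated assumptions: `KeyField` is a hypothetical pair of a key column's index and its type (a `String` stands in for `LogicalType`), `extractKeyFields` plays the role of the factory resolving the primary key, and rows are plain `Object[]` instead of `RowData`. The sink side then builds the document id from the key fields alone, joining the values with the key delimiter and returning `null` when there is no primary key, as the existing `orElseGet` branch does:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
import java.util.function.Function;

public class PrimaryKeyPassingSketch {

    // Hypothetical (index, type) pair for one primary key column; the String
    // stands in for the LogicalType obtained via DataType#getLogicalType().
    static final class KeyField implements Serializable {
        final int index;
        final String logicalType;

        KeyField(int index, String logicalType) {
            this.index = index;
            this.logicalType = logicalType;
        }
    }

    // Factory side (hypothetical): resolve the primary key once and hand the sink
    // only the key fields, or an empty list when the table has no primary key.
    // Assumes every key column exists in columnNames.
    static List<KeyField> extractKeyFields(
            List<String> columnNames, List<String> columnTypes, List<String> primaryKey) {
        List<KeyField> fields = new ArrayList<>();
        for (String keyColumn : primaryKey) {
            int index = columnNames.indexOf(keyColumn);
            fields.add(new KeyField(index, columnTypes.get(index)));
        }
        return fields;
    }

    // Sink side: joins key values with the delimiter. The quoted code builds a
    // FieldFormatter per key column; this sketch simply uses String.valueOf.
    static Function<Object[], String> createKeyExtractor(
            List<KeyField> keyFields, String delimiter) {
        if (keyFields.isEmpty()) {
            return (Function<Object[], String> & Serializable) row -> null;
        }
        List<KeyField> fields = new ArrayList<>(keyFields);
        return (Function<Object[], String> & Serializable) row -> {
            StringJoiner joiner = new StringJoiner(delimiter);
            for (KeyField field : fields) {
                joiner.add(String.valueOf(row[field.index]));
            }
            return joiner.toString();
        };
    }

    public static void main(String[] args) {
        List<KeyField> keyFields = extractKeyFields(
                Arrays.asList("id", "name", "region"),
                Arrays.asList("BIGINT", "STRING", "STRING"),
                Arrays.asList("id", "region"));
        Function<Object[], String> extractor = createKeyExtractor(keyFields, "_");
        System.out.println(extractor.apply(new Object[] {42L, "alice", "eu"})); // 42_eu
        System.out.println(
                createKeyExtractor(Collections.emptyList(), "_").apply(new Object[] {1L})); // null
    }
}
```

With key columns `id` and `region` and delimiter `_`, the row `{42L, "alice", "eu"}` maps to `42_eu`; the sink never needs to look at the schema itself.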




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.