You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/22 11:43:35 UTC

[GitHub] [flink] fapaul commented on a change in pull request #17538: [FLINK-24325][connectors/elasticsearch] Create Elasticsearch 6 Sink

fapaul commented on a change in pull request #17538:
URL: https://github.com/apache/flink/pull/17538#discussion_r734446615



##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorTest.java
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.UnsupportedTemporalTypeException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Suite tests for {@link IndexGenerator}. */
+public class IndexGeneratorTest {

Review comment:
       Why did you add this class on the `Remove ES 5` commit? Do we need it at all?

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
##########
@@ -77,42 +79,40 @@
     /**
      * Constructor creating an elasticsearch writer.
      *
-     * @param hosts the reachable elasticsearch cluster nodes
      * @param emitter converting incoming records to elasticsearch actions
      * @param flushOnCheckpoint if true all until now received records are flushed after every
      *     checkpoint
      * @param bulkProcessorConfig describing the flushing and failure handling of the used {@link
      *     BulkProcessor}
-     * @param networkClientConfig describing properties of the network connection used to connect to
-     *     the elasticsearch cluster
      * @param metricGroup for the sink writer
      * @param mailboxExecutor Flink's mailbox executor
      */
     public ElasticsearchWriter(
             List<HttpHost> hosts,
+            NetworkClientConfig networkClientConfig,

Review comment:
       I am not sure what happened here but the complete `networkClientConfig` parameter change seems unnecessary and you also deleted some of the docstrings. Please revert all the unnecessary changes.

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java
##########
@@ -66,39 +65,46 @@
 
 /** Tests for {@link ElasticsearchSink}. */
 @Testcontainers
-class ElasticsearchSinkITCase extends TestLogger {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkITCase.class);
+abstract class ElasticsearchSinkBaseITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBaseITCase.class);
     private static final String ELASTICSEARCH_PASSWORD = "test-password";
     private static final String ELASTICSEARCH_USER = "elastic";
 
     @Container
-    private static final ElasticsearchContainer ES_CONTAINER =
-            new ElasticsearchContainer(
-                            DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_COMMERCIAL_7))
+    private final ElasticsearchContainer ES_CONTAINER =

Review comment:
       We can probably move the container to the subclasses and only expose a method to retrieve the HTTP host addresses.

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
##########
@@ -51,50 +51,39 @@
  * </ul>
  *
  * @param <IN> type of the records converted to Elasticsearch actions
- * @see ElasticsearchSinkBuilder on how to construct a ElasticsearchSink
+ * @see ElasticsearchSinkBuilderBase on how to construct a ElasticsearchSink
  */
 @PublicEvolving
 public class ElasticsearchSink<IN> implements Sink<IN, Void, Void, Void> {
 
     private final List<HttpHost> hosts;
+    private final NetworkClientConfig networkClientConfig;
     private final ElasticsearchEmitter<? super IN> emitter;
     private final BulkProcessorConfig buildBulkProcessorConfig;
-    private final NetworkClientConfig networkClientConfig;
     private final DeliveryGuarantee deliveryGuarantee;
 
-    ElasticsearchSink(
+    public ElasticsearchSink(
             List<HttpHost> hosts,
+            NetworkClientConfig networkClientConfig,
             ElasticsearchEmitter<? super IN> emitter,
             DeliveryGuarantee deliveryGuarantee,
-            BulkProcessorConfig buildBulkProcessorConfig,
-            NetworkClientConfig networkClientConfig) {
-        this.hosts = checkNotNull(hosts);

Review comment:
       Why did you remove the host check? Overall I do not see a reason for this change.

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorTest.java
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;

Review comment:
       I am starting to wonder whether we should move all connector table implementations into `org.apache.flink.connector.xxxx.table`, because all connectors should support batch and streaming by now.

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
##########
@@ -209,38 +207,44 @@
      * Sets a prefix which used for every REST communication to the Elasticsearch cluster.
      *
      * @param prefix for the communication
-     * @return {@link ElasticsearchSinkBuilder}
+     * @return {@link ElasticsearchSinkBuilderBase}
      */
-    public ElasticsearchSinkBuilder<IN> setConnectionPathPrefix(String prefix) {
+    public ElasticsearchSinkBuilderBase<IN> setConnectionPathPrefix(String prefix) {
         checkNotNull(prefix);
         this.connectionPathPrefix = prefix;
         return this;
     }
 
-    /** @return {@link ElasticsearchSink} */
+    public abstract BulkRequestConsumerFactory getBulkRequestConsumer();

Review comment:
       Missing doc string.

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
##########
@@ -51,50 +51,39 @@
  * </ul>
  *
  * @param <IN> type of the records converted to Elasticsearch actions
- * @see ElasticsearchSinkBuilder on how to construct a ElasticsearchSink
+ * @see ElasticsearchSinkBuilderBase on how to construct a ElasticsearchSink
  */
 @PublicEvolving
 public class ElasticsearchSink<IN> implements Sink<IN, Void, Void, Void> {
 
     private final List<HttpHost> hosts;
+    private final NetworkClientConfig networkClientConfig;
     private final ElasticsearchEmitter<? super IN> emitter;
     private final BulkProcessorConfig buildBulkProcessorConfig;
-    private final NetworkClientConfig networkClientConfig;
     private final DeliveryGuarantee deliveryGuarantee;
 
-    ElasticsearchSink(
+    public ElasticsearchSink(

Review comment:
       Can be private.

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java
##########
@@ -21,13 +22,13 @@
 
 import java.io.Serializable;
 
-class NetworkClientConfig implements Serializable {
+public class NetworkClientConfig implements Serializable {

Review comment:
       Package-private?

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
##########
@@ -52,11 +53,12 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.BiConsumer;
 
 import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-class ElasticsearchWriter<IN> implements SinkWriter<IN, Void, Void> {
+public class ElasticsearchWriter<IN> implements SinkWriter<IN, Void, Void> {

Review comment:
       Package-private?

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java
##########
@@ -66,39 +65,46 @@
 
 /** Tests for {@link ElasticsearchSink}. */
 @Testcontainers
-class ElasticsearchSinkITCase extends TestLogger {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkITCase.class);
+abstract class ElasticsearchSinkBaseITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBaseITCase.class);
     private static final String ELASTICSEARCH_PASSWORD = "test-password";
     private static final String ELASTICSEARCH_USER = "elastic";
 
     @Container
-    private static final ElasticsearchContainer ES_CONTAINER =
-            new ElasticsearchContainer(
-                            DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_COMMERCIAL_7))
+    private final ElasticsearchContainer ES_CONTAINER =
+            new ElasticsearchContainer(DockerImageName.parse(getElasticsearchImageName()))
                     .withPassword(ELASTICSEARCH_PASSWORD)
-                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+                    .withLogConsumer(new Slf4jLogConsumer(LOG));;

Review comment:
       Remove `;`

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/BulkRequestConsumerFactory.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.io.Serializable;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+public interface BulkRequestConsumerFactory

Review comment:
       I guess this can be package-private, and it is also missing a docstring.

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/BulkProcessorConfig.java
##########
@@ -21,28 +22,31 @@
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-class BulkProcessorConfig implements Serializable {
+public class BulkProcessorConfig implements Serializable {

Review comment:
       Does this need to be public? Please be careful because users might start using these classes although they are internal.

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java
##########
@@ -66,39 +65,46 @@
 
 /** Tests for {@link ElasticsearchSink}. */
 @Testcontainers
-class ElasticsearchSinkITCase extends TestLogger {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkITCase.class);
+abstract class ElasticsearchSinkBaseITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBaseITCase.class);
     private static final String ELASTICSEARCH_PASSWORD = "test-password";
     private static final String ELASTICSEARCH_USER = "elastic";
 
     @Container
-    private static final ElasticsearchContainer ES_CONTAINER =
-            new ElasticsearchContainer(
-                            DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_COMMERCIAL_7))
+    private final ElasticsearchContainer ES_CONTAINER =

Review comment:
       Hmm, I am not sure we want to do it like this, because it implies we have to restart the container for every test, which can be very time-consuming.

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
##########
@@ -100,13 +100,10 @@ void configureBulkProcessorBackoff(
      * Creates a {@link RequestIndexer} that is able to work with {@link BulkProcessor} binary
      * compatible.
      */
-    default RequestIndexer createBulkProcessorIndexer(
+    RequestIndexer createBulkProcessorIndexer(

Review comment:
       Since we have dropped elasticsearch 5 we should also drop classes in `elasticsearch-base` which are no longer needed.

##########
File path: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch6SinkITCase.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.elasticsearch.client.RestHighLevelClient;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+/** Tests for {@link ElasticsearchSink}. */
+@Testcontainers

Review comment:
       I guess we need this annotation because the annotation of the parent class is not inherited?

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java
##########
@@ -66,39 +65,46 @@
 
 /** Tests for {@link ElasticsearchSink}. */
 @Testcontainers
-class ElasticsearchSinkITCase extends TestLogger {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkITCase.class);
+abstract class ElasticsearchSinkBaseITCase extends TestLogger {

Review comment:
       Since we are using JUnit 5 for these tests `TestLogger` will not work. You can use the `TestLoggerExtension` and take a look at the `SourceTestSuiteBase` class how to use it.

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
##########
@@ -77,42 +79,40 @@
     /**
      * Constructor creating an elasticsearch writer.
      *
-     * @param hosts the reachable elasticsearch cluster nodes
      * @param emitter converting incoming records to elasticsearch actions
      * @param flushOnCheckpoint if true all until now received records are flushed after every
      *     checkpoint
      * @param bulkProcessorConfig describing the flushing and failure handling of the used {@link
      *     BulkProcessor}
-     * @param networkClientConfig describing properties of the network connection used to connect to
-     *     the elasticsearch cluster
      * @param metricGroup for the sink writer
      * @param mailboxExecutor Flink's mailbox executor
      */
     public ElasticsearchWriter(
             List<HttpHost> hosts,
+            NetworkClientConfig networkClientConfig,
             ElasticsearchEmitter<? super IN> emitter,
             boolean flushOnCheckpoint,
             BulkProcessorConfig bulkProcessorConfig,
-            NetworkClientConfig networkClientConfig,
             SinkWriterMetricGroup metricGroup,
             MailboxExecutor mailboxExecutor) {
         this.emitter = checkNotNull(emitter);
         this.flushOnCheckpoint = flushOnCheckpoint;
         this.mailboxExecutor = checkNotNull(mailboxExecutor);
+
         this.client =
                 new RestHighLevelClient(
                         configureRestClientBuilder(
                                 RestClient.builder(hosts.toArray(new HttpHost[0])),
                                 networkClientConfig));
+
+        BulkRequestConsumerFactory bulkRequestConsumerFactory =
+                bulkProcessorConfig.getBulkRequestConsumer();
+        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkRequestConsumer =
+                bulkRequestConsumerFactory.apply(client);

Review comment:
       I think you can inline the calls here.

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
##########
@@ -24,62 +24,56 @@
 
 import org.apache.http.HttpHost;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-/** Tests for {@link ElasticsearchSinkBuilder}. */
-class ElasticsearchSinkBuilderTest extends TestLogger {
+/** Tests for {@link ElasticsearchSinkBuilderBase}. */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)

Review comment:
       Why do we need this?

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/BulkProcessorConfig.java
##########
@@ -68,4 +72,8 @@ public int getBulkFlushBackoffRetries() {
     public long getBulkFlushBackOffDelay() {
         return bulkFlushBackOffDelay;
     }
+
+    public BulkRequestConsumerFactory getBulkRequestConsumer() {

Review comment:
       Nit: rename to `getBulkRequestConsumerFactory` and also apply it to the parameter




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org