Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/09 08:11:21 UTC

[incubator-inlong] 02/03: [INLONG-2401][Bug][InLong-Sort] Fix bugs in kafka sink unit tests (#2401) (#2402)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.0.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit e85c72ec71e55e3fbe2a28c4c44b52d2da7d1846
Author: Kevin Wen <89...@users.noreply.github.com>
AuthorDate: Wed Feb 9 10:16:47 2022 +0800

    [INLONG-2401][Bug][InLong-Sort] Fix bugs in kafka sink unit tests (#2401) (#2402)
---
 inlong-sort/sort-single-tenant/pom.xml             | 14 ++--
 .../flink/kafka/KafkaSinkTestBase.java             | 60 +++++++++++++----
 .../flink/kafka/RowToJsonKafkaSinkTest.java        | 11 +---
 .../flink/kafka/RowToStringKafkaSinkTest.java      | 11 +---
 .../sort/singletenant/flink/utils/NetUtils.java    | 77 ++++++++++++++++++++++
 5 files changed, 136 insertions(+), 37 deletions(-)

diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index dce0bb0..7448a3f 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -54,12 +54,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-connectors</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-core</artifactId>
             <scope>provided</scope>
@@ -104,6 +98,14 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- kafka sink tests will fail without this dependency -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.10.0</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId>
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
index aa075ea..e4ffcc8 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
@@ -31,6 +31,7 @@ import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
 import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.network.ListenerName;
@@ -42,11 +43,14 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
 import scala.collection.mutable.ArraySeq;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -59,9 +63,13 @@ import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
 import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+import static org.apache.inlong.sort.singletenant.flink.utils.NetUtils.getUnusedLocalPort;
+import static org.junit.Assert.assertNull;
 
 public abstract class KafkaSinkTestBase {
 
+    private static final Logger logger = LoggerFactory.getLogger(KafkaSinkTestBase.class);
+
     @ClassRule
     public static TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -84,13 +92,20 @@ public abstract class KafkaSinkTestBase {
     @Before
     public void setup() throws Exception {
         prepareData();
+        logger.info("Prepare data passed.");
 
         startZK();
+        logger.info("ZK started.");
         startKafkaServer();
+        logger.info("Kafka server started.");
         prepareKafkaClientProps();
+        logger.info("Kafka client properties prepared.");
         kafkaAdmin = AdminClient.create(kafkaClientProperties);
+        logger.info("Kafka admin started.");
         addTopic();
+        logger.info("Topic added to kafka server.");
         kafkaConsumer = new KafkaConsumer<>(kafkaClientProperties);
+        logger.info("Kafka consumer started.");
     }
 
     private void startZK() throws Exception {
@@ -102,6 +117,7 @@ public abstract class KafkaSinkTestBase {
         Properties kafkaProperties = new Properties();
         final String KAFKA_HOST = "localhost";
         kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+        kafkaProperties.put("port", Integer.toString(getUnusedLocalPort(1024)));
         kafkaProperties.put("broker.id", "1");
         kafkaProperties.put("log.dir", tempFolder.newFolder().getAbsolutePath());
         kafkaProperties.put("zookeeper.connect", zkServer.getConnectString());
@@ -120,6 +136,7 @@ public abstract class KafkaSinkTestBase {
                         ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
                 )
         );
+        logger.info("Kafka broker conn str = " + brokerConnStr);
     }
 
     private void prepareKafkaClientProps() {
@@ -149,17 +166,29 @@ public abstract class KafkaSinkTestBase {
 
     @After
     public void clean() throws IOException {
-        kafkaConsumer.close();
-        kafkaConsumer = null;
+        if (kafkaConsumer != null) {
+            kafkaConsumer.close();
+            kafkaConsumer = null;
+            logger.info("Kafka consumer closed.");
+        }
 
-        kafkaAdmin.close();
-        kafkaAdmin = null;
+        if (kafkaAdmin != null) {
+            kafkaAdmin.close();
+            kafkaAdmin = null;
+            logger.info("Kafka admin closed.");
+        }
 
-        kafkaServer.shutdown();
-        kafkaServer = null;
+        if (kafkaServer != null) {
+            kafkaServer.shutdown();
+            kafkaServer = null;
+            logger.info("Kafka server closed.");
+        }
 
-        zkServer.close();
-        zkServer = null;
+        if (zkServer != null) {
+            zkServer.close();
+            zkServer = null;
+            logger.info("ZK closed.");
+        }
     }
 
     @Test(timeout = 3 * 60 * 1000)
@@ -192,21 +221,30 @@ public abstract class KafkaSinkTestBase {
 
     private void verify() throws InterruptedException {
         kafkaConsumer.subscribe(Collections.singleton(topic));
+        List<String> results = new ArrayList<>();
         while (true) {
             ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
-            if (records.isEmpty() || records.count() != testRows.size()) {
+            if (!records.isEmpty()) {
+                for (ConsumerRecord<String, String> record : records) {
+                    assertNull(record.key());
+                    results.add(record.value());
+                }
+            }
+
+            if (results.size() != testRows.size()) {
                 //noinspection BusyWait
                 Thread.sleep(1000);
+                logger.info("for topic " + topic + ", record size = " + results.size());
                 continue;
             }
 
-            verifyData(records);
+            verifyData(results);
 
             break;
         }
     }
 
-    protected abstract void verifyData(ConsumerRecords<String, String> records);
+    protected abstract void verifyData(List<String> results);
 
     private TestingSource createTestingSource() {
         TestingSource testingSource = new TestingSource();
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
index f92025c..7bade20 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
@@ -26,8 +26,6 @@ import org.apache.inlong.sort.formats.common.MapFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -35,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 
 public class RowToJsonKafkaSinkTest extends KafkaSinkTestBase {
     @Override
@@ -72,13 +69,7 @@ public class RowToJsonKafkaSinkTest extends KafkaSinkTestBase {
     }
 
     @Override
-    protected void verifyData(ConsumerRecords<String, String> records) {
-        List<String> results = new ArrayList<>();
-        for (ConsumerRecord<String, String> record : records) {
-            assertNull(record.key());
-            results.add(record.value());
-        }
-
+    protected void verifyData(List<String> results) {
         List<String> expectedData = new ArrayList<>();
         expectedData.add("{\"f1\":\"zhangsan\",\"f2\":{\"high\":170.5},\"f3\":[123]}");
         expectedData.add("{\"f1\":\"lisi\",\"f2\":{\"high\":180.5},\"f3\":[1234]}");
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
index 3617571..fc10f1f 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
@@ -22,14 +22,11 @@ import org.apache.flink.types.Row;
 import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 
 public class RowToStringKafkaSinkTest extends KafkaSinkTestBase {
 
@@ -56,13 +53,7 @@ public class RowToStringKafkaSinkTest extends KafkaSinkTestBase {
     }
 
     @Override
-    protected void verifyData(ConsumerRecords<String, String> records) {
-        List<String> results = new ArrayList<>();
-        for (ConsumerRecord<String, String> record : records) {
-            assertNull(record.key());
-            results.add(record.value());
-        }
-
+    protected void verifyData(List<String> results) {
         List<String> expectedData = new ArrayList<>();
         testRows.forEach(row -> expectedData.add(row.toString()));
 
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/NetUtils.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/NetUtils.java
new file mode 100644
index 0000000..66f53c4
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/NetUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.utils;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+public class NetUtils {
+    /**
+     * Looks for an unused local port, starting from the given port.
+     * If the given port is less than 1024, the search starts from 1024.
+     *
+     * @param start the port to start searching from
+     * @return an unused local port, or -1 if none is found
+     */
+    public static int getUnusedLocalPort(int start) {
+        for (int port = Math.max(start, 1024); port <= 65535; port++) {
+            if (!isLocalPortInUse(port)) {
+                return port;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Checks whether the given port is in use on the local host (127.0.0.1).
+     *
+     * @param port port to be checked
+     * @return true if in use, otherwise false
+     */
+    public static boolean isLocalPortInUse(int port) {
+        boolean flag = true;
+        try {
+            flag = isPortInUse("127.0.0.1", port);
+        } catch (Exception ignored) {
+            // if the check itself fails, conservatively treat the port as in use
+        }
+        return flag;
+    }
+
+    /**
+     * Checks whether the given port on the given host is in use by attempting to connect to it.
+     *
+     * @param host host name or IP address to probe
+     * @param port port to be checked
+     * @return true if in use, otherwise false
+     * @throws UnknownHostException thrown if the given IP is unknown
+     */
+    public static boolean isPortInUse(String host, int port) throws UnknownHostException {
+        InetAddress theAddress = InetAddress.getByName(host);
+        // try-with-resources ensures the probing socket is closed after the check
+        try (Socket socket = new Socket(theAddress, port)) {
+            return true;
+        } catch (IOException ignored) {
+            // connection failed, so the port is considered free
+        }
+        return false;
+    }
+}
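
As context for the port change in KafkaSinkTestBase above, here is a minimal sketch of how the new helper can be used to pick the embedded broker port before the Kafka server starts. The class name and printed message are illustrative only and are not part of this commit; the property values mirror those set in KafkaSinkTestBase.startKafkaServer().

import java.util.Properties;

import static org.apache.inlong.sort.singletenant.flink.utils.NetUtils.getUnusedLocalPort;

// Hypothetical illustration of the helper introduced in this commit; not part of the commit itself.
public class UnusedPortExample {
    public static void main(String[] args) {
        // search for a free local port at or above 1024, as the test base does
        int port = getUnusedLocalPort(1024);
        if (port == -1) {
            throw new IllegalStateException("No unused local port found");
        }

        // mirror the broker properties set in KafkaSinkTestBase.startKafkaServer()
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("advertised.host.name", "localhost");
        kafkaProperties.put("port", Integer.toString(port));
        System.out.println("Embedded Kafka broker would listen on port " + port);
    }
}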