You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/09 02:18:08 UTC
[incubator-inlong] branch master updated: [INLONG-2401][Bug][InLong-Sort] Fix bugs in kafka sink unit tests (#2401) (#2402)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2d31cd2 [INLONG-2401][Bug][InLong-Sort] Fix bugs in kafka sink unit tests (#2401) (#2402)
2d31cd2 is described below
commit 2d31cd2674967965ccbb5363b0c4977a3bf0232d
Author: Kevin Wen <89...@users.noreply.github.com>
AuthorDate: Wed Feb 9 10:16:47 2022 +0800
[INLONG-2401][Bug][InLong-Sort] Fix bugs in kafka sink unit tests (#2401) (#2402)
---
inlong-sort/sort-single-tenant/pom.xml | 14 ++--
.../flink/kafka/KafkaSinkTestBase.java | 60 +++++++++++++----
.../flink/kafka/RowToJsonKafkaSinkTest.java | 11 +---
.../flink/kafka/RowToStringKafkaSinkTest.java | 11 +---
.../sort/singletenant/flink/utils/NetUtils.java | 77 ++++++++++++++++++++++
5 files changed, 136 insertions(+), 37 deletions(-)
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index b771490..4f32c57 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -54,12 +54,6 @@
</dependency>
<dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>sort-connectors</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<scope>provided</scope>
@@ -104,6 +98,14 @@
<scope>test</scope>
</dependency>
+ <!-- kafka sink tests will fail without it -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.10.0</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId>
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
index aa075ea..e4ffcc8 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
@@ -31,6 +31,7 @@ import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.network.ListenerName;
@@ -42,11 +43,14 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.mutable.ArraySeq;
import java.io.IOException;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -59,9 +63,13 @@ import java.util.concurrent.TimeoutException;
import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+import static org.apache.inlong.sort.singletenant.flink.utils.NetUtils.getUnusedLocalPort;
+import static org.junit.Assert.assertNull;
public abstract class KafkaSinkTestBase {
+ private static final Logger logger = LoggerFactory.getLogger(KafkaSinkTestBase.class);
+
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();
@@ -84,13 +92,20 @@ public abstract class KafkaSinkTestBase {
@Before
public void setup() throws Exception {
prepareData();
+ logger.info("Prepare data passed.");
startZK();
+ logger.info("ZK started.");
startKafkaServer();
+ logger.info("Kafka server started.");
prepareKafkaClientProps();
+ logger.info("Kafka client properties prepared.");
kafkaAdmin = AdminClient.create(kafkaClientProperties);
+ logger.info("Kafka admin started.");
addTopic();
+ logger.info("Topic added to kafka server.");
kafkaConsumer = new KafkaConsumer<>(kafkaClientProperties);
+ logger.info("Kafka consumer started.");
}
private void startZK() throws Exception {
@@ -102,6 +117,7 @@ public abstract class KafkaSinkTestBase {
Properties kafkaProperties = new Properties();
final String KAFKA_HOST = "localhost";
kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+ kafkaProperties.put("port", Integer.toString(getUnusedLocalPort(1024)));
kafkaProperties.put("broker.id", "1");
kafkaProperties.put("log.dir", tempFolder.newFolder().getAbsolutePath());
kafkaProperties.put("zookeeper.connect", zkServer.getConnectString());
@@ -120,6 +136,7 @@ public abstract class KafkaSinkTestBase {
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
)
);
+ logger.info("Kafka broker conn str = " + brokerConnStr);
}
private void prepareKafkaClientProps() {
@@ -149,17 +166,29 @@ public abstract class KafkaSinkTestBase {
@After
public void clean() throws IOException {
- kafkaConsumer.close();
- kafkaConsumer = null;
+ if (kafkaConsumer != null) {
+ kafkaConsumer.close();
+ kafkaConsumer = null;
+ logger.info("Kafka consumer closed.");
+ }
- kafkaAdmin.close();
- kafkaAdmin = null;
+ if (kafkaAdmin != null) {
+ kafkaAdmin.close();
+ kafkaAdmin = null;
+ logger.info("Kafka admin closed.");
+ }
- kafkaServer.shutdown();
- kafkaServer = null;
+ if (kafkaServer != null) {
+ kafkaServer.shutdown();
+ kafkaServer = null;
+ logger.info("Kafka server closed.");
+ }
- zkServer.close();
- zkServer = null;
+ if (zkServer != null) {
+ zkServer.close();
+ zkServer = null;
+ logger.info("ZK closed.");
+ }
}
@Test(timeout = 3 * 60 * 1000)
@@ -192,21 +221,30 @@ public abstract class KafkaSinkTestBase {
private void verify() throws InterruptedException {
kafkaConsumer.subscribe(Collections.singleton(topic));
+ List<String> results = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
- if (records.isEmpty() || records.count() != testRows.size()) {
+ if (!records.isEmpty()) {
+ for (ConsumerRecord<String, String> record : records) {
+ assertNull(record.key());
+ results.add(record.value());
+ }
+ }
+
+ if (results.size() != testRows.size()) {
//noinspection BusyWait
Thread.sleep(1000);
+ logger.info("for topic " + topic + ", record size = " + results.size());
continue;
}
- verifyData(records);
+ verifyData(results);
break;
}
}
- protected abstract void verifyData(ConsumerRecords<String, String> records);
+ protected abstract void verifyData(List<String> results);
private TestingSource createTestingSource() {
TestingSource testingSource = new TestingSource();
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
index f92025c..7bade20 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
@@ -26,8 +26,6 @@ import org.apache.inlong.sort.formats.common.MapFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.HashMap;
@@ -35,7 +33,6 @@ import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
public class RowToJsonKafkaSinkTest extends KafkaSinkTestBase {
@Override
@@ -72,13 +69,7 @@ public class RowToJsonKafkaSinkTest extends KafkaSinkTestBase {
}
@Override
- protected void verifyData(ConsumerRecords<String, String> records) {
- List<String> results = new ArrayList<>();
- for (ConsumerRecord<String, String> record : records) {
- assertNull(record.key());
- results.add(record.value());
- }
-
+ protected void verifyData(List<String> results) {
List<String> expectedData = new ArrayList<>();
expectedData.add("{\"f1\":\"zhangsan\",\"f2\":{\"high\":170.5},\"f3\":[123]}");
expectedData.add("{\"f1\":\"lisi\",\"f2\":{\"high\":180.5},\"f3\":[1234]}");
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
index 3617571..fc10f1f 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
@@ -22,14 +22,11 @@ import org.apache.flink.types.Row;
import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
public class RowToStringKafkaSinkTest extends KafkaSinkTestBase {
@@ -56,13 +53,7 @@ public class RowToStringKafkaSinkTest extends KafkaSinkTestBase {
}
@Override
- protected void verifyData(ConsumerRecords<String, String> records) {
- List<String> results = new ArrayList<>();
- for (ConsumerRecord<String, String> record : records) {
- assertNull(record.key());
- results.add(record.value());
- }
-
+ protected void verifyData(List<String> results) {
List<String> expectedData = new ArrayList<>();
testRows.forEach(row -> expectedData.add(row.toString()));
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/NetUtils.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/NetUtils.java
new file mode 100644
index 0000000..66f53c4
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/NetUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.utils;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+public class NetUtils {
+    /**
+     * Looks for an unused local port, scanning upwards from the given port.
+     * Ports below 1024 are never probed: a smaller start value is raised to 1024.
+     *
+     * @param start the port to start scanning from
+     * @return the first port found not in use, or -1 if every port up to 65535 is in use
+     */
+    public static int getUnusedLocalPort(int start) {
+        for (int port = Math.max(start, 1024); port <= 65535; port++) {
+            if (!isLocalPortInUse(port)) {
+                return port;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Checks whether something accepts connections on the given port of 127.0.0.1.
+     *
+     * @param port port to be checked
+     * @return true if in use or if the check itself failed, otherwise false
+     */
+    public static boolean isLocalPortInUse(int port) {
+        try {
+            return isPortInUse("127.0.0.1", port);
+        } catch (Exception ignored) {
+            // The probe could not be completed; report "in use" so the port is not handed out.
+            return true;
+        }
+    }
+
+    /**
+     * Checks whether a TCP connection to the given host and port can be established.
+     * The probe socket is closed before returning.
+     *
+     * @param host host name or IP address
+     * @param port port
+     * @return true if the connection succeeded, otherwise false
+     * @throws UnknownHostException thrown if the given host cannot be resolved
+     */
+    public static boolean isPortInUse(String host, int port) throws UnknownHostException {
+        InetAddress theAddress = InetAddress.getByName(host);
+        boolean connected = false;
+        try (Socket probe = new Socket(theAddress, port)) {
+            connected = true;
+        } catch (IOException ignored) {
+            // A failed connect leaves the flag false; a failed close leaves it true.
+        }
+        return connected;
+    }
+}