You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/11 07:56:26 UTC

[flink] 01/03: [FLINK-25266][tests] Consolidate Kafka testutils to make them part of the test jar

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b5ade691459fcbdadafc7dfa959495d84c78626d
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Tue Dec 14 16:01:02 2021 +0100

    [FLINK-25266][tests] Consolidate Kafka testutils to make them part of the test jar
---
 .../kafka/sink/FlinkKafkaInternalProducerITCase.java    |  2 +-
 .../flink/connector/kafka/sink/KafkaSinkITCase.java     |  3 ++-
 .../flink/connector/kafka/sink/KafkaTransactionLog.java |  2 +-
 .../connector/kafka/sink/KafkaTransactionLogITCase.java |  2 +-
 .../flink/connector/kafka/sink/KafkaWriterITCase.java   |  4 ++--
 .../flink/connector/kafka/source/KafkaSourceITCase.java |  6 +++---
 .../kafka/source/enumerator/KafkaEnumeratorTest.java    |  2 +-
 .../enumerator/initializer/OffsetsInitializerTest.java  |  2 +-
 .../enumerator/subscriber/KafkaSubscriberTest.java      |  2 +-
 .../source/reader/KafkaPartitionSplitReaderTest.java    |  4 ++--
 .../kafka/source/reader/KafkaSourceReaderTest.java      |  4 ++--
 .../testutils/KafkaMultipleTopicExternalContext.java    |  2 +-
 .../testutils/KafkaPartitionDataWriter.java             |  2 +-
 .../testutils/KafkaSingleTopicExternalContext.java      |  2 +-
 .../{source => }/testutils/KafkaSourceTestEnv.java      |  2 +-
 .../connector/kafka/{sink => testutils}/KafkaUtil.java  | 17 +++++++++--------
 .../connectors/kafka/KafkaTestEnvironmentImpl.java      |  2 +-
 .../flink/tests/util/kafka/KafkaSourceE2ECase.java      |  4 ++--
 18 files changed, 33 insertions(+), 31 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
index ca2c6b7..18f902f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
@@ -46,7 +46,7 @@ import java.util.Properties;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.connector.kafka.sink.KafkaUtil.createKafkaContainer;
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.apache.flink.util.DockerImageVersions.KAFKA;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index f1c41ca..2a35112 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.testutils.KafkaUtil;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -94,7 +95,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
-import static org.apache.flink.connector.kafka.sink.KafkaUtil.createKafkaContainer;
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.apache.flink.util.DockerImageVersions.KAFKA;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.hasItems;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java
index c6f715f..440d785 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java
@@ -33,7 +33,7 @@ import java.util.Properties;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.connector.kafka.sink.KafkaUtil.drainAllRecordsFromTopic;
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME;
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
index e42ba95..b20adb6 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
@@ -44,7 +44,7 @@ import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.Transact
 import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.Ongoing;
 import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.PrepareAbort;
 import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.PrepareCommit;
-import static org.apache.flink.connector.kafka.sink.KafkaUtil.createKafkaContainer;
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.apache.flink.util.DockerImageVersions.KAFKA;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 04b4de4..f646080 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -63,8 +63,8 @@ import java.util.PriorityQueue;
 import java.util.Properties;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.connector.kafka.sink.KafkaUtil.createKafkaContainer;
-import static org.apache.flink.connector.kafka.sink.KafkaUtil.drainAllRecordsFromTopic;
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic;
 import static org.apache.flink.util.DockerImageVersions.KAFKA;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 9b5496f..6bf376d 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -26,9 +26,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
-import org.apache.flink.connector.kafka.source.testutils.KafkaMultipleTopicExternalContext;
-import org.apache.flink.connector.kafka.source.testutils.KafkaSingleTopicExternalContext;
-import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
+import org.apache.flink.connector.kafka.testutils.KafkaMultipleTopicExternalContext;
+import org.apache.flink.connector.kafka.testutils.KafkaSingleTopicExternalContext;
+import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
 import org.apache.flink.connectors.test.common.environment.MiniClusterTestEnvironment;
 import org.apache.flink.connectors.test.common.external.DefaultContainerizedExternalSystem;
 import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index 6097cbf..9216172 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStopping
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
-import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
+import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
 import org.apache.flink.mock.Whitebox;
 
 import org.apache.kafka.clients.admin.AdminClient;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
index 618a363..6c696f0 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.kafka.source.enumerator.initializer;
 
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
-import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
+import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
index fc0f7de..ea69a40 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.kafka.source.enumerator.subscriber;
 
-import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
+import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.kafka.clients.admin.AdminClient;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index 7aecc33..a34ed51 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
-import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
+import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -68,7 +68,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
+import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index d5c9abb..fb494ae 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
-import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
+import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
 import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
@@ -78,7 +78,7 @@ import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderM
 import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP;
 import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.PARTITION_GROUP;
 import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.TOPIC_GROUP;
-import static org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv.NUM_PARTITIONS;
+import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_PARTITIONS;
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
 import static org.assertj.core.api.Assertions.assertThat;
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaMultipleTopicExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaMultipleTopicExternalContext.java
similarity index 98%
rename from flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaMultipleTopicExternalContext.java
rename to flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaMultipleTopicExternalContext.java
index 1548a3e..3e49004 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaMultipleTopicExternalContext.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaMultipleTopicExternalContext.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kafka.source.testutils;
+package org.apache.flink.connector.kafka.testutils;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaPartitionDataWriter.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaPartitionDataWriter.java
similarity index 97%
rename from flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaPartitionDataWriter.java
rename to flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaPartitionDataWriter.java
index 5485137..3b76f13 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaPartitionDataWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaPartitionDataWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kafka.source.testutils;
+package org.apache.flink.connector.kafka.testutils;
 
 import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSingleTopicExternalContext.java
similarity index 99%
rename from flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
rename to flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSingleTopicExternalContext.java
index d816500..2dabb42 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSingleTopicExternalContext.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kafka.source.testutils;
+package org.apache.flink.connector.kafka.testutils;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
similarity index 99%
rename from flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java
rename to flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
index 286934b..853cbc8 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestEnv.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kafka.source.testutils;
+package org.apache.flink.connector.kafka.testutils;
 
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
similarity index 93%
rename from flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java
rename to flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
index a5c6097..ef1a0ee 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kafka.sink;
+package org.apache.flink.connector.kafka.testutils;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index b601680..680ca43 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -18,10 +18,10 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.connector.kafka.sink.KafkaUtil;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connector.kafka.testutils.KafkaUtil;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
index 9f5e9f7..ae22d78 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.tests.util.kafka;
 
-import org.apache.flink.connector.kafka.source.testutils.KafkaMultipleTopicExternalContext;
-import org.apache.flink.connector.kafka.source.testutils.KafkaSingleTopicExternalContext;
+import org.apache.flink.connector.kafka.testutils.KafkaMultipleTopicExternalContext;
+import org.apache.flink.connector.kafka.testutils.KafkaSingleTopicExternalContext;
 import org.apache.flink.connectors.test.common.external.DefaultContainerizedExternalSystem;
 import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
 import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;