You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@bahir.apache.org by es...@apache.org on 2022/04/29 20:39:26 UTC
[bahir-flink] branch master updated: [BAHIR-243] Change KuduTestHarness with TestContainers
This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6429023 [BAHIR-243] Change KuduTestHarness with TestContainers
6429023 is described below
commit 642902373b632ed51340ccb8038911322e7a147c
Author: Joao Boto <bo...@boto.pro>
AuthorDate: Thu Mar 11 19:03:43 2021 +0100
[BAHIR-243] Change KuduTestHarness with TestContainers
---
flink-connector-flume/pom.xml | 1 +
flink-connector-kudu/pom.xml | 4 +
.../connectors/kudu/batch/KuduInputFormatTest.java | 4 +-
.../kudu/batch/KuduOutputFormatTest.java | 8 +-
.../connectors/kudu/connector/KuduTestBase.java | 85 +++++++++++++++++-----
.../connectors/kudu/streaming/KuduSinkTest.java | 11 ++-
.../connectors/kudu/table/KuduCatalogTest.java | 20 ++---
.../kudu/table/KuduTableFactoryTest.java | 10 +--
.../kudu/table/KuduTableSourceITCase.java | 2 +-
.../connectors/kudu/table/KuduTableSourceTest.java | 2 +-
10 files changed, 100 insertions(+), 47 deletions(-)
diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml
index 9ffa363..7aa2f91 100644
--- a/flink-connector-flume/pom.xml
+++ b/flink-connector-flume/pom.xml
@@ -87,6 +87,7 @@ under the License.
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 2a2dde2..9ace382 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -157,6 +157,10 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
index fb7d8e3..126f7fd 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
@@ -39,7 +39,7 @@ class KuduInputFormatTest extends KuduTestBase {
@Test
void testInvalidTableInfo() {
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(readerConfig, null));
}
@@ -71,7 +71,7 @@ class KuduInputFormatTest extends KuduTestBase {
}
private List<Row> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo, new ArrayList<>(),
fieldProjection == null ? null : Arrays.asList(fieldProjection));
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
index 22fa0a4..693e113 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
@@ -39,14 +39,14 @@ class KuduOutputFormatTest extends KuduTestBase {
@Test
void testInvalidTableInfo() {
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(writerConfig, null, null));
}
@Test
void testNotTableExist() {
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
KuduOutputFormat<Row> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
@@ -55,7 +55,7 @@ class KuduOutputFormatTest extends KuduTestBase {
@Test
void testOutputWithStrongConsistency() throws Exception {
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
KuduWriterConfig writerConfig = KuduWriterConfig.Builder
@@ -80,7 +80,7 @@ class KuduOutputFormatTest extends KuduTestBase {
@Test
void testOutputWithEventualConsistency() throws Exception {
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
KuduWriterConfig writerConfig = KuduWriterConfig.Builder
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
index 36572c6..3a872c5 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
@@ -26,22 +26,21 @@ import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
import org.apache.flink.types.Row;
-
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.*;
import org.apache.kudu.shaded.com.google.common.collect.Lists;
-import org.apache.kudu.test.KuduTestHarness;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.migrationsupport.rules.ExternalResourceSupport;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+import org.testcontainers.shaded.com.google.common.io.Closer;
+import org.testcontainers.shaded.com.google.common.net.HostAndPort;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -51,9 +50,19 @@ import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-@ExtendWith(ExternalResourceSupport.class)
public class KuduTestBase {
+ private static final String DOCKER_IMAGE = "apache/kudu:1.11.1";
+ private static final Integer KUDU_MASTER_PORT = 7051;
+ private static final Integer KUDU_TSERVER_PORT = 7050;
+ private static final Integer NUMBER_OF_REPLICA = 3;
+
+ private static GenericContainer<?> master;
+ private static List<GenericContainer<?>> tServers;
+
+ private static String masterAddress;
+ private static KuduClient kuduClient;
+
private static final Object[][] booksTableData = {
{1001, "Java for dummies", "Tan Ah Teck", 11.11, 11},
{1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22},
@@ -61,20 +70,60 @@ public class KuduTestBase {
{1004, "A Cup of Java", "Kumar", 44.44, 44},
{1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55}};
- public static KuduTestHarness harness;
public static String[] columns = new String[]{"id", "title", "author", "price", "quantity"};
@BeforeAll
public static void beforeClass() throws Exception {
- harness = new KuduTestHarness();
- harness.before();
+ Network network = Network.newNetwork();
+
+ ImmutableList.Builder<GenericContainer<?>> tServersBuilder = ImmutableList.builder();
+ master = new GenericContainer<>(DOCKER_IMAGE)
+ .withExposedPorts(KUDU_MASTER_PORT, 8051)
+ .withCommand("master")
+ .withNetwork(network)
+ .withNetworkAliases("kudu-master");
+ master.start();
+ masterAddress = HostAndPort.fromParts(master.getHost(), master.getMappedPort(KUDU_MASTER_PORT)).toString();
+
+ for (int instance = 1; instance <= NUMBER_OF_REPLICA; instance++) {
+ String instanceName = "kudu-tserver-" + instance;
+ GenericContainer<?> tableServer = new GenericContainer<>(DOCKER_IMAGE)
+ .withExposedPorts(KUDU_TSERVER_PORT)
+ .withCommand("tserver")
+ .withEnv("KUDU_MASTERS", "kudu-master:" + KUDU_MASTER_PORT)
+ .withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --use_hybrid_clock=false --rpc_advertised_addresses=" + instanceName)
+ .withNetwork(network)
+ .withNetworkAliases(instanceName)
+ .dependsOn(master);
+ tableServer.start();
+ tServersBuilder.add(tableServer);
+ }
+ tServers = tServersBuilder.build();
+
+ System.out.println(HostAndPort.fromParts(master.getHost(), master.getMappedPort(8051)).toString());
+ kuduClient = new KuduClient.KuduClientBuilder(masterAddress).build();
}
@AfterAll
public static void afterClass() throws Exception {
- harness.after();
+ kuduClient.close();
+ try (Closer closer = Closer.create()) {
+ closer.register(master::stop);
+ tServers.forEach(tabletServer -> closer.register(tabletServer::stop));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getMasterAddress() {
+ return masterAddress;
}
+ public KuduClient getClient() {
+ return kuduClient;
+ }
+
+
public static KuduTableInfo booksTableInfo(String tableName, boolean createIfNotExist) {
KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);
@@ -159,7 +208,7 @@ public class KuduTestBase {
protected void setUpDatabase(KuduTableInfo tableInfo) {
try {
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
booksDataRow().forEach(row -> {
@@ -177,7 +226,7 @@ public class KuduTestBase {
protected void cleanDatabase(KuduTableInfo tableInfo) {
try {
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
kuduWriter.deleteTable();
@@ -188,7 +237,7 @@ public class KuduTestBase {
}
protected List<Row> readRows(KuduTableInfo tableInfo) throws Exception {
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
KuduReader reader = new KuduReader(tableInfo, readerConfig);
@@ -222,7 +271,7 @@ public class KuduTestBase {
}
protected void validateSingleKey(String tableName) throws Exception {
- KuduTable kuduTable = harness.getClient().openTable(tableName);
+ KuduTable kuduTable = getClient().openTable(tableName);
Schema schema = kuduTable.getSchema();
assertEquals(1, schema.getPrimaryKeyColumnCount());
@@ -231,7 +280,7 @@ public class KuduTestBase {
assertTrue(schema.getColumn("first").isKey());
assertFalse(schema.getColumn("second").isKey());
- KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+ KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build();
List<RowResult> rows = new ArrayList<>();
scanner.forEach(rows::add);
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
index 4d74fda..6791765 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
@@ -59,15 +59,14 @@ public class KuduSinkTest extends KuduTestBase {
@Test
void testInvalidTableInfo() {
- harness.getClient();
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(writerConfig, null, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT)));
}
@Test
void testNotTableExist() {
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
@@ -78,7 +77,7 @@ public class KuduSinkTest extends KuduTestBase {
@Test
void testOutputWithStrongConsistency() throws Exception {
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
KuduWriterConfig writerConfig = KuduWriterConfig.Builder
@@ -102,7 +101,7 @@ public class KuduSinkTest extends KuduTestBase {
@Test
void testOutputWithEventualConsistency() throws Exception {
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
KuduWriterConfig writerConfig = KuduWriterConfig.Builder
@@ -130,7 +129,7 @@ public class KuduSinkTest extends KuduTestBase {
@Test
void testSpeed() throws Exception {
- String masterAddresses = harness.getMasterAddressesAsString();
+ String masterAddresses = getMasterAddress();
KuduTableInfo tableInfo = KuduTableInfo
.forTable("test_speed")
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
index 2bc8b12..1927631 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
@@ -65,7 +65,7 @@ public class KuduCatalogTest extends KuduTestBase {
@BeforeEach
public void init() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+ catalog = new KuduCatalog(getMasterAddress());
tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerStreamingMode(env);
tableEnv.registerCatalog("kudu", catalog);
tableEnv.useCatalog("kudu");
@@ -92,7 +92,7 @@ public class KuduCatalogTest extends KuduTestBase {
validateSingleKey("TestTable1R");
tableEnv.executeSql("DROP TABLE TestTable1R");
- assertFalse(harness.getClient().tableExists("TestTable1R"));
+ assertFalse(getClient().tableExists("TestTable1R"));
}
@Test
@@ -160,7 +160,7 @@ public class KuduCatalogTest extends KuduTestBase {
@Test
public void dataStreamEndToEstTest() throws Exception {
- KuduCatalog catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+ KuduCatalog catalog = new KuduCatalog(getMasterAddress());
// Creating table through catalog
KuduTableFactory tableFactory = catalog.getKuduTableFactory();
@@ -189,7 +189,7 @@ public class KuduCatalogTest extends KuduTestBase {
// Writing with simple sink
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
- KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(harness.getMasterAddressesAsString()).build();
+ KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(getMasterAddress()).build();
env.fromCollection(input).addSink(
new KuduSink<>(
writerConfig,
@@ -228,10 +228,10 @@ public class KuduCatalogTest extends KuduTestBase {
.getJobExecutionResult()
.get(1, TimeUnit.MINUTES);
- KuduTable kuduTable = harness.getClient().openTable("TestTableTsC");
+ KuduTable kuduTable = getClient().openTable("TestTableTsC");
assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());
- KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+ KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build();
List<RowResult> rows = new ArrayList<>();
scanner.forEach(rows::add);
@@ -272,7 +272,7 @@ public class KuduCatalogTest extends KuduTestBase {
}
private void validateManyTypes(String tableName) throws Exception {
- KuduTable kuduTable = harness.getClient().openTable(tableName);
+ KuduTable kuduTable = getClient().openTable(tableName);
Schema schema = kuduTable.getSchema();
assertEquals(Type.STRING, schema.getColumn("first").getType());
@@ -286,7 +286,7 @@ public class KuduCatalogTest extends KuduTestBase {
assertEquals(Type.DOUBLE, schema.getColumn("ninth").getType());
assertEquals(Type.UNIXTIME_MICROS, schema.getColumn("tenth").getType());
- KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+ KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build();
List<RowResult> rows = new ArrayList<>();
scanner.forEach(rows::add);
@@ -304,7 +304,7 @@ public class KuduCatalogTest extends KuduTestBase {
}
private void validateMultiKey(String tableName) throws Exception {
- KuduTable kuduTable = harness.getClient().openTable(tableName);
+ KuduTable kuduTable = getClient().openTable(tableName);
Schema schema = kuduTable.getSchema();
assertEquals(2, schema.getPrimaryKeyColumnCount());
@@ -315,7 +315,7 @@ public class KuduCatalogTest extends KuduTestBase {
assertFalse(schema.getColumn("third").isKey());
- KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+ KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build();
List<RowResult> rows = new ArrayList<>();
scanner.forEach(rows::add);
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
index a8ec686..f6482da 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
@@ -61,7 +61,7 @@ public class KuduTableFactoryTest extends KuduTestBase {
public void init() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerStreamingMode(env);
- kuduMasters = harness.getMasterAddressesAsString();
+ kuduMasters = getMasterAddress();
}
@Test
@@ -119,10 +119,10 @@ public class KuduTableFactoryTest extends KuduTestBase {
.getJobExecutionResult()
.get(1, TimeUnit.MINUTES);
- KuduTable kuduTable = harness.getClient().openTable("TestTableTs");
+ KuduTable kuduTable = getClient().openTable("TestTableTs");
assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());
- KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+ KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build();
HashSet<Timestamp> results = new HashSet<>();
scanner.forEach(sc -> results.add(sc.getTimestamp("second")));
@@ -156,8 +156,8 @@ public class KuduTableFactoryTest extends KuduTestBase {
.get(1, TimeUnit.MINUTES);
// Validate that both insertions were into the same table
- KuduTable kuduTable = harness.getClient().openTable("TestTable12");
- KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+ KuduTable kuduTable = getClient().openTable("TestTable12");
+ KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build();
List<RowResult> rows = new ArrayList<>();
scanner.forEach(rows::add);
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
index b83497c..2a043f9 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
@@ -41,7 +41,7 @@ public class KuduTableSourceITCase extends KuduTestBase {
KuduTableInfo tableInfo = booksTableInfo("books", true);
setUpDatabase(tableInfo);
tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode();
- catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+ catalog = new KuduCatalog(getMasterAddress());
tableEnv.registerCatalog("kudu", catalog);
tableEnv.useCatalog("kudu");
}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
index e02a297..43734e4 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
@@ -67,7 +67,7 @@ public class KuduTableSourceTest extends KuduTestBase {
public void init() {
KuduTableInfo tableInfo = booksTableInfo("books", true);
setUpDatabase(tableInfo);
- catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+ catalog = new KuduCatalog(getMasterAddress());
ObjectPath op = new ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "books");
try {
kuduTableSource = catalog.getKuduTableFactory().createTableSource(op, catalog.getTable(op));