You are viewing a plain-text version of this content. The canonical link for it is here (the link target is not preserved in this plain-text copy).
Posted to commits@bahir.apache.org by es...@apache.org on 2022/04/29 20:39:26 UTC

[bahir-flink] branch master updated: [BAHIR-243] Change KuduTestHarness with TestContainers

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6429023  [BAHIR-243] Change KuduTestHarness with TestContainers
6429023 is described below

commit 642902373b632ed51340ccb8038911322e7a147c
Author: Joao Boto <bo...@boto.pro>
AuthorDate: Thu Mar 11 19:03:43 2021 +0100

    [BAHIR-243] Change KuduTestHarness with TestContainers
---
 flink-connector-flume/pom.xml                      |  1 +
 flink-connector-kudu/pom.xml                       |  4 +
 .../connectors/kudu/batch/KuduInputFormatTest.java |  4 +-
 .../kudu/batch/KuduOutputFormatTest.java           |  8 +-
 .../connectors/kudu/connector/KuduTestBase.java    | 85 +++++++++++++++++-----
 .../connectors/kudu/streaming/KuduSinkTest.java    | 11 ++-
 .../connectors/kudu/table/KuduCatalogTest.java     | 20 ++---
 .../kudu/table/KuduTableFactoryTest.java           | 10 +--
 .../kudu/table/KuduTableSourceITCase.java          |  2 +-
 .../connectors/kudu/table/KuduTableSourceTest.java |  2 +-
 10 files changed, 100 insertions(+), 47 deletions(-)

diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml
index 9ffa363..7aa2f91 100644
--- a/flink-connector-flume/pom.xml
+++ b/flink-connector-flume/pom.xml
@@ -87,6 +87,7 @@ under the License.
 		<dependency>
 			<groupId>org.testcontainers</groupId>
 			<artifactId>testcontainers</artifactId>
+			<scope>test</scope>
 		</dependency>
 	</dependencies>
 
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 2a2dde2..9ace382 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -157,6 +157,10 @@
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
index fb7d8e3..126f7fd 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduInputFormatTest.java
@@ -39,7 +39,7 @@ class KuduInputFormatTest extends KuduTestBase {
 
     @Test
     void testInvalidTableInfo() {
-        String masterAddresses = harness.getMasterAddressesAsString();
+        String masterAddresses = getMasterAddress();
         KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
         Assertions.assertThrows(NullPointerException.class, () -> new KuduRowInputFormat(readerConfig, null));
     }
@@ -71,7 +71,7 @@ class KuduInputFormatTest extends KuduTestBase {
     }
 
     private List<Row> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
-        String masterAddresses = harness.getMasterAddressesAsString();
+        String masterAddresses = getMasterAddress();
         KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
         KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo, new ArrayList<>(),
                 fieldProjection == null ? null : Arrays.asList(fieldProjection));
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
index 22fa0a4..693e113 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java
@@ -39,14 +39,14 @@ class KuduOutputFormatTest extends KuduTestBase {
 
     @Test
     void testInvalidTableInfo() {
-        String masterAddresses = harness.getMasterAddressesAsString();
+        String masterAddresses = getMasterAddress();
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
         Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(writerConfig, null, null));
     }
 
     @Test
     void testNotTableExist() {
-        String masterAddresses = harness.getMasterAddressesAsString();
+        String masterAddresses = getMasterAddress();
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
         KuduOutputFormat<Row> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
@@ -55,7 +55,7 @@ class KuduOutputFormatTest extends KuduTestBase {
 
     @Test
     void testOutputWithStrongConsistency() throws Exception {
-        String masterAddresses = harness.getMasterAddressesAsString();
+        String masterAddresses = getMasterAddress();
 
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder
@@ -80,7 +80,7 @@ class KuduOutputFormatTest extends KuduTestBase {
 
     @Test
     void testOutputWithEventualConsistency() throws Exception {
-        String masterAddresses = harness.getMasterAddressesAsString();
+        String masterAddresses = getMasterAddress();
 
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
index 36572c6..3a872c5 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
@@ -26,22 +26,21 @@ import org.apache.flink.connectors.kudu.connector.writer.KuduWriter;
 import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
 import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
 import org.apache.flink.types.Row;
-
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.*;
 import org.apache.kudu.shaded.com.google.common.collect.Lists;
-import org.apache.kudu.test.KuduTestHarness;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.migrationsupport.rules.ExternalResourceSupport;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+import org.testcontainers.shaded.com.google.common.io.Closer;
+import org.testcontainers.shaded.com.google.common.net.HostAndPort;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -51,9 +50,19 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@ExtendWith(ExternalResourceSupport.class)
 public class KuduTestBase {
 
+    private static final String DOCKER_IMAGE = "apache/kudu:1.11.1";
+    private static final Integer KUDU_MASTER_PORT = 7051;
+    private static final Integer KUDU_TSERVER_PORT = 7050;
+    private static final Integer NUMBER_OF_REPLICA = 3;
+
+    private static GenericContainer<?> master;
+    private static List<GenericContainer<?>> tServers;
+
+    private static String masterAddress;
+    private static KuduClient kuduClient;
+
     private static final Object[][] booksTableData = {
             {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11},
             {1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22},
@@ -61,20 +70,60 @@ public class KuduTestBase {
             {1004, "A Cup of Java", "Kumar", 44.44, 44},
             {1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55}};
 
-    public static KuduTestHarness harness;
     public static String[] columns = new String[]{"id", "title", "author", "price", "quantity"};
 
     @BeforeAll
     public static void beforeClass() throws Exception {
-        harness = new KuduTestHarness();
-        harness.before();
+        Network network = Network.newNetwork();
+
+        ImmutableList.Builder<GenericContainer<?>> tServersBuilder = ImmutableList.builder();
+        master = new GenericContainer<>(DOCKER_IMAGE)
+                .withExposedPorts(KUDU_MASTER_PORT, 8051)
+                .withCommand("master")
+                .withNetwork(network)
+                .withNetworkAliases("kudu-master");
+        master.start();
+        masterAddress = HostAndPort.fromParts(master.getHost(), master.getMappedPort(KUDU_MASTER_PORT)).toString();
+
+        for (int instance = 1; instance <= NUMBER_OF_REPLICA; instance++) {
+            String instanceName = "kudu-tserver-" + instance;
+            GenericContainer<?> tableServer = new GenericContainer<>(DOCKER_IMAGE)
+                    .withExposedPorts(KUDU_TSERVER_PORT)
+                    .withCommand("tserver")
+                    .withEnv("KUDU_MASTERS", "kudu-master:" + KUDU_MASTER_PORT)
+                    .withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --use_hybrid_clock=false --rpc_advertised_addresses=" + instanceName)
+                    .withNetwork(network)
+                    .withNetworkAliases(instanceName)
+                    .dependsOn(master);
+            tableServer.start();
+            tServersBuilder.add(tableServer);
+        }
+        tServers = tServersBuilder.build();
+
+        System.out.println(HostAndPort.fromParts(master.getHost(), master.getMappedPort(8051)).toString());
+        kuduClient = new KuduClient.KuduClientBuilder(masterAddress).build();
     }
 
     @AfterAll
     public static void afterClass() throws Exception {
-        harness.after();
+        kuduClient.close();
+        try (Closer closer = Closer.create()) {
+            closer.register(master::stop);
+            tServers.forEach(tabletServer -> closer.register(tabletServer::stop));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String getMasterAddress() {
+        return masterAddress;
     }
 
+    public KuduClient getClient() {
+        return kuduClient;
+    }
+
+
     public static KuduTableInfo booksTableInfo(String tableName, boolean createIfNotExist) {
 
         KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);
@@ -159,7 +208,7 @@ public class KuduTestBase {
 
     protected void setUpDatabase(KuduTableInfo tableInfo) {
         try {
-            String masterAddresses = harness.getMasterAddressesAsString();
+            String masterAddresses = getMasterAddress();
             KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
             KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
             booksDataRow().forEach(row -> {
@@ -177,7 +226,7 @@ public class KuduTestBase {
 
     protected void cleanDatabase(KuduTableInfo tableInfo) {
         try {
-            String masterAddresses = harness.getMasterAddressesAsString();
+            String masterAddresses = getMasterAddress();
             KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
             KuduWriter kuduWriter = new KuduWriter(tableInfo, writerConfig, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
             kuduWriter.deleteTable();
@@ -188,7 +237,7 @@ public class KuduTestBase {
     }
 
     protected List<Row> readRows(KuduTableInfo tableInfo) throws Exception {
-        String masterAddresses = harness.getMasterAddressesAsString();
+        String masterAddresses = getMasterAddress();
         KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build();
         KuduReader reader = new KuduReader(tableInfo, readerConfig);
 
@@ -222,7 +271,7 @@ public class KuduTestBase {
     }
 
     protected void validateSingleKey(String tableName) throws Exception {
-        KuduTable kuduTable = harness.getClient().openTable(tableName);
+        KuduTable kuduTable = getClient().openTable(tableName);
         Schema schema = kuduTable.getSchema();
 
         assertEquals(1, schema.getPrimaryKeyColumnCount());
@@ -231,7 +280,7 @@ public class KuduTestBase {
         assertTrue(schema.getColumn("first").isKey());
         assertFalse(schema.getColumn("second").isKey());
 
-        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+        KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build();
         List<RowResult> rows = new ArrayList<>();
         scanner.forEach(rows::add);
 
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
index 4d74fda..6791765 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
@@ -59,15 +59,14 @@ public class KuduSinkTest extends KuduTestBase {
 
     @Test
     void testInvalidTableInfo() {
-        harness.getClient();
-        String masterAddresses = harness.getMasterAddressesAsString();
+        String masterAddresses = getMasterAddress();
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
         Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(writerConfig, null, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT)));
     }
 
     @Test
     void testNotTableExist() {
-        String masterAddresses = harness.getMasterAddressesAsString();
+        String masterAddresses = getMasterAddress();
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build();
         KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT));
@@ -78,7 +77,7 @@ public class KuduSinkTest extends KuduTestBase {
 
     @Test
     void testOutputWithStrongConsistency() throws Exception {
-        String masterAddresses = harness.getMasterAddressesAsString();
+        String masterAddresses = getMasterAddress();
 
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder
@@ -102,7 +101,7 @@ public class KuduSinkTest extends KuduTestBase {
 
     @Test
     void testOutputWithEventualConsistency() throws Exception {
-        String masterAddresses = harness.getMasterAddressesAsString();
+        String masterAddresses = getMasterAddress();
 
         KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
         KuduWriterConfig writerConfig = KuduWriterConfig.Builder
@@ -130,7 +129,7 @@ public class KuduSinkTest extends KuduTestBase {
 
     @Test
     void testSpeed() throws Exception {
-        String masterAddresses = harness.getMasterAddressesAsString();
+        String masterAddresses = getMasterAddress();
 
         KuduTableInfo tableInfo = KuduTableInfo
                 .forTable("test_speed")
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
index 2bc8b12..1927631 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
@@ -65,7 +65,7 @@ public class KuduCatalogTest extends KuduTestBase {
     @BeforeEach
     public void init() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+        catalog = new KuduCatalog(getMasterAddress());
         tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerStreamingMode(env);
         tableEnv.registerCatalog("kudu", catalog);
         tableEnv.useCatalog("kudu");
@@ -92,7 +92,7 @@ public class KuduCatalogTest extends KuduTestBase {
         validateSingleKey("TestTable1R");
 
         tableEnv.executeSql("DROP TABLE TestTable1R");
-        assertFalse(harness.getClient().tableExists("TestTable1R"));
+        assertFalse(getClient().tableExists("TestTable1R"));
     }
 
     @Test
@@ -160,7 +160,7 @@ public class KuduCatalogTest extends KuduTestBase {
 
     @Test
     public void dataStreamEndToEstTest() throws Exception {
-        KuduCatalog catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+        KuduCatalog catalog = new KuduCatalog(getMasterAddress());
         // Creating table through catalog
         KuduTableFactory tableFactory = catalog.getKuduTableFactory();
 
@@ -189,7 +189,7 @@ public class KuduCatalogTest extends KuduTestBase {
         // Writing with simple sink
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(2);
-        KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(harness.getMasterAddressesAsString()).build();
+        KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(getMasterAddress()).build();
         env.fromCollection(input).addSink(
                 new KuduSink<>(
                         writerConfig,
@@ -228,10 +228,10 @@ public class KuduCatalogTest extends KuduTestBase {
                 .getJobExecutionResult()
                 .get(1, TimeUnit.MINUTES);
 
-        KuduTable kuduTable = harness.getClient().openTable("TestTableTsC");
+        KuduTable kuduTable = getClient().openTable("TestTableTsC");
         assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());
 
-        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+        KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build();
         List<RowResult> rows = new ArrayList<>();
         scanner.forEach(rows::add);
 
@@ -272,7 +272,7 @@ public class KuduCatalogTest extends KuduTestBase {
     }
 
     private void validateManyTypes(String tableName) throws Exception {
-        KuduTable kuduTable = harness.getClient().openTable(tableName);
+        KuduTable kuduTable = getClient().openTable(tableName);
         Schema schema = kuduTable.getSchema();
 
         assertEquals(Type.STRING, schema.getColumn("first").getType());
@@ -286,7 +286,7 @@ public class KuduCatalogTest extends KuduTestBase {
         assertEquals(Type.DOUBLE, schema.getColumn("ninth").getType());
         assertEquals(Type.UNIXTIME_MICROS, schema.getColumn("tenth").getType());
 
-        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+        KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build();
         List<RowResult> rows = new ArrayList<>();
         scanner.forEach(rows::add);
 
@@ -304,7 +304,7 @@ public class KuduCatalogTest extends KuduTestBase {
     }
 
     private void validateMultiKey(String tableName) throws Exception {
-        KuduTable kuduTable = harness.getClient().openTable(tableName);
+        KuduTable kuduTable = getClient().openTable(tableName);
         Schema schema = kuduTable.getSchema();
 
         assertEquals(2, schema.getPrimaryKeyColumnCount());
@@ -315,7 +315,7 @@ public class KuduCatalogTest extends KuduTestBase {
 
         assertFalse(schema.getColumn("third").isKey());
 
-        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+        KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build();
         List<RowResult> rows = new ArrayList<>();
         scanner.forEach(rows::add);
 
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
index a8ec686..f6482da 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
@@ -61,7 +61,7 @@ public class KuduTableFactoryTest extends KuduTestBase {
     public void init() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
         tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerStreamingMode(env);
-        kuduMasters = harness.getMasterAddressesAsString();
+        kuduMasters = getMasterAddress();
     }
 
     @Test
@@ -119,10 +119,10 @@ public class KuduTableFactoryTest extends KuduTestBase {
                 .getJobExecutionResult()
                 .get(1, TimeUnit.MINUTES);
 
-        KuduTable kuduTable = harness.getClient().openTable("TestTableTs");
+        KuduTable kuduTable = getClient().openTable("TestTableTs");
         assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType());
 
-        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+        KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build();
         HashSet<Timestamp> results = new HashSet<>();
         scanner.forEach(sc -> results.add(sc.getTimestamp("second")));
 
@@ -156,8 +156,8 @@ public class KuduTableFactoryTest extends KuduTestBase {
                 .get(1, TimeUnit.MINUTES);
 
         // Validate that both insertions were into the same table
-        KuduTable kuduTable = harness.getClient().openTable("TestTable12");
-        KuduScanner scanner = harness.getClient().newScannerBuilder(kuduTable).build();
+        KuduTable kuduTable = getClient().openTable("TestTable12");
+        KuduScanner scanner = getClient().newScannerBuilder(kuduTable).build();
         List<RowResult> rows = new ArrayList<>();
         scanner.forEach(rows::add);
 
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
index b83497c..2a043f9 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
@@ -41,7 +41,7 @@ public class KuduTableSourceITCase extends KuduTestBase {
         KuduTableInfo tableInfo = booksTableInfo("books", true);
         setUpDatabase(tableInfo);
         tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-        catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+        catalog = new KuduCatalog(getMasterAddress());
         tableEnv.registerCatalog("kudu", catalog);
         tableEnv.useCatalog("kudu");
     }
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
index e02a297..43734e4 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
@@ -67,7 +67,7 @@ public class KuduTableSourceTest extends KuduTestBase {
     public void init() {
         KuduTableInfo tableInfo = booksTableInfo("books", true);
         setUpDatabase(tableInfo);
-        catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+        catalog = new KuduCatalog(getMasterAddress());
         ObjectPath op = new ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "books");
         try {
             kuduTableSource = catalog.getKuduTableFactory().createTableSource(op, catalog.getTable(op));