You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/07/08 16:16:10 UTC

[pulsar] branch master updated: Cleanup tests in the presto module (#4683)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c421ca6  Cleanup tests in the presto module (#4683)
c421ca6 is described below

commit c421ca6e5b0c99f7728c8651ad9a81005bfd176e
Author: vzhikserg <vz...@users.noreply.github.com>
AuthorDate: Mon Jul 8 18:16:05 2019 +0200

    Cleanup tests in the presto module (#4683)
    
    * Add static imports of the TestNG Assert methods to simplify the tests in the presto module
    
    * Use the preferred way of creating schemas (Schema.AVRO / Schema.JSON). Anonymous Predicate and Function classes were converted to lambdas and method references
---
 .../pulsar/sql/presto/TestPulsarConnector.java     | 102 +++++++-----------
 .../pulsar/sql/presto/TestPulsarMetadata.java      | 108 +++++++++----------
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  |  45 ++++----
 .../pulsar/sql/presto/TestPulsarSplitManager.java  | 115 ++++++++++-----------
 4 files changed, 169 insertions(+), 201 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 1a5e87a..c38c0ac 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -75,7 +75,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static com.facebook.presto.spi.type.DateType.DATE;
@@ -224,21 +223,19 @@ public abstract class TestPulsarConnector {
             partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_6.toString(), 7);
 
             topicsToSchemas = new HashMap<>();
-            topicsToSchemas.put(TOPIC_1.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
-            topicsToSchemas.put(TOPIC_2.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
-            topicsToSchemas.put(TOPIC_3.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
-            topicsToSchemas.put(TOPIC_4.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
-            topicsToSchemas.put(TOPIC_5.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
-            topicsToSchemas.put(TOPIC_6.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
-
-
-            topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
-
-            topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
-            topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
-            topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
-            topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
-            topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_1.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_2.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_3.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_4.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_5.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_6.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
+
+            topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
 
             fooTypes = new HashMap<>();
             fooTypes.put("field1", IntegerType.INTEGER);
@@ -536,14 +533,9 @@ public abstract class TestPulsarConnector {
                     fieldNames10,
                     positionIndices10));
 
-
-            fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream().map(
-                    new Function<PulsarInternalColumn, PulsarColumnHandle>() {
-                        @Override
-                        public PulsarColumnHandle apply(PulsarInternalColumn pulsarInternalColumn) {
-                            return pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false);
-                        }
-                    }).collect(Collectors.toList()));
+            fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream()
+                .map(pulsarInternalColumn -> pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false))
+                .collect(Collectors.toList()));
 
             splits = new HashMap<>();
 
@@ -564,11 +556,11 @@ public abstract class TestPulsarConnector {
             fooFunctions = new HashMap<>();
 
             fooFunctions.put("field1", integer -> integer);
-            fooFunctions.put("field2", integer -> String.valueOf(integer));
-            fooFunctions.put("field3", integer -> integer.floatValue());
-            fooFunctions.put("field4", integer -> integer.doubleValue());
+            fooFunctions.put("field2", String::valueOf);
+            fooFunctions.put("field3", Integer::floatValue);
+            fooFunctions.put("field4", Integer::doubleValue);
             fooFunctions.put("field5", integer -> integer % 2 == 0);
-            fooFunctions.put("field6", integer -> integer.longValue());
+            fooFunctions.put("field6", Integer::longValue);
             fooFunctions.put("timestamp", integer -> System.currentTimeMillis());
             fooFunctions.put("time", integer -> {
                 LocalTime now = LocalTime.now(ZoneId.systemDefault());
@@ -647,45 +639,24 @@ public abstract class TestPulsarConnector {
     private static final Logger log = Logger.get(TestPulsarConnector.class);
 
     protected static List<String> getNamespace(String tenant) {
-        return new LinkedList<>(topicNames.stream().filter(new Predicate<TopicName>() {
-            @Override
-            public boolean test(TopicName topicName) {
-                return topicName.getTenant().equals(tenant);
-            }
-        }).map(new Function<TopicName, String>() {
-            @Override
-            public String apply(TopicName topicName) {
-                return topicName.getNamespace();
-            }
-        }).collect(Collectors.toSet()));
+        return topicNames.stream()
+            .filter(topicName -> topicName.getTenant().equals(tenant))
+            .map(TopicName::getNamespace)
+            .distinct()
+            .collect(Collectors.toCollection(LinkedList::new));
     }
 
     protected static List<String> getTopics(String ns) {
-        return topicNames.stream().filter(new Predicate<TopicName>() {
-            @Override
-            public boolean test(TopicName topicName) {
-                return topicName.getNamespace().equals(ns);
-            }
-        }).map(new Function<TopicName, String>() {
-            @Override
-            public String apply(TopicName topicName) {
-                return topicName.toString();
-            }
-        }).collect(Collectors.toList());
+        return topicNames.stream()
+            .filter(topicName -> topicName.getNamespace().equals(ns))
+            .map(TopicName::toString).collect(Collectors.toList());
     }
 
     protected static List<String> getPartitionedTopics(String ns) {
-        return partitionedTopicNames.stream().filter(new Predicate<TopicName>() {
-            @Override
-            public boolean test(TopicName topicName) {
-                return topicName.getNamespace().equals(ns);
-            }
-        }).map(new Function<TopicName, String>() {
-            @Override
-            public String apply(TopicName topicName) {
-                return topicName.toString();
-            }
-        }).collect(Collectors.toList());
+        return partitionedTopicNames.stream()
+            .filter(topicName -> topicName.getNamespace().equals(ns))
+            .map(TopicName::toString)
+            .collect(Collectors.toList());
     }
 
     @BeforeMethod
@@ -696,12 +667,9 @@ public abstract class TestPulsarConnector {
         this.pulsarConnectorConfig.setMaxSplitMessageQueueSize(100);
 
         Tenants tenants = mock(Tenants.class);
-        doReturn(new LinkedList<>(topicNames.stream().map(new Function<TopicName, String>() {
-            @Override
-            public String apply(TopicName topicName) {
-                return topicName.getTenant();
-            }
-        }).collect(Collectors.toSet()))).when(tenants).getTenants();
+        doReturn(new LinkedList<>(topicNames.stream()
+            .map(TopicName::getTenant)
+            .collect(Collectors.toSet()))).when(tenants).getTenants();
 
         Namespaces namespaces = mock(Namespaces.class);
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index f03b34f..a704be6 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -34,7 +34,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.core.Response;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.util.Arrays;
@@ -49,6 +48,11 @@ import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 @Test(singleThreaded = true)
 public class TestPulsarMetadata extends TestPulsarConnector {
@@ -62,7 +66,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
 
         String[] expectedSchemas = {NAMESPACE_NAME_1.toString(), NAMESPACE_NAME_2.toString(),
                 NAMESPACE_NAME_3.toString(), NAMESPACE_NAME_4.toString()};
-        Assert.assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
+        assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
     }
 
     @Test
@@ -73,14 +77,14 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         ConnectorTableHandle connectorTableHandle
                 = this.pulsarMetadata.getTableHandle(mock(ConnectorSession.class), schemaTableName);
 
-        Assert.assertTrue(connectorTableHandle instanceof PulsarTableHandle);
+        assertTrue(connectorTableHandle instanceof PulsarTableHandle);
 
         PulsarTableHandle pulsarTableHandle = (PulsarTableHandle) connectorTableHandle;
 
-        Assert.assertEquals(pulsarTableHandle.getConnectorId(), pulsarConnectorId.toString());
-        Assert.assertEquals(pulsarTableHandle.getSchemaName(), TOPIC_1.getNamespace());
-        Assert.assertEquals(pulsarTableHandle.getTableName(), TOPIC_1.getLocalName());
-        Assert.assertEquals(pulsarTableHandle.getTopicName(), TOPIC_1.getLocalName());
+        assertEquals(pulsarTableHandle.getConnectorId(), pulsarConnectorId.toString());
+        assertEquals(pulsarTableHandle.getSchemaName(), TOPIC_1.getNamespace());
+        assertEquals(pulsarTableHandle.getTableName(), TOPIC_1.getLocalName());
+        assertEquals(pulsarTableHandle.getTopicName(), TOPIC_1.getLocalName());
     }
 
     @Test
@@ -101,10 +105,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
             ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                     pulsarTableHandle);
 
-            Assert.assertEquals(tableMetadata.getTable().getSchemaName(), topic.getNamespace());
-            Assert.assertEquals(tableMetadata.getTable().getTableName(), topic.getLocalName());
+            assertEquals(tableMetadata.getTable().getSchemaName(), topic.getNamespace());
+            assertEquals(tableMetadata.getTable().getTableName(), topic.getLocalName());
 
-            Assert.assertEquals(tableMetadata.getColumns().size(),
+            assertEquals(tableMetadata.getColumns().size(),
                     fooColumnHandles.size());
 
             List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
@@ -115,7 +119,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
 
             for (ColumnMetadata column : tableMetadata.getColumns()) {
                 if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
-                    Assert.assertEquals(column.getComment(),
+                    assertEquals(column.getComment(),
                             PulsarInternalColumn.getInternalFieldsMap()
                                     .get(column.getName()).getColumnMetadata(true).getComment());
                 }
@@ -123,7 +127,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
                 fieldNames.remove(column.getName());
             }
 
-            Assert.assertTrue(fieldNames.isEmpty());
+            assertTrue(fieldNames.isEmpty());
         }
     }
 
@@ -140,10 +144,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         try {
             ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                     pulsarTableHandle);
-            Assert.fail("Invalid schema should have generated an exception");
+            fail("Invalid schema should have generated an exception");
         } catch (PrestoException e) {
-            Assert.assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
-            Assert.assertEquals(e.getMessage(), "Schema wrong-tenant/wrong-ns does not exist");
+            assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
+            assertEquals(e.getMessage(), "Schema wrong-tenant/wrong-ns does not exist");
         }
     }
 
@@ -160,10 +164,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         try {
             ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                     pulsarTableHandle);
-            Assert.fail("Invalid table should have generated an exception");
+            fail("Invalid table should have generated an exception");
         } catch (TableNotFoundException e) {
-            Assert.assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
-            Assert.assertEquals(e.getMessage(), "Table 'tenant-1/ns-1.wrong-topic' not found");
+            assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
+            assertEquals(e.getMessage(), "Table 'tenant-1/ns-1.wrong-topic' not found");
         }
     }
 
@@ -183,7 +187,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
 
         ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                 pulsarTableHandle);
-        Assert.assertEquals(tableMetadata.getColumns().size(), 0);
+        assertEquals(tableMetadata.getColumns().size(), 0);
     }
 
     @Test
@@ -203,10 +207,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         try {
             ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                     pulsarTableHandle);
-            Assert.fail("Table without schema should have generated an exception");
+            fail("Table without schema should have generated an exception");
         } catch (PrestoException e) {
-            Assert.assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
-            Assert.assertEquals(e.getMessage(),
+            assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
+            assertEquals(e.getMessage(),
                     "Topic persistent://tenant-1/ns-1/topic-1 does not have a valid schema");
         }
     }
@@ -228,27 +232,27 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         try {
             ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                     pulsarTableHandle);
-            Assert.fail("Table without schema should have generated an exception");
+            fail("Table without schema should have generated an exception");
         } catch (PrestoException e) {
-            Assert.assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
-            Assert.assertEquals(e.getMessage(),
+            assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
+            assertEquals(e.getMessage(),
                     "Topic persistent://tenant-1/ns-1/topic-1 does not have a valid schema");
         }
     }
 
     @Test
     public void testListTable() {
-        Assert.assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), null).isEmpty());
-        Assert.assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns")
+        assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), null).isEmpty());
+        assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns")
                 .isEmpty());
 
         SchemaTableName[] expectedTopics1 = {new SchemaTableName(TOPIC_4.getNamespace(), TOPIC_4.getLocalName())};
-        Assert.assertEquals(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
+        assertEquals(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
                 NAMESPACE_NAME_3.toString()), Arrays.asList(expectedTopics1));
 
         SchemaTableName[] expectedTopics2 = {new SchemaTableName(TOPIC_5.getNamespace(), TOPIC_5.getLocalName()),
                 new SchemaTableName(TOPIC_6.getNamespace(), TOPIC_6.getLocalName())};
-        Assert.assertEquals(new HashSet<>(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
+        assertEquals(new HashSet<>(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
                 NAMESPACE_NAME_4.toString())), new HashSet<>(Arrays.asList(expectedTopics2)));
     }
 
@@ -267,25 +271,25 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         }
 
         for (String field : fieldNames) {
-            Assert.assertNotNull(columnHandleMap.get(field));
+            assertNotNull(columnHandleMap.get(field));
             PulsarColumnHandle pulsarColumnHandle = (PulsarColumnHandle) columnHandleMap.get(field);
             PulsarInternalColumn pulsarInternalColumn = PulsarInternalColumn.getInternalFieldsMap().get(field);
             if (pulsarInternalColumn != null) {
-                Assert.assertEquals(pulsarColumnHandle,
+                assertEquals(pulsarColumnHandle,
                         pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false));
             } else {
                 Schema schema = new Schema.Parser().parse(new String(topicsToSchemas.get(TOPIC_1.getSchemaName())
                         .getSchema()));
-                Assert.assertEquals(pulsarColumnHandle.getConnectorId(), pulsarConnectorId.toString());
-                Assert.assertEquals(pulsarColumnHandle.getName(), field);
-                Assert.assertEquals(pulsarColumnHandle.getPositionIndices(), fooPositionIndices.get(field));
-                Assert.assertEquals(pulsarColumnHandle.getFieldNames(), fooFieldNames.get(field));
-                Assert.assertEquals(pulsarColumnHandle.getType(), fooTypes.get(field));
-                Assert.assertEquals(pulsarColumnHandle.isHidden(), false);
+                assertEquals(pulsarColumnHandle.getConnectorId(), pulsarConnectorId.toString());
+                assertEquals(pulsarColumnHandle.getName(), field);
+                assertEquals(pulsarColumnHandle.getPositionIndices(), fooPositionIndices.get(field));
+                assertEquals(pulsarColumnHandle.getFieldNames(), fooFieldNames.get(field));
+                assertEquals(pulsarColumnHandle.getType(), fooTypes.get(field));
+                assertFalse(pulsarColumnHandle.isHidden());
             }
             columnHandleMap.remove(field);
         }
-        Assert.assertTrue(columnHandleMap.isEmpty());
+        assertTrue(columnHandleMap.isEmpty());
     }
 
     @Test
@@ -294,11 +298,11 @@ public class TestPulsarMetadata extends TestPulsarConnector {
                 = this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class),
                 new SchemaTablePrefix(TOPIC_1.getNamespace()));
 
-        Assert.assertEquals(tableColumnsMap.size(), 2);
+        assertEquals(tableColumnsMap.size(), 2);
         List<ColumnMetadata> columnMetadataList
                 = tableColumnsMap.get(new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName()));
-        Assert.assertNotNull(columnMetadataList);
-        Assert.assertEquals(columnMetadataList.size(),
+        assertNotNull(columnMetadataList);
+        assertEquals(columnMetadataList.size(),
                 fooColumnHandles.size());
 
         List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
@@ -309,7 +313,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
 
         for (ColumnMetadata column : columnMetadataList) {
             if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
-                Assert.assertEquals(column.getComment(),
+                assertEquals(column.getComment(),
                         PulsarInternalColumn.getInternalFieldsMap()
                                 .get(column.getName()).getColumnMetadata(true).getComment());
             }
@@ -317,11 +321,11 @@ public class TestPulsarMetadata extends TestPulsarConnector {
             fieldNames.remove(column.getName());
         }
 
-        Assert.assertTrue(fieldNames.isEmpty());
+        assertTrue(fieldNames.isEmpty());
 
         columnMetadataList = tableColumnsMap.get(new SchemaTableName(TOPIC_2.getNamespace(), TOPIC_2.getLocalName()));
-        Assert.assertNotNull(columnMetadataList);
-        Assert.assertEquals(columnMetadataList.size(),
+        assertNotNull(columnMetadataList);
+        assertEquals(columnMetadataList.size(),
                 fooColumnHandles.size());
 
         fieldNames = new LinkedList<>(fooFieldNames.keySet());
@@ -332,7 +336,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
 
         for (ColumnMetadata column : columnMetadataList) {
             if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
-                Assert.assertEquals(column.getComment(),
+                assertEquals(column.getComment(),
                         PulsarInternalColumn.getInternalFieldsMap()
                                 .get(column.getName()).getColumnMetadata(true).getComment());
             }
@@ -340,17 +344,17 @@ public class TestPulsarMetadata extends TestPulsarConnector {
             fieldNames.remove(column.getName());
         }
 
-        Assert.assertTrue(fieldNames.isEmpty());
+        assertTrue(fieldNames.isEmpty());
 
         // test table and schema
         tableColumnsMap
                 = this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class),
                 new SchemaTablePrefix(TOPIC_4.getNamespace(), TOPIC_4.getLocalName()));
 
-        Assert.assertEquals(tableColumnsMap.size(), 1);
+        assertEquals(tableColumnsMap.size(), 1);
         columnMetadataList = tableColumnsMap.get(new SchemaTableName(TOPIC_4.getNamespace(), TOPIC_4.getLocalName()));
-        Assert.assertNotNull(columnMetadataList);
-        Assert.assertEquals(columnMetadataList.size(),
+        assertNotNull(columnMetadataList);
+        assertEquals(columnMetadataList.size(),
                 fooColumnHandles.size());
 
         fieldNames = new LinkedList<>(fooFieldNames.keySet());
@@ -361,7 +365,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
 
         for (ColumnMetadata column : columnMetadataList) {
             if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
-                Assert.assertEquals(column.getComment(),
+                assertEquals(column.getComment(),
                         PulsarInternalColumn.getInternalFieldsMap()
                                 .get(column.getName()).getColumnMetadata(true).getComment());
             }
@@ -369,6 +373,6 @@ public class TestPulsarMetadata extends TestPulsarConnector {
             fieldNames.remove(column.getName());
         }
 
-        Assert.assertTrue(fieldNames.isEmpty());
+        assertTrue(fieldNames.isEmpty());
     }
 }
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 7a8b18f..c81fc40 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -20,13 +20,14 @@ package org.apache.pulsar.sql.presto;
 
 import io.airlift.log.Logger;
 import org.apache.pulsar.common.naming.TopicName;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import static org.testng.Assert.assertEquals;
+
 @Test(singleThreaded = true)
 public class TestPulsarRecordCursor extends TestPulsarConnector {
 
@@ -50,22 +51,22 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
                         columnsSeen.add(fooColumnHandles.get(i).getName());
                     } else {
                         if (fooColumnHandles.get(i).getName().equals("field1")) {
-                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("field1").apply(count)).longValue());
+                            assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("field1").apply(count)).longValue());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("field2")) {
-                            Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("field2").apply(count)).getBytes());
+                            assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("field2").apply(count)).getBytes());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("field3")) {
-                            Assert.assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("field3").apply(count)).floatValue()));
+                            assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("field3").apply(count)).floatValue()));
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("field4")) {
-                            Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("field4").apply(count)).doubleValue());
+                            assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("field4").apply(count)).doubleValue());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("field5")) {
-                            Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("field5").apply(count)).booleanValue());
+                            assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("field5").apply(count)).booleanValue());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("field6")) {
-                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("field6").apply(count)).longValue());
+                            assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("field6").apply(count)).longValue());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("timestamp")) {
                             pulsarRecordCursor.getLong(i);
@@ -77,40 +78,40 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
                             pulsarRecordCursor.getLong(i);
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("bar.field1")) {
-                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.field1").apply(count)).longValue());
+                            assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.field1").apply(count)).longValue());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("bar.field2")) {
-                            Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("bar.field2").apply(count)).getBytes());
+                            assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("bar.field2").apply(count)).getBytes());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("bar.field3")) {
-                            Assert.assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("bar.field3").apply(count)).floatValue()));
+                            assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("bar.field3").apply(count)).floatValue()));
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("bar.test.field4")) {
-                            Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test.field4").apply(count)).doubleValue());
+                            assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test.field4").apply(count)).doubleValue());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("bar.test.field5")) {
-                            Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test.field5").apply(count)).booleanValue());
+                            assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test.field5").apply(count)).booleanValue());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("bar.test.field6")) {
-                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test.field6").apply(count)).longValue());
+                            assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test.field6").apply(count)).longValue());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("bar.test.foobar.field1")) {
-                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test.foobar.field1").apply(count)).longValue());
+                            assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test.foobar.field1").apply(count)).longValue());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("bar.test2.field4")) {
-                            Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test2.field4").apply(count)).doubleValue());
+                            assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test2.field4").apply(count)).doubleValue());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("bar.test2.field5")) {
-                            Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test2.field5").apply(count)).booleanValue());
+                            assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test2.field5").apply(count)).booleanValue());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("bar.test2.field6")) {
-                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test2.field6").apply(count)).longValue());
+                            assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test2.field6").apply(count)).longValue());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("bar.test2.foobar.field1")) {
-                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test2.foobar.field1").apply(count)).longValue());
+                            assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test2.foobar.field1").apply(count)).longValue());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else if (fooColumnHandles.get(i).getName().equals("field7")) {
-                            Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), fooFunctions.get("field7").apply(count).toString().getBytes());
+                            assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), fooFunctions.get("field7").apply(count).toString().getBytes());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else {
                             if (PulsarInternalColumn.getInternalFieldsMap().containsKey(fooColumnHandles.get(i).getName())) {
@@ -119,11 +120,11 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
                         }
                     }
                 }
-                Assert.assertEquals(columnsSeen.size(), fooColumnHandles.size());
+                assertEquals(columnsSeen.size(), fooColumnHandles.size());
                 count++;
             }
-            Assert.assertEquals(count, topicsToNumEntries.get(topicName.getSchemaName()).longValue());
-            Assert.assertEquals(pulsarRecordCursor.getCompletedBytes(), completedBytes);
+            assertEquals(count, topicsToNumEntries.get(topicName.getSchemaName()).longValue());
+            assertEquals(pulsarRecordCursor.getCompletedBytes(), completedBytes);
             cleanup();
             pulsarRecordCursor.close();
         }
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index 7c9b4bb..b86a7a5 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -32,14 +32,12 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
@@ -50,6 +48,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
 
 @Test(singleThreaded = true)
 public class TestPulsarSplitManager extends TestPulsarConnector {
@@ -94,23 +93,23 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
 
             int totalSize = 0;
             for (PulsarSplit pulsarSplit : resultCaptor.getResult()) {
-                Assert.assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
-                Assert.assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
-                Assert.assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
-                Assert.assertEquals(pulsarSplit.getSchema(),
+                assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
+                assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
+                assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
+                assertEquals(pulsarSplit.getSchema(),
                         new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
-                Assert.assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
-                Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
-                Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
-                Assert.assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
-                Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
-                Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
-                Assert.assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
+                assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
+                assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
+                assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
+                assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
+                assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
+                assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
+                assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
 
                 totalSize += pulsarSplit.getSplitSize();
             }
 
-            Assert.assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
+            assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
             cleanup();
         }
 
@@ -142,23 +141,23 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
                 List<PulsarSplit> splits = getSplitsForPartition(topicName.getPartition(i), resultCaptor.getResult());
                 int totalSize = 0;
                 for (PulsarSplit pulsarSplit : splits) {
-                    Assert.assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
-                    Assert.assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
-                    Assert.assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
-                    Assert.assertEquals(pulsarSplit.getSchema(),
+                    assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
+                    assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
+                    assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
+                    assertEquals(pulsarSplit.getSchema(),
                             new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
-                    Assert.assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
-                    Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
-                    Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
-                    Assert.assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
-                    Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
-                    Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
-                    Assert.assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
+                    assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
+                    assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
+                    assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
+                    assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
+                    assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
+                    assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
+                    assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
 
                     totalSize += pulsarSplit.getSplitSize();
                 }
 
-                Assert.assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
+                assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
             }
 
             cleanup();
@@ -166,13 +165,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
     }
 
     private List<PulsarSplit> getSplitsForPartition(TopicName target, Collection<PulsarSplit> splits) {
-        return splits.stream().filter(new Predicate<PulsarSplit>() {
-            @Override
-            public boolean test(PulsarSplit pulsarSplit) {
-                 TopicName topicName = TopicName.get(pulsarSplit.getSchemaName() + "/" + pulsarSplit.getTableName());
-
-                 return target.equals(topicName);
-            }
+        return splits.stream().filter(pulsarSplit -> {
+             TopicName topicName = TopicName.get(pulsarSplit.getSchemaName() + "/" + pulsarSplit.getTableName());
+             return target.equals(topicName);
         }).collect(Collectors.toList());
     }
 
@@ -213,24 +208,24 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
         int totalSize = 0;
         int initalStart = 1;
         for (PulsarSplit pulsarSplit : resultCaptor.getResult()) {
-            Assert.assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
-            Assert.assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
-            Assert.assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
-            Assert.assertEquals(pulsarSplit.getSchema(),
+            assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
+            assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
+            assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
+            assertEquals(pulsarSplit.getSchema(),
                     new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
-            Assert.assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
-            Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), initalStart);
-            Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
-            Assert.assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initalStart));
-            Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
-            Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), initalStart + pulsarSplit.getSplitSize());
-            Assert.assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initalStart + pulsarSplit
+            assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
+            assertEquals(pulsarSplit.getStartPositionEntryId(), initalStart);
+            assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
+            assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initalStart));
+            assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
+            assertEquals(pulsarSplit.getEndPositionEntryId(), initalStart + pulsarSplit.getSplitSize());
+            assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initalStart + pulsarSplit
                     .getSplitSize()));
 
             initalStart += pulsarSplit.getSplitSize();
             totalSize += pulsarSplit.getSplitSize();
         }
-        Assert.assertEquals(totalSize, 49);
+        assertEquals(totalSize, 49);
 
     }
 
@@ -273,26 +268,26 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
         for (int i = 0; i < partitions; i++) {
             List<PulsarSplit> splits = getSplitsForPartition(topicName.getPartition(i), resultCaptor.getResult());
             int totalSize = 0;
-            int initalStart = 1;
+            int initialStart = 1;
             for (PulsarSplit pulsarSplit : splits) {
-                Assert.assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
-                Assert.assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
-                Assert.assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
-                Assert.assertEquals(pulsarSplit.getSchema(),
+                assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
+                assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
+                assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
+                assertEquals(pulsarSplit.getSchema(),
                         new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
-                Assert.assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
-                Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), initalStart);
-                Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
-                Assert.assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initalStart));
-                Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
-                Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), initalStart + pulsarSplit.getSplitSize());
-                Assert.assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initalStart + pulsarSplit.getSplitSize()));
-
-                initalStart += pulsarSplit.getSplitSize();
+                assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
+                assertEquals(pulsarSplit.getStartPositionEntryId(), initialStart);
+                assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
+                assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initialStart));
+                assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
+                assertEquals(pulsarSplit.getEndPositionEntryId(), initialStart + pulsarSplit.getSplitSize());
+                assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initialStart + pulsarSplit.getSplitSize()));
+
+                initialStart += pulsarSplit.getSplitSize();
                 totalSize += pulsarSplit.getSplitSize();
             }
 
-            Assert.assertEquals(totalSize, 49);
+            assertEquals(totalSize, 49);
         }
     }