You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/07/08 16:16:10 UTC
[pulsar] branch master updated: Cleanup tests in the presto module
(#4683)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c421ca6 Cleanup tests in the presto module (#4683)
c421ca6 is described below
commit c421ca6e5b0c99f7728c8651ad9a81005bfd176e
Author: vzhikserg <vz...@users.noreply.github.com>
AuthorDate: Mon Jul 8 18:16:05 2019 +0200
Cleanup tests in the presto module (#4683)
* Add static import statements for Assert to simplify the test in the presto module
* Use the preferred way of the schema's creation. The predicates and functions were converted to lambda
---
.../pulsar/sql/presto/TestPulsarConnector.java | 102 +++++++-----------
.../pulsar/sql/presto/TestPulsarMetadata.java | 108 +++++++++----------
.../pulsar/sql/presto/TestPulsarRecordCursor.java | 45 ++++----
.../pulsar/sql/presto/TestPulsarSplitManager.java | 115 ++++++++++-----------
4 files changed, 169 insertions(+), 201 deletions(-)
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 1a5e87a..c38c0ac 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -75,7 +75,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
import static com.facebook.presto.spi.type.DateType.DATE;
@@ -224,21 +223,19 @@ public abstract class TestPulsarConnector {
partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_6.toString(), 7);
topicsToSchemas = new HashMap<>();
- topicsToSchemas.put(TOPIC_1.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
- topicsToSchemas.put(TOPIC_2.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
- topicsToSchemas.put(TOPIC_3.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
- topicsToSchemas.put(TOPIC_4.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
- topicsToSchemas.put(TOPIC_5.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
- topicsToSchemas.put(TOPIC_6.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
-
-
- topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
-
- topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_1.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_2.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_3.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_4.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_5.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_6.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
+
+ topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
fooTypes = new HashMap<>();
fooTypes.put("field1", IntegerType.INTEGER);
@@ -536,14 +533,9 @@ public abstract class TestPulsarConnector {
fieldNames10,
positionIndices10));
-
- fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream().map(
- new Function<PulsarInternalColumn, PulsarColumnHandle>() {
- @Override
- public PulsarColumnHandle apply(PulsarInternalColumn pulsarInternalColumn) {
- return pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false);
- }
- }).collect(Collectors.toList()));
+ fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream()
+ .map(pulsarInternalColumn -> pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false))
+ .collect(Collectors.toList()));
splits = new HashMap<>();
@@ -564,11 +556,11 @@ public abstract class TestPulsarConnector {
fooFunctions = new HashMap<>();
fooFunctions.put("field1", integer -> integer);
- fooFunctions.put("field2", integer -> String.valueOf(integer));
- fooFunctions.put("field3", integer -> integer.floatValue());
- fooFunctions.put("field4", integer -> integer.doubleValue());
+ fooFunctions.put("field2", String::valueOf);
+ fooFunctions.put("field3", Integer::floatValue);
+ fooFunctions.put("field4", Integer::doubleValue);
fooFunctions.put("field5", integer -> integer % 2 == 0);
- fooFunctions.put("field6", integer -> integer.longValue());
+ fooFunctions.put("field6", Integer::longValue);
fooFunctions.put("timestamp", integer -> System.currentTimeMillis());
fooFunctions.put("time", integer -> {
LocalTime now = LocalTime.now(ZoneId.systemDefault());
@@ -647,45 +639,24 @@ public abstract class TestPulsarConnector {
private static final Logger log = Logger.get(TestPulsarConnector.class);
protected static List<String> getNamespace(String tenant) {
- return new LinkedList<>(topicNames.stream().filter(new Predicate<TopicName>() {
- @Override
- public boolean test(TopicName topicName) {
- return topicName.getTenant().equals(tenant);
- }
- }).map(new Function<TopicName, String>() {
- @Override
- public String apply(TopicName topicName) {
- return topicName.getNamespace();
- }
- }).collect(Collectors.toSet()));
+ return topicNames.stream()
+ .filter(topicName -> topicName.getTenant().equals(tenant))
+ .map(TopicName::getNamespace)
+ .distinct()
+ .collect(Collectors.toCollection(LinkedList::new));
}
protected static List<String> getTopics(String ns) {
- return topicNames.stream().filter(new Predicate<TopicName>() {
- @Override
- public boolean test(TopicName topicName) {
- return topicName.getNamespace().equals(ns);
- }
- }).map(new Function<TopicName, String>() {
- @Override
- public String apply(TopicName topicName) {
- return topicName.toString();
- }
- }).collect(Collectors.toList());
+ return topicNames.stream()
+ .filter(topicName -> topicName.getNamespace().equals(ns))
+ .map(TopicName::toString).collect(Collectors.toList());
}
protected static List<String> getPartitionedTopics(String ns) {
- return partitionedTopicNames.stream().filter(new Predicate<TopicName>() {
- @Override
- public boolean test(TopicName topicName) {
- return topicName.getNamespace().equals(ns);
- }
- }).map(new Function<TopicName, String>() {
- @Override
- public String apply(TopicName topicName) {
- return topicName.toString();
- }
- }).collect(Collectors.toList());
+ return partitionedTopicNames.stream()
+ .filter(topicName -> topicName.getNamespace().equals(ns))
+ .map(TopicName::toString)
+ .collect(Collectors.toList());
}
@BeforeMethod
@@ -696,12 +667,9 @@ public abstract class TestPulsarConnector {
this.pulsarConnectorConfig.setMaxSplitMessageQueueSize(100);
Tenants tenants = mock(Tenants.class);
- doReturn(new LinkedList<>(topicNames.stream().map(new Function<TopicName, String>() {
- @Override
- public String apply(TopicName topicName) {
- return topicName.getTenant();
- }
- }).collect(Collectors.toSet()))).when(tenants).getTenants();
+ doReturn(new LinkedList<>(topicNames.stream()
+ .map(TopicName::getTenant)
+ .collect(Collectors.toSet()))).when(tenants).getTenants();
Namespaces namespaces = mock(Namespaces.class);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index f03b34f..a704be6 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -34,7 +34,6 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.core.Response;
-import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Arrays;
@@ -49,6 +48,11 @@ import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
@Test(singleThreaded = true)
public class TestPulsarMetadata extends TestPulsarConnector {
@@ -62,7 +66,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
String[] expectedSchemas = {NAMESPACE_NAME_1.toString(), NAMESPACE_NAME_2.toString(),
NAMESPACE_NAME_3.toString(), NAMESPACE_NAME_4.toString()};
- Assert.assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
+ assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
}
@Test
@@ -73,14 +77,14 @@ public class TestPulsarMetadata extends TestPulsarConnector {
ConnectorTableHandle connectorTableHandle
= this.pulsarMetadata.getTableHandle(mock(ConnectorSession.class), schemaTableName);
- Assert.assertTrue(connectorTableHandle instanceof PulsarTableHandle);
+ assertTrue(connectorTableHandle instanceof PulsarTableHandle);
PulsarTableHandle pulsarTableHandle = (PulsarTableHandle) connectorTableHandle;
- Assert.assertEquals(pulsarTableHandle.getConnectorId(), pulsarConnectorId.toString());
- Assert.assertEquals(pulsarTableHandle.getSchemaName(), TOPIC_1.getNamespace());
- Assert.assertEquals(pulsarTableHandle.getTableName(), TOPIC_1.getLocalName());
- Assert.assertEquals(pulsarTableHandle.getTopicName(), TOPIC_1.getLocalName());
+ assertEquals(pulsarTableHandle.getConnectorId(), pulsarConnectorId.toString());
+ assertEquals(pulsarTableHandle.getSchemaName(), TOPIC_1.getNamespace());
+ assertEquals(pulsarTableHandle.getTableName(), TOPIC_1.getLocalName());
+ assertEquals(pulsarTableHandle.getTopicName(), TOPIC_1.getLocalName());
}
@Test
@@ -101,10 +105,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle);
- Assert.assertEquals(tableMetadata.getTable().getSchemaName(), topic.getNamespace());
- Assert.assertEquals(tableMetadata.getTable().getTableName(), topic.getLocalName());
+ assertEquals(tableMetadata.getTable().getSchemaName(), topic.getNamespace());
+ assertEquals(tableMetadata.getTable().getTableName(), topic.getLocalName());
- Assert.assertEquals(tableMetadata.getColumns().size(),
+ assertEquals(tableMetadata.getColumns().size(),
fooColumnHandles.size());
List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
@@ -115,7 +119,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
for (ColumnMetadata column : tableMetadata.getColumns()) {
if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
- Assert.assertEquals(column.getComment(),
+ assertEquals(column.getComment(),
PulsarInternalColumn.getInternalFieldsMap()
.get(column.getName()).getColumnMetadata(true).getComment());
}
@@ -123,7 +127,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
fieldNames.remove(column.getName());
}
- Assert.assertTrue(fieldNames.isEmpty());
+ assertTrue(fieldNames.isEmpty());
}
}
@@ -140,10 +144,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
try {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle);
- Assert.fail("Invalid schema should have generated an exception");
+ fail("Invalid schema should have generated an exception");
} catch (PrestoException e) {
- Assert.assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
- Assert.assertEquals(e.getMessage(), "Schema wrong-tenant/wrong-ns does not exist");
+ assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
+ assertEquals(e.getMessage(), "Schema wrong-tenant/wrong-ns does not exist");
}
}
@@ -160,10 +164,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
try {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle);
- Assert.fail("Invalid table should have generated an exception");
+ fail("Invalid table should have generated an exception");
} catch (TableNotFoundException e) {
- Assert.assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
- Assert.assertEquals(e.getMessage(), "Table 'tenant-1/ns-1.wrong-topic' not found");
+ assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
+ assertEquals(e.getMessage(), "Table 'tenant-1/ns-1.wrong-topic' not found");
}
}
@@ -183,7 +187,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle);
- Assert.assertEquals(tableMetadata.getColumns().size(), 0);
+ assertEquals(tableMetadata.getColumns().size(), 0);
}
@Test
@@ -203,10 +207,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
try {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle);
- Assert.fail("Table without schema should have generated an exception");
+ fail("Table without schema should have generated an exception");
} catch (PrestoException e) {
- Assert.assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
- Assert.assertEquals(e.getMessage(),
+ assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
+ assertEquals(e.getMessage(),
"Topic persistent://tenant-1/ns-1/topic-1 does not have a valid schema");
}
}
@@ -228,27 +232,27 @@ public class TestPulsarMetadata extends TestPulsarConnector {
try {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle);
- Assert.fail("Table without schema should have generated an exception");
+ fail("Table without schema should have generated an exception");
} catch (PrestoException e) {
- Assert.assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
- Assert.assertEquals(e.getMessage(),
+ assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
+ assertEquals(e.getMessage(),
"Topic persistent://tenant-1/ns-1/topic-1 does not have a valid schema");
}
}
@Test
public void testListTable() {
- Assert.assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), null).isEmpty());
- Assert.assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns")
+ assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), null).isEmpty());
+ assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns")
.isEmpty());
SchemaTableName[] expectedTopics1 = {new SchemaTableName(TOPIC_4.getNamespace(), TOPIC_4.getLocalName())};
- Assert.assertEquals(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
+ assertEquals(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
NAMESPACE_NAME_3.toString()), Arrays.asList(expectedTopics1));
SchemaTableName[] expectedTopics2 = {new SchemaTableName(TOPIC_5.getNamespace(), TOPIC_5.getLocalName()),
new SchemaTableName(TOPIC_6.getNamespace(), TOPIC_6.getLocalName())};
- Assert.assertEquals(new HashSet<>(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
+ assertEquals(new HashSet<>(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
NAMESPACE_NAME_4.toString())), new HashSet<>(Arrays.asList(expectedTopics2)));
}
@@ -267,25 +271,25 @@ public class TestPulsarMetadata extends TestPulsarConnector {
}
for (String field : fieldNames) {
- Assert.assertNotNull(columnHandleMap.get(field));
+ assertNotNull(columnHandleMap.get(field));
PulsarColumnHandle pulsarColumnHandle = (PulsarColumnHandle) columnHandleMap.get(field);
PulsarInternalColumn pulsarInternalColumn = PulsarInternalColumn.getInternalFieldsMap().get(field);
if (pulsarInternalColumn != null) {
- Assert.assertEquals(pulsarColumnHandle,
+ assertEquals(pulsarColumnHandle,
pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false));
} else {
Schema schema = new Schema.Parser().parse(new String(topicsToSchemas.get(TOPIC_1.getSchemaName())
.getSchema()));
- Assert.assertEquals(pulsarColumnHandle.getConnectorId(), pulsarConnectorId.toString());
- Assert.assertEquals(pulsarColumnHandle.getName(), field);
- Assert.assertEquals(pulsarColumnHandle.getPositionIndices(), fooPositionIndices.get(field));
- Assert.assertEquals(pulsarColumnHandle.getFieldNames(), fooFieldNames.get(field));
- Assert.assertEquals(pulsarColumnHandle.getType(), fooTypes.get(field));
- Assert.assertEquals(pulsarColumnHandle.isHidden(), false);
+ assertEquals(pulsarColumnHandle.getConnectorId(), pulsarConnectorId.toString());
+ assertEquals(pulsarColumnHandle.getName(), field);
+ assertEquals(pulsarColumnHandle.getPositionIndices(), fooPositionIndices.get(field));
+ assertEquals(pulsarColumnHandle.getFieldNames(), fooFieldNames.get(field));
+ assertEquals(pulsarColumnHandle.getType(), fooTypes.get(field));
+ assertFalse(pulsarColumnHandle.isHidden());
}
columnHandleMap.remove(field);
}
- Assert.assertTrue(columnHandleMap.isEmpty());
+ assertTrue(columnHandleMap.isEmpty());
}
@Test
@@ -294,11 +298,11 @@ public class TestPulsarMetadata extends TestPulsarConnector {
= this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class),
new SchemaTablePrefix(TOPIC_1.getNamespace()));
- Assert.assertEquals(tableColumnsMap.size(), 2);
+ assertEquals(tableColumnsMap.size(), 2);
List<ColumnMetadata> columnMetadataList
= tableColumnsMap.get(new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName()));
- Assert.assertNotNull(columnMetadataList);
- Assert.assertEquals(columnMetadataList.size(),
+ assertNotNull(columnMetadataList);
+ assertEquals(columnMetadataList.size(),
fooColumnHandles.size());
List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
@@ -309,7 +313,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
for (ColumnMetadata column : columnMetadataList) {
if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
- Assert.assertEquals(column.getComment(),
+ assertEquals(column.getComment(),
PulsarInternalColumn.getInternalFieldsMap()
.get(column.getName()).getColumnMetadata(true).getComment());
}
@@ -317,11 +321,11 @@ public class TestPulsarMetadata extends TestPulsarConnector {
fieldNames.remove(column.getName());
}
- Assert.assertTrue(fieldNames.isEmpty());
+ assertTrue(fieldNames.isEmpty());
columnMetadataList = tableColumnsMap.get(new SchemaTableName(TOPIC_2.getNamespace(), TOPIC_2.getLocalName()));
- Assert.assertNotNull(columnMetadataList);
- Assert.assertEquals(columnMetadataList.size(),
+ assertNotNull(columnMetadataList);
+ assertEquals(columnMetadataList.size(),
fooColumnHandles.size());
fieldNames = new LinkedList<>(fooFieldNames.keySet());
@@ -332,7 +336,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
for (ColumnMetadata column : columnMetadataList) {
if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
- Assert.assertEquals(column.getComment(),
+ assertEquals(column.getComment(),
PulsarInternalColumn.getInternalFieldsMap()
.get(column.getName()).getColumnMetadata(true).getComment());
}
@@ -340,17 +344,17 @@ public class TestPulsarMetadata extends TestPulsarConnector {
fieldNames.remove(column.getName());
}
- Assert.assertTrue(fieldNames.isEmpty());
+ assertTrue(fieldNames.isEmpty());
// test table and schema
tableColumnsMap
= this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class),
new SchemaTablePrefix(TOPIC_4.getNamespace(), TOPIC_4.getLocalName()));
- Assert.assertEquals(tableColumnsMap.size(), 1);
+ assertEquals(tableColumnsMap.size(), 1);
columnMetadataList = tableColumnsMap.get(new SchemaTableName(TOPIC_4.getNamespace(), TOPIC_4.getLocalName()));
- Assert.assertNotNull(columnMetadataList);
- Assert.assertEquals(columnMetadataList.size(),
+ assertNotNull(columnMetadataList);
+ assertEquals(columnMetadataList.size(),
fooColumnHandles.size());
fieldNames = new LinkedList<>(fooFieldNames.keySet());
@@ -361,7 +365,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
for (ColumnMetadata column : columnMetadataList) {
if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
- Assert.assertEquals(column.getComment(),
+ assertEquals(column.getComment(),
PulsarInternalColumn.getInternalFieldsMap()
.get(column.getName()).getColumnMetadata(true).getComment());
}
@@ -369,6 +373,6 @@ public class TestPulsarMetadata extends TestPulsarConnector {
fieldNames.remove(column.getName());
}
- Assert.assertTrue(fieldNames.isEmpty());
+ assertTrue(fieldNames.isEmpty());
}
}
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 7a8b18f..c81fc40 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -20,13 +20,14 @@ package org.apache.pulsar.sql.presto;
import io.airlift.log.Logger;
import org.apache.pulsar.common.naming.TopicName;
-import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import static org.testng.Assert.assertEquals;
+
@Test(singleThreaded = true)
public class TestPulsarRecordCursor extends TestPulsarConnector {
@@ -50,22 +51,22 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
columnsSeen.add(fooColumnHandles.get(i).getName());
} else {
if (fooColumnHandles.get(i).getName().equals("field1")) {
- Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("field1").apply(count)).longValue());
+ assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("field1").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("field2")) {
- Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("field2").apply(count)).getBytes());
+ assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("field2").apply(count)).getBytes());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("field3")) {
- Assert.assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("field3").apply(count)).floatValue()));
+ assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("field3").apply(count)).floatValue()));
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("field4")) {
- Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("field4").apply(count)).doubleValue());
+ assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("field4").apply(count)).doubleValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("field5")) {
- Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("field5").apply(count)).booleanValue());
+ assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("field5").apply(count)).booleanValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("field6")) {
- Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("field6").apply(count)).longValue());
+ assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("field6").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("timestamp")) {
pulsarRecordCursor.getLong(i);
@@ -77,40 +78,40 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
pulsarRecordCursor.getLong(i);
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.field1")) {
- Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.field1").apply(count)).longValue());
+ assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.field1").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.field2")) {
- Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("bar.field2").apply(count)).getBytes());
+ assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("bar.field2").apply(count)).getBytes());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.field3")) {
- Assert.assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("bar.field3").apply(count)).floatValue()));
+ assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("bar.field3").apply(count)).floatValue()));
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test.field4")) {
- Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test.field4").apply(count)).doubleValue());
+ assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test.field4").apply(count)).doubleValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test.field5")) {
- Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test.field5").apply(count)).booleanValue());
+ assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test.field5").apply(count)).booleanValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test.field6")) {
- Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test.field6").apply(count)).longValue());
+ assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test.field6").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test.foobar.field1")) {
- Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test.foobar.field1").apply(count)).longValue());
+ assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test.foobar.field1").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test2.field4")) {
- Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test2.field4").apply(count)).doubleValue());
+ assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test2.field4").apply(count)).doubleValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test2.field5")) {
- Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test2.field5").apply(count)).booleanValue());
+ assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test2.field5").apply(count)).booleanValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test2.field6")) {
- Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test2.field6").apply(count)).longValue());
+ assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test2.field6").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test2.foobar.field1")) {
- Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test2.foobar.field1").apply(count)).longValue());
+ assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test2.foobar.field1").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("field7")) {
- Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), fooFunctions.get("field7").apply(count).toString().getBytes());
+ assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), fooFunctions.get("field7").apply(count).toString().getBytes());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else {
if (PulsarInternalColumn.getInternalFieldsMap().containsKey(fooColumnHandles.get(i).getName())) {
@@ -119,11 +120,11 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
}
}
}
- Assert.assertEquals(columnsSeen.size(), fooColumnHandles.size());
+ assertEquals(columnsSeen.size(), fooColumnHandles.size());
count++;
}
- Assert.assertEquals(count, topicsToNumEntries.get(topicName.getSchemaName()).longValue());
- Assert.assertEquals(pulsarRecordCursor.getCompletedBytes(), completedBytes);
+ assertEquals(count, topicsToNumEntries.get(topicName.getSchemaName()).longValue());
+ assertEquals(pulsarRecordCursor.getCompletedBytes(), completedBytes);
cleanup();
pulsarRecordCursor.close();
}
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index 7c9b4bb..b86a7a5 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -32,14 +32,12 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
@@ -50,6 +48,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
@Test(singleThreaded = true)
public class TestPulsarSplitManager extends TestPulsarConnector {
@@ -94,23 +93,23 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
int totalSize = 0;
for (PulsarSplit pulsarSplit : resultCaptor.getResult()) {
- Assert.assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
- Assert.assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
- Assert.assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
- Assert.assertEquals(pulsarSplit.getSchema(),
+ assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
+ assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
+ assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
+ assertEquals(pulsarSplit.getSchema(),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
- Assert.assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
- Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
- Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
- Assert.assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
- Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
- Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
- Assert.assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
+ assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
+ assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
+ assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
+ assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
+ assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
+ assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
+ assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
totalSize += pulsarSplit.getSplitSize();
}
- Assert.assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
+ assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
cleanup();
}
@@ -142,23 +141,23 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
List<PulsarSplit> splits = getSplitsForPartition(topicName.getPartition(i), resultCaptor.getResult());
int totalSize = 0;
for (PulsarSplit pulsarSplit : splits) {
- Assert.assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
- Assert.assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
- Assert.assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
- Assert.assertEquals(pulsarSplit.getSchema(),
+ assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
+ assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
+ assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
+ assertEquals(pulsarSplit.getSchema(),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
- Assert.assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
- Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
- Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
- Assert.assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
- Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
- Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
- Assert.assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
+ assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
+ assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
+ assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
+ assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
+ assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
+ assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
+ assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
totalSize += pulsarSplit.getSplitSize();
}
- Assert.assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
+ assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
}
cleanup();
@@ -166,13 +165,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
}
private List<PulsarSplit> getSplitsForPartition(TopicName target, Collection<PulsarSplit> splits) {
- return splits.stream().filter(new Predicate<PulsarSplit>() {
- @Override
- public boolean test(PulsarSplit pulsarSplit) {
- TopicName topicName = TopicName.get(pulsarSplit.getSchemaName() + "/" + pulsarSplit.getTableName());
-
- return target.equals(topicName);
- }
+ return splits.stream().filter(pulsarSplit -> {
+ TopicName topicName = TopicName.get(pulsarSplit.getSchemaName() + "/" + pulsarSplit.getTableName());
+ return target.equals(topicName);
}).collect(Collectors.toList());
}
@@ -213,24 +208,24 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
int totalSize = 0;
int initalStart = 1;
for (PulsarSplit pulsarSplit : resultCaptor.getResult()) {
- Assert.assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
- Assert.assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
- Assert.assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
- Assert.assertEquals(pulsarSplit.getSchema(),
+ assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
+ assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
+ assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
+ assertEquals(pulsarSplit.getSchema(),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
- Assert.assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
- Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), initalStart);
- Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
- Assert.assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initalStart));
- Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
- Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), initalStart + pulsarSplit.getSplitSize());
- Assert.assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initalStart + pulsarSplit
+ assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
+ assertEquals(pulsarSplit.getStartPositionEntryId(), initalStart);
+ assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
+ assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initalStart));
+ assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
+ assertEquals(pulsarSplit.getEndPositionEntryId(), initalStart + pulsarSplit.getSplitSize());
+ assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initalStart + pulsarSplit
.getSplitSize()));
initalStart += pulsarSplit.getSplitSize();
totalSize += pulsarSplit.getSplitSize();
}
- Assert.assertEquals(totalSize, 49);
+ assertEquals(totalSize, 49);
}
@@ -273,26 +268,26 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
for (int i = 0; i < partitions; i++) {
List<PulsarSplit> splits = getSplitsForPartition(topicName.getPartition(i), resultCaptor.getResult());
int totalSize = 0;
- int initalStart = 1;
+ int initialStart = 1;
for (PulsarSplit pulsarSplit : splits) {
- Assert.assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
- Assert.assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
- Assert.assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
- Assert.assertEquals(pulsarSplit.getSchema(),
+ assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
+ assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
+ assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
+ assertEquals(pulsarSplit.getSchema(),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
- Assert.assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
- Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), initalStart);
- Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
- Assert.assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initalStart));
- Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
- Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), initalStart + pulsarSplit.getSplitSize());
- Assert.assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initalStart + pulsarSplit.getSplitSize()));
-
- initalStart += pulsarSplit.getSplitSize();
+ assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
+ assertEquals(pulsarSplit.getStartPositionEntryId(), initialStart);
+ assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
+ assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initialStart));
+ assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
+ assertEquals(pulsarSplit.getEndPositionEntryId(), initialStart + pulsarSplit.getSplitSize());
+ assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initialStart + pulsarSplit.getSplitSize()));
+
+ initialStart += pulsarSplit.getSplitSize();
totalSize += pulsarSplit.getSplitSize();
}
- Assert.assertEquals(totalSize, 49);
+ assertEquals(totalSize, 49);
}
}