You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/10 00:55:55 UTC

[pulsar] branch master updated: [pulsar-sql] Make partition as internal column (#4888)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5adc522  [pulsar-sql] Make partition as internal column (#4888)
5adc522 is described below

commit 5adc522e992d64ff564a187a4d08a5a408bed8de
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Aug 10 08:55:49 2019 +0800

    [pulsar-sql] Make partition as internal column (#4888)
    
    Fixes #4785
    
    ### Motivation
    
    1. Stop return partition name in table list, just return the partitioned topic name in table list. This will avoid huge tables while user create large number of partition.
    2. Make partition as internal column, provide users with the ability to get which partition data in and filtration based on partition. For example:
    ```
    SELECT * FROM "my-table" WHERE "__partition__" = 0;
    SELECT * FROM "my-table" WHERE "__partition__" in (2,3);
    SELECT * FROM "my-table" WHERE "__partition__" < 1;
    ```
    ### Modifications
    
    1. Add "__partition__" internal column.
    2. Add domain handle for "__partition__".
    
    ### Verifying this change
    
    Added new unit test to verify this change
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API: (yes)
      - The schema: (no)
      - The default values of configurations: (no)
      - The wire protocol: ( no)
      - The rest endpoints: (no)
      - The admin cli options: (no)
      - Anything that affects deployment: (no)
    
    ### Documentation
    
      - Does this pull request introduce a new feature? (yes)
---
 .../pulsar/sql/presto/PulsarInternalColumn.java    | 22 +++++-
 .../apache/pulsar/sql/presto/PulsarMetadata.java   |  7 +-
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  9 ++-
 .../pulsar/sql/presto/PulsarSplitManager.java      | 86 +++++++++++++++------
 .../pulsar/sql/presto/TestPulsarConnector.java     | 14 +++-
 .../pulsar/sql/presto/TestPulsarMetadata.java      | 14 +++-
 .../pulsar/sql/presto/TestPulsarSplitManager.java  | 90 ++++++++++++++++++++++
 7 files changed, 209 insertions(+), 33 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
index f511f8a..71107f8 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.Objects.requireNonNull;
 
 import com.facebook.presto.spi.type.BigintType;
+import com.facebook.presto.spi.type.IntegerType;
 import com.facebook.presto.spi.type.TimestampType;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarcharType;
@@ -41,6 +42,21 @@ import org.apache.pulsar.common.api.raw.RawMessage;
 public abstract class PulsarInternalColumn {
 
     /**
+     * Internal column representing the partition.
+     */
+    public static class PartitionColumn extends PulsarInternalColumn {
+
+        PartitionColumn(String name, Type type, String comment) {
+            super(name, type, comment);
+        }
+
+        @Override
+        public Object getData(RawMessage message) {
+            return null;
+        }
+    }
+
+    /**
      * Internal column representing the event time.
      */
     public static class EventTimeColumn extends PulsarInternalColumn {
@@ -151,6 +167,9 @@ public abstract class PulsarInternalColumn {
         }
     }
 
+    public static final PartitionColumn PARTITION = new PartitionColumn("__partition__", IntegerType.INTEGER,
+        "The partition number which the message belongs to");
+
     public static final PulsarInternalColumn EVENT_TIME = new EventTimeColumn("__event_time__", TimestampType
             .TIMESTAMP, "Application defined timestamp in milliseconds of when the event occurred");
 
@@ -207,7 +226,8 @@ public abstract class PulsarInternalColumn {
     }
 
     public static Set<PulsarInternalColumn> getInternalFields() {
-        return ImmutableSet.of(EVENT_TIME, PUBLISH_TIME, MESSAGE_ID, SEQUENCE_ID, PRODUCER_NAME, KEY, PROPERTIES);
+        return ImmutableSet.of(PARTITION, EVENT_TIME, PUBLISH_TIME, MESSAGE_ID, SEQUENCE_ID, PRODUCER_NAME, KEY,
+            PROPERTIES);
     }
 
     public static Map<String, PulsarInternalColumn> getInternalFieldsMap() {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 360baaf..2ee4a41 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -189,8 +189,11 @@ public class PulsarMetadata implements ConnectorMetadata {
                             + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
                 }
                 if (pulsarTopicList != null) {
-                    pulsarTopicList.forEach(topic -> builder.add(
-                            new SchemaTableName(schemaNameOrNull, TopicName.get(topic).getLocalName())));
+                    pulsarTopicList.stream()
+                        .map(topic -> TopicName.get(topic).getPartitionedTopicName())
+                        .distinct()
+                        .forEach(topic -> builder.add(new SchemaTableName(schemaNameOrNull,
+                            TopicName.get(topic).getLocalName())));
                 }
             }
         }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 5cc6e8f..4d5a25d 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.common.api.raw.MessageParser;
 import org.apache.pulsar.common.api.raw.RawMessage;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.sql.presto.PulsarInternalColumn.PartitionColumn;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.SpscArrayQueue;
 
@@ -91,6 +92,7 @@ public class PulsarRecordCursor implements RecordCursor {
     // are empty or not
     private final long splitSize;
     private long entriesProcessed = 0;
+    private int partition = -1;
 
 
     private static final Logger log = Logger.get(PulsarRecordCursor.class);
@@ -127,6 +129,7 @@ public class PulsarRecordCursor implements RecordCursor {
         PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
         this.columnHandles = columnHandles;
         this.pulsarSplit = pulsarSplit;
+        this.partition = TopicName.getPartitionIndex(pulsarSplit.getTableName());
         this.pulsarConnectorConfig = pulsarConnectorConfig;
         this.maxBatchSize = pulsarConnectorConfig.getMaxEntryReadBatchSize();
         this.messageQueue = new SpscArrayQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
@@ -426,7 +429,11 @@ public class PulsarRecordCursor implements RecordCursor {
         if (pulsarColumnHandle.isInternal()) {
             String fieldName = this.columnHandles.get(fieldIndex).getName();
             PulsarInternalColumn pulsarInternalColumn = this.internalColumnMap.get(fieldName);
-            data = pulsarInternalColumn.getData(this.currentMessage);
+            if (pulsarInternalColumn instanceof PartitionColumn) {
+                data = this.partition;
+            } else {
+                data = pulsarInternalColumn.getData(this.currentMessage);
+            }
         } else {
             data = this.schemaHandler.extractField(fieldIndex, this.currentRecord);
         }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 6da5df4..8531df0 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -39,6 +39,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import io.airlift.log.Logger;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -136,45 +137,84 @@ public class PulsarSplitManager implements ConnectorSplitManager {
     @VisibleForTesting
     Collection<PulsarSplit> getSplitsPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle
             tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain) throws Exception {
-        int numPartitions;
-        try {
-            numPartitions = (this.pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString())).partitions;
-        } catch (PulsarAdminException e) {
-            if (e.getStatusCode() == 401) {
-                throw new PrestoException(QUERY_REJECTED,
-                        String.format("Failed to get metadata for partitioned topic %s: Unauthorized", topicName));
-            }
 
-            throw new RuntimeException("Failed to get metadata for partitioned topic "
-                + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
+        List<Integer> predicatedPartitions = getPredicatedPartitions(topicName, tupleDomain);
+        if (log.isDebugEnabled()) {
+            log.debug("Partition filter result %s", predicatedPartitions);
         }
 
-        int actualNumSplits = Math.max(numPartitions, numSplits);
+        int actualNumSplits = Math.max(predicatedPartitions.size(), numSplits);
 
-        int splitsPerPartition = actualNumSplits / numPartitions;
+        int splitsPerPartition = actualNumSplits / predicatedPartitions.size();
 
-        int splitRemainder = actualNumSplits % numPartitions;
+        int splitRemainder = actualNumSplits % predicatedPartitions.size();
 
         ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
                 .getManagedLedgerFactory();
 
         List<PulsarSplit> splits = new LinkedList<>();
-        for (int i = 0; i < numPartitions; i++) {
-
+        for (int i = 0; i < predicatedPartitions.size(); i++) {
             int splitsForThisPartition = (splitRemainder > i) ? splitsPerPartition + 1 : splitsPerPartition;
             splits.addAll(
-                    getSplitsForTopic(
-                            topicName.getPartition(i).getPersistenceNamingEncoding(),
-                            managedLedgerFactory,
-                            splitsForThisPartition,
-                            tableHandle,
-                            schemaInfo,
-                            topicName.getPartition(i).getLocalName(),
-                            tupleDomain));
+                getSplitsForTopic(
+                    topicName.getPartition(predicatedPartitions.get(i)).getPersistenceNamingEncoding(),
+                    managedLedgerFactory,
+                    splitsForThisPartition,
+                    tableHandle,
+                    schemaInfo,
+                    topicName.getPartition(predicatedPartitions.get(i)).getLocalName(),
+                    tupleDomain));
         }
         return splits;
     }
 
+    private List<Integer> getPredicatedPartitions(TopicName topicName, TupleDomain<ColumnHandle> tupleDomain) {
+        int numPartitions;
+        try {
+            numPartitions = (this.pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString())).partitions;
+        } catch (PulsarAdminException e) {
+            if (e.getStatusCode() == 401) {
+                throw new PrestoException(QUERY_REJECTED,
+                    String.format("Failed to get metadata for partitioned topic %s: Unauthorized", topicName));
+            }
+
+            throw new RuntimeException("Failed to get metadata for partitioned topic "
+                + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
+        }
+        List<Integer> predicatePartitions = new ArrayList<>();
+        if (tupleDomain.getDomains().isPresent()) {
+            Domain domain = tupleDomain.getDomains().get().get(PulsarInternalColumn.PARTITION
+                .getColumnHandle(connectorId, false));
+            if (domain != null) {
+                domain.getValues().getValuesProcessor().consume(
+                    ranges -> domain.getValues().getRanges().getOrderedRanges().forEach(range -> {
+                        Integer low = 0;
+                        Integer high = numPartitions;
+                        if (!range.getLow().isLowerUnbounded() && range.getLow().getValueBlock().isPresent()) {
+                            low = range.getLow().getValueBlock().get().getInt(0, 0);
+                        }
+                        if (!range.getHigh().isLowerUnbounded() && range.getHigh().getValueBlock().isPresent()) {
+                            high = range.getHigh().getValueBlock().get().getInt(0, 0);
+                        }
+                        for (int i = low; i <= high; i++) {
+                            predicatePartitions.add(i);
+                        }
+                    }),
+                    discreteValues -> {},
+                    allOrNone -> {});
+            } else {
+                for (int i = 0; i < numPartitions; i++) {
+                    predicatePartitions.add(i);
+                }
+            }
+        } else {
+            for (int i = 0; i < numPartitions; i++) {
+                predicatePartitions.add(i);
+            }
+        }
+        return predicatePartitions;
+    }
+
     @VisibleForTesting
     Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits, TopicName topicName,
             PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain)
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 84228d5..030876c 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -71,6 +71,7 @@ import java.time.LocalDate;
 import java.time.LocalTime;
 import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -130,6 +131,7 @@ public abstract class TestPulsarConnector {
     protected static final TopicName TOPIC_5 = TopicName.get("persistent", NAMESPACE_NAME_4, "topic-1");
     protected static final TopicName TOPIC_6 = TopicName.get("persistent", NAMESPACE_NAME_4, "topic-2");
 
+
     protected static final TopicName PARTITIONED_TOPIC_1 = TopicName.get("persistent", NAMESPACE_NAME_1,
             "partitioned-topic-1");
     protected static final TopicName PARTITIONED_TOPIC_2 = TopicName.get("persistent", NAMESPACE_NAME_1,
@@ -216,6 +218,7 @@ public abstract class TestPulsarConnector {
             partitionedTopicNames.add(PARTITIONED_TOPIC_5);
             partitionedTopicNames.add(PARTITIONED_TOPIC_6);
 
+
             partitionedTopicsToPartitions = new HashMap<>();
             partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_1.toString(), 2);
             partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_2.toString(), 3);
@@ -270,6 +273,7 @@ public abstract class TestPulsarConnector {
             topicsToNumEntries.put(TOPIC_4.getSchemaName(), 12345L);
             topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L);
             topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L);
+
             topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
             topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
             topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
@@ -649,9 +653,15 @@ public abstract class TestPulsarConnector {
     }
 
     protected static List<String> getTopics(String ns) {
-        return topicNames.stream()
+        List<String> topics = new ArrayList<>(topicNames.stream()
             .filter(topicName -> topicName.getNamespace().equals(ns))
-            .map(TopicName::toString).collect(Collectors.toList());
+            .map(TopicName::toString).collect(Collectors.toList()));
+        partitionedTopicNames.stream().filter(topicName -> topicName.getNamespace().equals(ns)).forEach(topicName -> {
+            for (Integer i = 0; i < partitionedTopicsToPartitions.get(topicName.toString()); i++) {
+                topics.add(TopicName.get(topicName + "-partition-" + i).toString());
+            }
+        });
+        return topics;
     }
 
     protected static List<String> getPartitionedTopics(String ns) {
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index 829aac9..2a1a58c 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -262,14 +262,20 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns")
                 .isEmpty());
 
-        SchemaTableName[] expectedTopics1 = {new SchemaTableName(TOPIC_4.getNamespace(), TOPIC_4.getLocalName())};
+        SchemaTableName[] expectedTopics1 = {new SchemaTableName(
+            TOPIC_4.getNamespace(), TOPIC_4.getLocalName()),
+            new SchemaTableName(PARTITIONED_TOPIC_4.getNamespace(), PARTITIONED_TOPIC_4.getLocalName())
+        };
         assertEquals(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
                 NAMESPACE_NAME_3.toString()), Arrays.asList(expectedTopics1));
 
         SchemaTableName[] expectedTopics2 = {new SchemaTableName(TOPIC_5.getNamespace(), TOPIC_5.getLocalName()),
-                new SchemaTableName(TOPIC_6.getNamespace(), TOPIC_6.getLocalName())};
+                new SchemaTableName(TOPIC_6.getNamespace(), TOPIC_6.getLocalName()),
+            new SchemaTableName(PARTITIONED_TOPIC_5.getNamespace(), PARTITIONED_TOPIC_5.getLocalName()),
+            new SchemaTableName(PARTITIONED_TOPIC_6.getNamespace(), PARTITIONED_TOPIC_6.getLocalName()),
+        };
         assertEquals(new HashSet<>(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
-                NAMESPACE_NAME_4.toString())), new HashSet<>(Arrays.asList(expectedTopics2)));
+            NAMESPACE_NAME_4.toString())), new HashSet<>(Arrays.asList(expectedTopics2)));
     }
 
     @Test(dataProvider = "rewriteNamespaceDelimiter")
@@ -315,7 +321,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
                 = this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class),
                 new SchemaTablePrefix(TOPIC_1.getNamespace()));
 
-        assertEquals(tableColumnsMap.size(), 2);
+        assertEquals(tableColumnsMap.size(), 4);
         List<ColumnMetadata> columnMetadataList
                 = tableColumnsMap.get(new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName()));
         assertNotNull(columnMetadataList);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index adeaf90..e442522 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -31,6 +31,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.util.Collection;
@@ -40,13 +41,16 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
+import static com.facebook.presto.spi.type.IntegerType.INTEGER;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 @Test(singleThreaded = true)
 public class TestPulsarSplitManager extends TestPulsarConnector {
@@ -288,5 +292,91 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
         }
     }
 
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testPartitionFilter(String delimiter) throws Exception {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
+        for (TopicName topicName : partitionedTopicNames) {
+            setup();
+            log.info("!----- topic: %s -----!", topicName);
+            PulsarTableHandle pulsarTableHandle = mock(PulsarTableHandle.class);
+            when(pulsarTableHandle.getConnectorId()).thenReturn(pulsarConnectorId.toString());
+            when(pulsarTableHandle.getSchemaName()).thenReturn(topicName.getNamespace());
+            when(pulsarTableHandle.getTopicName()).thenReturn(topicName.getLocalName());
+            when(pulsarTableHandle.getTableName()).thenReturn(topicName.getLocalName());
+
+            // test single domain with equal low and high of "__partition__"
+            Map<ColumnHandle, Domain> domainMap = new HashMap<>();
+            Domain domain = Domain.create(ValueSet.ofRanges(Range.range(INTEGER, 0L, true,
+                0L, true)), false);
+            domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain);
+            TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);
+            Collection<PulsarSplit> splits = this.pulsarSplitManager.getSplitsPartitionedTopic(2, topicName, pulsarTableHandle,
+                schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain);
+            if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) {
+                Assert.assertEquals(splits.size(), 2);
+            }
+            for (PulsarSplit split : splits) {
+                assertEquals(TopicName.getPartitionIndex(split.getTableName()), 0);
+            }
+
+            // test multiple domain with equal low and high of "__partition__"
+            domainMap.clear();
+            domain = Domain.create(ValueSet.ofRanges(
+                Range.range(INTEGER, 0L, true, 0L, true),
+                Range.range(INTEGER, 3L, true, 3L, true)),
+                false);
+            domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain);
+            tupleDomain = TupleDomain.withColumnDomains(domainMap);
+            splits = this.pulsarSplitManager.getSplitsPartitionedTopic(1, topicName, pulsarTableHandle,
+                schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain);
+            if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) {
+                Assert.assertEquals(splits.size(), 2);
+            }
+            for (PulsarSplit split : splits) {
+                assertTrue(TopicName.getPartitionIndex(split.getTableName()) == 0 || TopicName.getPartitionIndex(split.getTableName()) == 3);
+            }
+
+            // test single domain with unequal low and high of "__partition__"
+            domainMap.clear();
+            domain = Domain.create(ValueSet.ofRanges(
+                Range.range(INTEGER, 0L, true, 2L, true)),
+                false);
+            domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain);
+            tupleDomain = TupleDomain.withColumnDomains(domainMap);
+            splits = this.pulsarSplitManager.getSplitsPartitionedTopic(2, topicName, pulsarTableHandle,
+                schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain);
+            if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) {
+                Assert.assertEquals(splits.size(), 3);
+            }
+            for (PulsarSplit split : splits) {
+                assertTrue(TopicName.getPartitionIndex(split.getTableName()) == 0
+                    || TopicName.getPartitionIndex(split.getTableName()) == 1
+                    || TopicName.getPartitionIndex(split.getTableName()) == 2);
+            }
+
+            // test multiple domain with unequal low and high of "__partition__"
+            domainMap.clear();
+            domain = Domain.create(ValueSet.ofRanges(
+                Range.range(INTEGER, 0L, true, 1L, true),
+                Range.range(INTEGER, 3L, true, 4L, true)),
+                false);
+            domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain);
+            tupleDomain = TupleDomain.withColumnDomains(domainMap);
+            splits = this.pulsarSplitManager.getSplitsPartitionedTopic(2, topicName, pulsarTableHandle,
+                schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain);
+            if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) {
+                Assert.assertEquals(splits.size(), 4);
+            }
+            for (PulsarSplit split : splits) {
+                assertTrue(TopicName.getPartitionIndex(split.getTableName()) == 0
+                    || TopicName.getPartitionIndex(split.getTableName()) == 1
+                    || TopicName.getPartitionIndex(split.getTableName()) == 3
+                    || TopicName.getPartitionIndex(split.getTableName()) == 4);
+            }
+        }
+
+
+    }
+
 
 }