Posted to dev@pulsar.apache.org by Aloys Zhang <al...@apache.org> on 2022/02/17 15:47:15 UTC

[Discuss][PIP-143] Support split partitions belonging to specified topics in a bundle

Hi Pulsar Community,

This is a PIP discussion on how to support split partitions belonging to
specified topics in a bundle.

The issue can be found at https://github.com/apache/pulsar/issues/13761

I have copied the content here for convenience; any suggestions are welcome
and appreciated.


## Motivation

As we all know, a namespace bundle may contain lots of partitions belonging
to different topics.
The throughput of these topics may vary greatly. Some topics may have a very
high rate/throughput while others have a very low rate/throughput.

These partitions with high rate/throughput can cause broker overload and
bundle unloading.
At this point, if we split the bundle manually with the `range_equally_divide`
or `topic_count_equally_divide` split algorithm, it may take many splits
before these high rate/throughput partitions are assigned to different new
bundles.

For convenience, we call these high throughput topics `outstanding topic`
and their partitions `outstanding partition` in this PIP.

## Goal

Our goal is to make it easier to split `outstanding partition` into new
bundles.

There are two alternative ways to achieve this. Either of them will add a
new algorithm for bundle split. The difference is how the new bundle split
algorithm is implemented.

One algorithm splits the bundle by `outstanding topic`: each split divides
the bundle into two new bundles, each containing an equal number of
`outstanding partition`s.
E.g., a bundle contains lots of topic partitions but only one `outstanding
topic` (T) with 2 `outstanding partition`s (T-partition-n, T-partition-n+1).
This algorithm splits the bundle at the midpoint of these two partitions'
hashcodes. Its disadvantage is that it can only deal with one `outstanding
topic`.
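
As a rough sketch of this first approach, assuming the hash positions of the `outstanding topic`'s partitions inside the bundle are already known, the split point could be computed as below. The class and method names are hypothetical and not Pulsar APIs, and rounding the midpoint up is an assumption made here so that adjacent hashcodes still land in different bundles.

```java
import java.util.Arrays;

/** Hypothetical helper, not part of Pulsar: one split point for a single outstanding topic. */
final class OutstandingTopicMidpoint {

    /**
     * Returns a boundary b such that half of the given hash positions are below b
     * and the rest are at or above b.
     */
    static long splitPoint(long[] partitionHashes) {
        if (partitionHashes.length < 2) {
            throw new IllegalArgumentException("need at least two outstanding partitions");
        }
        long[] sorted = partitionHashes.clone();
        Arrays.sort(sorted);
        // The two positions on either side of the middle of the sorted list.
        long left = sorted[sorted.length / 2 - 1];
        long right = sorted[sorted.length / 2];
        if (left == right) {
            throw new IllegalArgumentException("middle partitions share a hash position");
        }
        // Round up so that left < boundary <= right even when the two hashes are adjacent.
        return left + (right - left + 1) / 2;
    }
}
```

With hashcodes 0x03 and 0x07 this yields 0x05, so a bundle 0x00_0x10 would become 0x00_0x05 and 0x05_0x10.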

So we propose another algorithm.

The other algorithm splits the bundle at the hashcode point of each
`outstanding partition`, which turns one bundle into three per split. The
middle bundle contains only the hashcode of the `outstanding partition`, the
left one covers the hashcodes less than it, and the right one covers the
hashcodes greater than it.
E.g., if a bundle 0x00_0x10 contains two `outstanding partition`s
(partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
algorithm splits the bundle into five new bundles: 0x00_0x03, 0x03_0x04 (for
partition-x), 0x04_0x07, 0x07_0x08 (for partition-y), and 0x08_0x10.
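
The boundaries in this example can be derived mechanically. The sketch below is only an illustration under two assumptions: a bundle range is treated as lower-inclusive and upper-exclusive (as the `0x03_0x04` bundle for hashcode 0x03 suggests), and the hashcodes are already known. `IsolatingSplitBoundaries` is a hypothetical name, not a Pulsar class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical helper, not part of Pulsar: isolates each outstanding partition in its own bundle. */
final class IsolatingSplitBoundaries {

    /**
     * For a bundle covering [lower, upper), returns the sorted interior boundaries that
     * put every given hash h into a bundle of its own, [h, h + 1).
     */
    static List<Long> boundaries(long lower, long upper, long[] partitionHashes) {
        TreeSet<Long> points = new TreeSet<>();
        for (long h : partitionHashes) {
            if (h < lower || h >= upper) {
                throw new IllegalArgumentException("hash outside bundle range: " + h);
            }
            points.add(h);
            points.add(h + 1);
        }
        // The bundle's own edges already exist, so they are not new boundaries.
        points.remove(lower);
        points.remove(upper);
        return new ArrayList<>(points);
    }
}
```

For the bundle 0x00_0x10 with hashcodes 0x03 and 0x07 this returns the four boundaries 0x03, 0x04, 0x07 and 0x08; n boundaries always produce n + 1 bundles, which gives the five bundles listed above.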

## API Changes

The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
will add a new parameter, "--topic" or "-t", for the name of the
`outstanding topic`.

The split interface changes from

```java
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName) throws PulsarAdminException;
```

to

```java
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName, String topic) throws PulsarAdminException;
```
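
For illustration, a caller might use the new overload roughly as follows. `NamespacesSketch` is a stand-in interface that mirrors only the proposed signature so the snippet compiles on its own (the proposed method also declares `throws PulsarAdminException`, which is left out here), and `SplitBundleCall` is a hypothetical wrapper.

```java
/** Stand-in for the admin interface; only the proposed method is mirrored here. */
interface NamespacesSketch {
    void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                              String splitAlgorithmName, String topic);
}

/** Hypothetical wrapper showing how the extra topic argument is passed through. */
final class SplitBundleCall {

    /** Splits one bundle around the partitions of a single outstanding topic. */
    static void splitAroundTopic(NamespacesSketch namespaces, String namespace,
                                 String bundleRange, String topic) {
        // The algorithm name is the constant proposed later in this PIP.
        namespaces.splitNamespaceBundle(namespace, bundleRange, true,
                "specified_topic_count_equally_divide", topic);
    }
}
```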

## Implementation

There are changes on both the Admin CLI side and the broker side.

First, the Admin CLI for split bundle should support specifying the
`outstanding topic`:

```java
/**
 * Split namespace bundle.
 *
 * @param namespace
 * @param bundle range of bundle to split
 * @param unloadSplitBundles
 * @param splitAlgorithmName
 * @param topic
 * @throws PulsarAdminException
 */
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName, String topic) throws PulsarAdminException;
```

```java
/**
 * Split namespace bundle asynchronously.
 *
 * @param namespace
 * @param bundle range of bundle to split
 * @param unloadSplitBundles
 * @param splitAlgorithmName
 * @param topic
 */
CompletableFuture<Void> splitNamespaceBundleAsync(
        String namespace, String bundle, boolean unloadSplitBundles,
        String splitAlgorithmName, String topic);
```

On the broker side, first encapsulate the parameters for bundle split in a
new class `BundleSplitOption`:

```java
public class BundleSplitOption {
    private NamespaceService service;
    private NamespaceBundle bundle;
    private String topic;
}
```

Add a new split algorithm:

```java
public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
    @Override
    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
        // ...
    }
}
```

Add the new algorithm to `NamespaceBundleSplitAlgorithm`:

```java
String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = "specified_topic_count_equally_divide";

List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
        TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);

NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
        new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
```

Modify `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` in
[NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1):


```java
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
                                                 NamespaceBundleSplitAlgorithm splitAlgorithm,
                                                 String topic) {
    final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
    final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
    splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, topic);

    return unloadFuture;
}
```

```java
void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
                                   boolean unload,
                                   AtomicInteger counter,
                                   CompletableFuture<Void> completionFuture,
                                   NamespaceBundleSplitAlgorithm splitAlgorithm,
                                   String topic) {
    // ...
}
```
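
The body of the retry method is not shown here. As a generic illustration of the counter-plus-future pattern that these signatures suggest, and not the code in NamespaceService, a bounded retry could look like the sketch below. `RetryingSplit`, its `RETRY_LIMIT` value, and the `Supplier` standing in for one split attempt are all assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch of a counter-bounded retry; not the code in NamespaceService. */
final class RetryingSplit {
    static final int RETRY_LIMIT = 3; // assumed value, for illustration only

    /** Runs the attempt up to RETRY_LIMIT times and reports the outcome on one future. */
    static CompletableFuture<Void> run(Supplier<CompletableFuture<Void>> attempt) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        runOnceAndRetry(attempt, new AtomicInteger(RETRY_LIMIT), result);
        return result;
    }

    private static void runOnceAndRetry(Supplier<CompletableFuture<Void>> attempt,
                                        AtomicInteger counter,
                                        CompletableFuture<Void> result) {
        attempt.get().whenComplete((ignored, error) -> {
            if (error == null) {
                result.complete(null);
            } else if (counter.decrementAndGet() > 0) {
                // Attempts remain: try again with the same counter and result future.
                runOnceAndRetry(attempt, counter, result);
            } else {
                result.completeExceptionally(error);
            }
        });
    }
}
```

Passing the counter and the result future down through each attempt is what lets the caller hold a single future for the whole sequence, which matches the shape of `splitAndOwnBundle` above.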

Also, we change the REST API and broker.conf:

```java
public void splitNamespaceBundle(
        @Suspended final AsyncResponse asyncResponse,
        @PathParam("property") String property,
        @PathParam("cluster") String cluster,
        @PathParam("namespace") String namespace,
        @PathParam("bundle") String bundleRange,
        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
        @QueryParam("unload") @DefaultValue("false") boolean unload,
        @QueryParam("topic") @DefaultValue("") String topic) {}
```

```shell
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
```
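
One way a requested algorithm name could be checked against this comma-separated setting is sketched below. `SplitAlgorithmConfig` is a hypothetical helper written for this example; it does not show how the broker actually reads its configuration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: checks a requested algorithm name against the configured list. */
final class SplitAlgorithmConfig {

    /** Splits the comma-separated config value into trimmed, non-empty names. */
    static List<String> parse(String configValue) {
        return Arrays.stream(configValue.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    /** Returns true when the requested name appears in the configured list. */
    static boolean isSupported(String configValue, String requested) {
        return parse(configValue).contains(requested);
    }
}
```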

Re: [Discuss][PIP-143] Support split partitions belonging to specified topics in a bundle

Posted by Aloys Zhang <al...@apache.org>.
Hi all,
Since there have been no more suggestions for more than 72 hours, I have
updated the PIP https://github.com/apache/pulsar/issues/13761 according to
the discussion.
I will send a vote thread soon.

On Tue, Feb 22, 2022 at 10:38 AM Aloys Zhang <al...@apache.org> wrote:

> >Should this be "positions"? We are going to split one bundle into
> multi-bundles,
> in most cases, bundle number will be position number + 1, right?
>
> Sure, it should be “positions” or “positionList”
>
> On Mon, Feb 21, 2022 at 8:50 PM PengHui Li <pe...@apache.org> wrote:
>
>> > And the (anti-)affinity way needs more discussion or maybe we can
>> introduce
>> a new PIP for it.
>>
>> +1
>>
>> Thanks,
>> Penghui
>>
>> On Mon, Feb 21, 2022 at 6:47 PM Haiting Jiang <ji...@apache.org>
>> wrote:
>>
>> > > 2. calculate the position to split this bundle(also need a new API)
>> > Should this be "positions"? We are going to split one bundle into
>> > multi-bundles,
>> > in most cases, bundle number will be position number + 1, right?
>> >
>> > > And the (anti-)affinity way needs more discussion or maybe we can
>> > introduce
>> > > a new PIP for it.
>> > +1, this is not in the scope of this PIP.
>> >
>> >
>> > Thanks,
>> > Haiting
>> >
>> > On 2022/02/21 08:57:01 Aloys Zhang wrote:
>> > > Hi penghui and haiting,
>> > >
>> > > I try to figure out how the (anti-)affinity works.
>> > >
>> > > > if I understand correctly, it looks like if we have a partitioned
>> topic
>> > > with 10
>> > > > partitions under a namespace with 16 bundles, if applies the
>> > > anti-affinity policy,
>> > > > partition-0 map to bundle 0, partition-1 map to bundle 1, and so on.
>> > > > Of course, it is not necessary for every partitioned topic to start
>> > from
>> > > bundle 0,
>> > > > we can use the partition-0 hash to determine the start bundle index.
>> > >
>> > > As penghui described, I think this is a mechanism for assigning topics
>> > to a
>> > > bundle that controls how a topic is mapped to a bundle.
>> > >
>> > > > IMO, this affinity serves the purpose of isolating an abnormal
>> topic to
>> > > some spare
>> > > > brokers.  These brokers host these kind of topics only. Here are
>> some
>> > > cases :
>> > >
>> > > And as haiting mentioned here, It's more like an isolation policy that
>> > > decides topics can be owned by which broker.
>> > > So, I am a little confused about how the (anti-)affinity works now.
>> > >
>> > > Back to this PIP which aims to solve the problem that a small number
>> of
>> > > topics in a bundle have a load that exceeds the average.
>> > > So, we can
>> > > 1. get the positions for the topics we are interested in (needs a new API)
>> > > 2. calculate the position to split this bundle(also need a new API)
>> > > I think this way is enough for solving the problem.
>> > >
>> > > And the (anti-)affinity way needs more discussion or maybe we can
>> > introduce
>> > > a new PIP for it.
>> > > What do you think?@penghui @haiting
>> > >
>> > >
>> > > Thanks,
>> > > Aloys
>> > >
>> > > On Mon, Feb 21, 2022 at 2:39 PM Aloys Zhang <al...@apache.org> wrote:
>> > >
>> > > > Hi, penghui
>> > > >
>> > > > >  The new API does not necessarily have to query by topic one by
>> one,
>> > > > we have listed all the "topic -> position" of a bundle?
>> > > >
>> > > > I see. Once we have all the positions of the topics we want to split in a
>> > > > bundle, it's quite easy for us to decide how to do it.
>> > > >
>> > > > On Sun, Feb 20, 2022 at 12:05 PM Haiting Jiang <ji...@apache.org> wrote:
>> > > >
>> > > >> > Do you have an example for affinity? I don't fully understand how
>> > this
>> > > >> is
>> > > >> > used
>> > > >> > in practice.
>> > > >>
>> > > >> IMO, this affinity serves the purpose of isolating an abnormal
>> topic
>> > to
>> > > >> some spare
>> > > >> brokers.  These brokers host these kind of topics only. Here are
>> some
>> > > >> cases :
>> > > >>
>> > > >> 1. A topic may have unexpected short spike traffic flows
>> periodically
>> > and
>> > > >> causing broker overloads and negative impact on other topics.
>> > > >> Until we have more proper solutions, we can always isolate these
>> > topics
>> > > >> first,
>> > > >>  and make the service recover time as small as possible.
>> > > >>
>> > > >> 2. Some users may encounter some bugs in brokers, and we can
>> isolate
>> > the
>> > > >> topic to
>> > > >> exclusive brokers, and use more radical approach to locate the bug,
>> > like
>> > > >> enable debug
>> > > >> level logs or even add some temporary code patch.
>> > > >>
>> > > >> 3. User may already have configured failure domain and
>> anti-affinity
>> > > >> namespace, but with
>> > > >> business logic code changes, some topic may need to migrate from
>> one
>> > > >> namespace
>> > > >> to another. This will take some time for user to change the client
>> > side
>> > > >> config.
>> > > >> In the meanwhile, we can isolate the topic first.
>> > > >>
>> > > >> Thanks,
>> > > >> Haiting
>> > > >>
>> > > >> On 2022/02/18 15:26:09 PengHui Li wrote:
>> > > >> > Hi Haiting,
>> > > >> >
>> > > >> > > I think this approach have more potential with abnormal topic
>> > > >> isolation.
>> > > >> > If we can introduce
>> > > >> > some kind of bundle isolation strategy, (like broker-bundle
>> > affinity and
>> > > >> > anti-affinity mechanism), we can easily isolate some unexpected
>> > traffic
>> > > >> to
>> > > >> > some empty brokers.
>> > > >> > IMO, this would improve the stability of broker cluster.
>> > > >> >
>> > > >> > if I understand correctly, it looks like if we have a partitioned
>> > topic
>> > > >> > with 10
>> > > >> > partitions under a namespace with 16 bundles, if applies the
>> > > >> anti-affinity
>> > > >> > policy,
>> > > >> > partition-0 map to bundle 0, partition-1 map to bundle 1, and so
>> on.
>> > > >> > Of course, it is not necessary for every partitioned topic to
>> start
>> > from
>> > > >> > bundle 0,
>> > > >> > we can use the partition-0 hash to determine the start bundle
>> index.
>> > > >> >
>> > > >> > Do you have an example for affinity? I don't fully understand how
>> > this
>> > > >> is
>> > > >> > used
>> > > >> > in practice.
>> > > >> >
>> > > >> > Best,
>> > > >> > Penghui
>> > > >> >
>> > > >> > On Fri, Feb 18, 2022 at 11:16 PM PengHui Li <pe...@apache.org>
>> > wrote:
>> > > >> >
>> > > >> > > Hi Aloys,
>> > > >> > >
>> > > >> > > >  Do you mean that
>> > > >> > > 1. First, add a new API, maybe `getHashPosition`, to get the hash
>> > > >> > > position in a bundle
>> > > >> > > 2. Then use this position to split the overloaded bundle
>> > > >> > > If so, when we split a bundle with multiple partitions of a topic, we
>> > > >> > > need to call `getHashPosition` multiple times to get the middle
>> > > >> > > position of all these positions.
>> > > >> > >
>> > > >> > > Yes, this is what I mean. In this way, users can choose to assign 1 topic
>> > > >> > > or 3 topics to one bundle. This is more like increasing the transparency
>> > > >> > > of the topics in the bundle: you can see all the positions of the topics,
>> > > >> > > so planning bundle splits becomes more flexible.
>> > > >> > >
>> > > >> > > The new API does not necessarily have to query by topic one by
>> > one,
>> > > >> > > we have listed all the "topic -> position" of a bundle?
>> > > >> > >
>> > > >> > > Thanks,
>> > > >> > > Penghui
>> > > >> > >
>> > > >> > > On Fri, Feb 18, 2022 at 4:51 PM Haiting Jiang <
>> > > >> jianghaiting@apache.org>
>> > > >> > > wrote:
>> > > >> > >
>> > > >> > >> Hi Aloys,
>> > > >> > >> +1 for this great PIP.
>> > > >> > >>
>> > > >> > >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b
>> > > >> > >> ${bundle_range}`
>> > > >> > >> > will add a new parameter "--topic" or "-t" for  `outstanding
>> > topic`
>> > > >> > >> name.
>> > > >> > >>
>> > > >> > >> Do we have any limitation on this "topic" parameter? Can it be a
>> > > >> > >> partitioned topic?
>> > > >> > >> If so, will this new algorithm split the bundle into more than 2
>> > > >> > >> bundles, like one bundle for each partition?
>> > > >> > >>
>> > > >> > >> > This algorithm has a disadvantage, it can only deal
>> > > >> > >> > with one `outstanding topic`.
>> > > >> > >>
>> > > >> > >> I think this disadvantage can be solved by extending the "topic"
>> > > >> > >> parameter from one topic to a topic list.
>> > > >> > >>
>> > > >> > >> > The other algorithm is to split the bundle at the hashcode
>> > point
>> > > >> of the
>> > > >> > >> > `outstanding partition` which will split the bundle into
>> three
>> > > >> bundles
>> > > >> > >> once
>> > > >> > >> > a time. The middle one contains the only point the hashcode
>> of
>> > the
>> > > >> > >> > `outstanding partition, the left one is less than the
>> > hashcode, the
>> > > >> > >> right
>> > > >> > >> > one is more than the hashcode.
>> > > >> > >> > E.g. if we have a bundle 0x00_0x10 contains two `outstanding
>> > > >> partition`
>> > > >> > >> > (partition-x and partition-y) whose hashcode is 0x03 and
>> 0x07,
>> > this
>> > > >> > >> > algorithm  is going to split bundle the bundle into five new
>> > > >> bundles,
>> > > >> > >> > 0x00_0x03, 0x03_0x04( for parition-x), 0x04_0x07, 0x07_0x08(
>> > for
>> > > >> > >> > partition-y), 0x08_0x10.
>> > > >> > >>
>> > > >> > >> I think this approach have more potential with abnormal topic
>> > > >> isolation.
>> > > >> > >> If we can introduce
>> > > >> > >> some kind of bundle isolation strategy, (like broker-bundle
>> > affinity
>> > > >> and
>> > > >> > >> anti-affinity mechanism), we can easily isolate some
>> unexpected
>> > > >> traffic to
>> > > >> > >> some empty brokers.
>> > > >> > >> IMO, this would improve the stability of broker cluster.
>> > > >> > >>
>> > > >> > >> Thanks,
>> > > >> > >> Haiting

Re: [Discuss][PIP-143] Support split partitions belonging to specified topics in a bundle

Posted by Aloys Zhang <al...@apache.org>.
>Should this be "positions"? We are going to split one bundle into
multi-bundles,
in most cases, bundle number will be position number + 1, right?

Sure, it should be “positions” or “positionList”

On Mon, Feb 21, 2022 at 8:50 PM PengHui Li <pe...@apache.org> wrote:

> > And the (anti-)affinity way needs more discussion or maybe we can
> introduce
> a new PIP for it.
>
> +1
>
> Thanks,
> Penghui
>
> On Mon, Feb 21, 2022 at 6:47 PM Haiting Jiang <ji...@apache.org>
> wrote:
>
> > > 2. calculate the position to split this bundle(also need a new API)
> > Should this be "positions"? We are going to split one bundle into
> > multi-bundles,
> > in most cases, bundle number will be position number + 1, right?
> >
> > > And the (anti-)affinity way needs more discussion or maybe we can
> > introduce
> > > a new PIP for it.
> > +1, this is not in the scope of this PIP.
> >
> >
> > Thanks,
> > Haiting
> >
> > On 2022/02/21 08:57:01 Aloys Zhang wrote:
> > > Hi penghui and haiting,
> > >
> > > I try to figure out how the (anti-)affinity works.
> > >
> > > > if I understand correctly, it looks like if we have a partitioned
> topic
> > > with 10
> > > > partitions under a namespace with 16 bundles, if applies the
> > > anti-affinity policy,
> > > > partition-0 map to bundle 0, partition-1 map to bundle 1, and so on.
> > > > Of course, it is not necessary for every partitioned topic to start
> > from
> > > bundle 0,
> > > > we can use the partition-0 hash to determine the start bundle index.
> > >
> > > As penghui described, I think this is a mechanism for assigning topics
> > to a
> > > bundle that controls how a topic is mapped to a bundle.
> > >
> > > > IMO, this affinity serves the purpose of isolating an abnormal topic
> to
> > > some spare
> > > > brokers.  These brokers host these kind of topics only. Here are some
> > > cases :
> > >
> > > And as haiting mentioned here, It's more like an isolation policy that
> > > decides topics can be owned by which broker.
> > > So, I am a little confused about how the (anti-)affinity works now.
> > >
> > > Back to this PIP which aims to solve the problem that a small number of
> > > topics in a bundle have a load that exceeds the average.
> > > So, we can
> > > > 1. get the positions for the topics we are interested in (needs a new API)
> > > 2. calculate the position to split this bundle(also need a new API)
> > > I think this way is enough for solving the problem.
> > >
> > > And the (anti-)affinity way needs more discussion or maybe we can
> > introduce
> > > a new PIP for it.
> > > What do you think?@penghui @haiting
> > >
> > >
> > > Thanks,
> > > Aloys
> > >
> > > > On Mon, Feb 21, 2022 at 2:39 PM Aloys Zhang <al...@apache.org> wrote:
> > >
> > > > Hi, penghui
> > > >
> > > > >  The new API does not necessarily have to query by topic one by
> one,
> > > > we have listed all the "topic -> position" of a bundle?
> > > >
> > > > > I see. Once we have all the positions of the topics we want to split in a
> > > > > bundle, it's quite easy for us to decide how to do it.
> > > >
> > > > > On Sun, Feb 20, 2022 at 12:05 PM Haiting Jiang <ji...@apache.org> wrote:
> > > >
> > > >> > Do you have an example for affinity? I don't fully understand how
> > this
> > > >> is
> > > >> > used
> > > >> > in practice.
> > > >>
> > > >> IMO, this affinity serves the purpose of isolating an abnormal topic
> > > >> on some spare brokers. These brokers host only this kind of topic.
> > > >> Here are some cases:
> > > >>
> > > >> 1. A topic may periodically have unexpected short traffic spikes,
> > > >> causing broker overload and a negative impact on other topics.
> > > >> Until we have a more proper solution, we can always isolate these
> > > >> topics first and keep the service recovery time as short as possible.
> > > >>
> > > >> 2. Some users may encounter bugs in brokers. We can isolate the
> > > >> topic on exclusive brokers and use a more radical approach to locate
> > > >> the bug, like enabling debug-level logs or even adding a temporary
> > > >> code patch.
> > > >>
> > > >> 3. A user may already have configured a failure domain and an
> > > >> anti-affinity namespace, but with business logic changes, some topics
> > > >> may need to migrate from one namespace to another. It will take the
> > > >> user some time to change the client-side config.
> > > >> In the meantime, we can isolate the topic first.
> > > >>
> > > >> Thanks,
> > > >> Haiting
> > > >>
> > > >> On 2022/02/18 15:26:09 PengHui Li wrote:
> > > >> > Hi Haiting,
> > > >> >
> > > >> > > I think this approach have more potential with abnormal topic
> > > >> isolation.
> > > >> > If we can introduce
> > > >> > some kind of bundle isolation strategy, (like broker-bundle
> > affinity and
> > > >> > anti-affinity mechanism), we can easily isolate some unexpected
> > traffic
> > > >> to
> > > >> > some empty brokers.
> > > >> > IMO, this would improve the stability of broker cluster.
> > > >> >
> > > >> > if I understand correctly, it looks like if we have a partitioned
> > topic
> > > >> > with 10
> > > >> > partitions under a namespace with 16 bundles, if applies the
> > > >> anti-affinity
> > > >> > policy,
> > > >> > partition-0 map to bundle 0, partition-1 map to bundle 1, and so
> on.
> > > >> > Of course, it is not necessary for every partitioned topic to
> start
> > from
> > > >> > bundle 0,
> > > >> > we can use the partition-0 hash to determine the start bundle
> index.
> > > >> >
> > > >> > Do you have an example for affinity? I don't fully understand how
> > this
> > > >> is
> > > >> > used
> > > >> > in practice.
> > > >> >
> > > >> > Best,
> > > >> > Penghui
> > > >> >
> > > >> > On Fri, Feb 18, 2022 at 11:16 PM PengHui Li <pe...@apache.org>
> > wrote:
> > > >> >
> > > >> > > Hi Aloys,
> > > >> > >
> > > >> > > >  Do you mean that
> > > >> > > 1. First, add a new API, maybe `getHashPositioin`,  to get the
> > hash
> > > >> > > position in a bundle
> > > >> > > 2. Then use this position to split the overloaded bundle
> > > >> > > If so, when we split a bundle with multi partitions of a topic,
> we
> > > >> need to
> > > >> > > call the `getHashPositioin` multi times to get the middle
> > position of
> > > >> all
> > > >> > > these positions.
> > > >> > >
> > > >> > > Yes, this is what I mean. In this way, users can choose to assign
> > > >> > > 1 topic or 3 topics to one bundle. This is more like increasing the
> > > >> > > transparency of the topics in the bundle: you can see all the
> > > >> > > positions of the topics, so planning for bundle splitting becomes
> > > >> > > more flexible.
> > > >> > >
> > > >> > > The new API does not necessarily have to query by topic one by one,
> > > >> > > we have listed all the "topic -> position" of a bundle?
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Penghui
> > > >> > >
> > > >> > > On Fri, Feb 18, 2022 at 4:51 PM Haiting Jiang <
> > > >> jianghaiting@apache.org>
> > > >> > > wrote:
> > > >> > >
> > > >> > >> Hi Aloys,
> > > >> > >> +1 for this great PIP.
> > > >> > >>
> > > >> > >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b
> > > >> > >> ${bundle_range}`
> > > >> > >> > will add a new parameter "--topic" or "-t" for  `outstanding
> > topic`
> > > >> > >> name.
> > > >> > >>
> > > >> > >> Do we have limitation on this "topic" parameter. Can this be a
> > > >> > >> partitioned topic?
> > > >> > >> If so, will this new algorithm split the bundle into more than
> 2
> > > >> bundles?
> > > >> > >> like each bundle for
> > > >> > >> one partition.
> > > >> > >>
> > > >> > >> > This algorithm has a disadvantage, it can only deal
> > > >> > >> > with one `outstanding topic`.
> > > >> > >>
> > > >> > >> For this disadvantage, I think it can be solved by extends the
> > > >> "topic"
> > > >> > >> parameter from one topic to a topic list.
> > > >> > >>
> > > >> > >> > The other algorithm is to split the bundle at the hashcode
> > point
> > > >> of the
> > > >> > >> > `outstanding partition` which will split the bundle into
> three
> > > >> bundles
> > > >> > >> once
> > > >> > >> > a time. The middle one contains the only point the hashcode
> of
> > the
> > > >> > >> > `outstanding partition, the left one is less than the
> > hashcode, the
> > > >> > >> right
> > > >> > >> > one is more than the hashcode.
> > > >> > >> > E.g. if we have a bundle 0x00_0x10 contains two `outstanding
> > > >> partition`
> > > >> > >> > (partition-x and partition-y) whose hashcode is 0x03 and
> 0x07,
> > this
> > > >> > >> > algorithm  is going to split bundle the bundle into five new
> > > >> bundles,
> > > >> > >> > 0x00_0x03, 0x03_0x04( for parition-x), 0x04_0x07, 0x07_0x08(
> > for
> > > >> > >> > partition-y), 0x08_0x10.
> > > >> > >>
> > > >> > >> I think this approach have more potential with abnormal topic
> > > >> isolation.
> > > >> > >> If we can introduce
> > > >> > >> some kind of bundle isolation strategy, (like broker-bundle
> > affinity
> > > >> and
> > > >> > >> anti-affinity mechanism), we can easily isolate some unexpected
> > > >> traffic to
> > > >> > >> some empty brokers.
> > > >> > >> IMO, this would improve the stability of broker cluster.
> > > >> > >>
> > > >> > >> Thanks,
> > > >> > >> Haiting
> > > >> > >>
> > > >> > >> On 2022/02/17 15:47:15 Aloys Zhang wrote:
> > > >> > >> > Hi Pulsar Community,
> > > >> > >> >
> > > >> > >> > This is a PIP discussion on how to support split partitions
> > > >> belonging to
> > > >> > >> > specified topics in a bundle.
> > > >> > >> >
> > > >> > >> > The issue can be found:
> > > >> https://github.com/apache/pulsar/issues/13761
> > > >> > >> >
> > > >> > >> > I copy the content here for convenience, any suggestions are
> > > >> welcome and
> > > >> > >> > appreciated.
> > > >> > >> >
> > > >> > >> >
> > > >> > >> > ## Motivation
> > > >> > >> >
> > > >> > >> > As we all know, a namespace bundle may contain lots of
> > partitions
> > > >> > >> belonging
> > > >> > >> > to different topics.
> > > >> > >> > The throughput of these topics may vary greatly. Some topics
> > may
> > > >> with
> > > >> > >> very
> > > >> > >> > high rate/throughput while other topics have a very low
> > > >> rate/throughput.
> > > >> > >> >
> > > >> > >> > These partitions with high rate/throughput can cause broker
> > > >> overload and
> > > >> > >> > bundle unloading.
> > > >> > >> > At this point, if we split bundle manually with
> > > >> `range_equally_divide`
> > > >> > >> or
> > > >> > >> > `topic_count_equally_divide` split algorithm, there may need
> > many
> > > >> times
> > > >> > >> > split before these high rate/through partitions assigned to
> > > >> different
> > > >> > >> new
> > > >> > >> > bundles.
> > > >> > >> >
> > > >> > >> > For convenience, we call these high throughput topics
> > `outstanding
> > > >> > >> topic`
> > > >> > >> > and their partitions `outstanding partition` in this PIP.
> > > >> > >> >
> > > >> > >> > ## Goal
> > > >> > >> >
> > > >> > >> > Our goal is to make it easier to split `outstanding
> partition`
> > > >> into new
> > > >> > >> > bundles.
> > > >> > >> >
> > > >> > >> > There are two alternative ways to achieve this. Either of
> them
> > > >> will add
> > > >> > >> a
> > > >> > >> > new algorithm for bundle split. The difference is how the new
> > > >> bundle
> > > >> > >> split
> > > >> > >> > algorithm is implemented.
> > > >> > >> >
> > > >> > >> > One algorithm is to split bundle by `outstanding topic` which
> > will
> > > >> split
> > > >> > >> > the bundle into two new bundles and each new bundle contains
> an
> > > >> equally
> > > >> > >> > `outstanding partition` once a time.
> > > >> > >> > E.g, a bundle contains lots of topic partitions, and only one
> > > >> > >> `outstanding
> > > >> > >> > topic`(T) with 2  `outstanding partition` (T-partition-n,
> > > >> > >> Tpartition-n+1).
> > > >> > >> > This algorithm split this bundle at the middle point of these
> > two
> > > >> > >> > partition's hashcode.  This algorithm has a disadvantage, it
> > can
> > > >> only
> > > >> > >> deal
> > > >> > >> > with one `outstanding topic`.
> > > >> > >> >
> > > >> > >> > So we raised up another algorithm.
> > > >> > >> >
> > > >> > >> > The other algorithm is to split the bundle at the hashcode
> > point
> > > >> of the
> > > >> > >> > `outstanding partition` which will split the bundle into
> three
> > > >> bundles
> > > >> > >> once
> > > >> > >> > a time. The middle one contains the only point the hashcode
> of
> > the
> > > >> > >> > `outstanding partition, the left one is less than the
> > hashcode, the
> > > >> > >> right
> > > >> > >> > one is more than the hashcode.
> > > >> > >> > E.g. if we have a bundle 0x00_0x10 contains two `outstanding
> > > >> partition`
> > > >> > >> > (partition-x and partition-y) whose hashcode is 0x03 and
> 0x07,
> > this
> > > >> > >> > algorithm  is going to split bundle the bundle into five new
> > > >> bundles,
> > > >> > >> > 0x00_0x03, 0x03_0x04( for parition-x), 0x04_0x07, 0x07_0x08(
> > for
> > > >> > >> > partition-y), 0x08_0x10.
> > > >> > >> >
> > > >> > >> > ## API Changes
> > > >> > >> >
> > > >> > >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b
> > > >> > >> ${bundle_range}`
> > > >> > >> > will add a new parameter "--topic" or "-t" for  `outstanding
> > topic`
> > > >> > >> name.
> > > >> > >> >
> > > >> > >> > The split interface changed from
> > > >> > >> >
> > > >> > >> > ```JAVA
> > > >> > >> > void splitNamespaceBundle(String namespace, String bundle,
> > boolean
> > > >> > >> > unloadSplitBundles, String splitAlgorithmName)throws
> > > >> > >> PulsarAdminException;
> > > >> > >> > ```
> > > >> > >> >
> > > >> > >> > to
> > > >> > >> >
> > > >> > >> > ```java
> > > >> > >> > void splitNamespaceBundle(String namespace, String bundle,
> > boolean
> > > >> > >> > unloadSplitBundles,
> > > >> > >> >                               String splitAlgorithmName,
> String
> > > >> topic)
> > > >> > >> > throws PulsarAdminException;
> > > >> > >> > ```
> > > >> > >> >
> > > >> > >> > ## Implementation
> > > >> > >> >
> > > >> > >> > There are changes both from the Admin CLI and the broker
> side.
> > > >> > >> >
> > > >> > >> > First, Admin CLI for split bundle should support to specify
> the
> > > >> > >> > `outstanding topic`,
> > > >> > >> >
> > > >> > >> > ```java
> > > >> > >> > /**
> > > >> > >> >      * Split namespace bundle.
> > > >> > >> >      *
> > > >> > >> >      * @param namespace
> > > >> > >> >      * @param bundle range of bundle to split
> > > >> > >> >      * @param unloadSplitBundles
> > > >> > >> >      * @param splitAlgorithmName
> > > >> > >> >      * @param topic
> > > >> > >> >      * @throws PulsarAdminException
> > > >> > >> >      */
> > > >> > >> >     void splitNamespaceBundle(String namespace, String
> bundle,
> > > >> boolean
> > > >> > >> > unloadSplitBundles,
> > > >> > >> >                               String splitAlgorithmName,
> String
> > > >> topic)
> > > >> > >> > throws PulsarAdminException;
> > > >> > >> >
> > > >> > >> > ```
> > > >> > >> >
> > > >> > >> > ```java
> > > >> > >> > /**
> > > >> > >> >      * Split namespace bundle asynchronously.
> > > >> > >> >      *
> > > >> > >> >      * @param namespace
> > > >> > >> >      * @param bundle range of bundle to split
> > > >> > >> >      * @param unloadSplitBundles
> > > >> > >> >      * @param splitAlgorithmName
> > > >> > >> >      */
> > > >> > >> >     CompletableFuture<Void> splitNamespaceBundleAsync(
> > > >> > >> >             String namespace, String bundle, boolean
> > > >> unloadSplitBundles,
> > > >> > >> > String splitAlgorithmName, String topic);
> > > >> > >> > ```
> > > >> > >> >
> > > >> > >> > And for the broker side, first encapsulates the parameters
> for
> > > >> bundle
> > > >> > >> split
> > > >> > >> > into a new class `BundleSplitOption`
> > > >> > >> >
> > > >> > >> > ```java
> > > >> > >> > public class BundleSplitOption {
> > > >> > >> >     private NamespaceService service;
> > > >> > >> >     private NamespaceBundle bundle;
> > > >> > >> >     private String topic;
> > > >> > >> > }
> > > >> > >> > ```
> > > >> > >> >
> > > >> > >> > add a new split algorithm
> > > >> > >> >
> > > >> > >> > ```java
> > > >> > >> > public class
> > SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm
> > > >> > >> implements
> > > >> > >> > NamespaceBundleSplitAlgorithm {
> > > >> > >> >     @Override
> > > >> > >> >     public CompletableFuture<List<Long>>
> > > >> > >> getSplitBoundary(BundleSplitOption
> > > >> > >> > bundleSplitOption) {
> > > >> > >> >
> > > >> > >> >         });
> > > >> > >> >     }
> > > >> > >> > }
> > > >> > >> > ```
> > > >> > >> >
> > > >> > >> > add the new algorithm to `NamespaceBundleSplitAlgorithm`
> > > >> > >> >
> > > >> > >> > ```JAVA
> > > >> > >> > String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE =
> > > >> > >> > "specified_topic_count_equally_divide";
> > > >> > >> >
> > > >> > >> > List<String> AVAILABLE_ALGORITHMS =
> > > >> > >> > Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
> > > >> > >> >             TOPIC_COUNT_EQUALLY_DIVIDE,
> > > >> > >> > SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
> > > >> > >> >
> > > >> > >> >  NamespaceBundleSplitAlgorithm
> > > >> > >> SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
> > > >> > >> >             new
> > > >> SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
> > > >> > >> > ```
> > > >> > >> >
> > > >> > >> > modify the `splitAndOwnBundle` and
> > `splitAndOwnBundleOnceAndRetry`
> > > >> for
> > > >> > >> >  [[NamespaceService.java](
> > > >> > >> >
> > > >> > >>
> > > >>
> >
> https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1)](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1
> > > >> > >> )
> > > >> > >> >
> > > >> > >> >
> > > >> > >> > ```java
> > > >> > >> > public CompletableFuture<Void>
> > splitAndOwnBundle(NamespaceBundle
> > > >> bundle,
> > > >> > >> > boolean unload,
> > > >> > >> >
> > > >> > >> >  NamespaceBundleSplitAlgorithm splitAlgorithm, String topic)
> {
> > > >> > >> >
> > > >> > >> >         final CompletableFuture<Void> unloadFuture = new
> > > >> > >> > CompletableFuture<>();
> > > >> > >> >         final AtomicInteger counter = new
> > > >> > >> > AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
> > > >> > >> >         splitAndOwnBundleOnceAndRetry(bundle, unload,
> counter,
> > > >> > >> > unloadFuture, splitAlgorithm, topic);
> > > >> > >> >
> > > >> > >> >         return unloadFuture;
> > > >> > >> >     }
> > > >> > >> > ```
> > > >> > >> >
> > > >> > >> > ```java
> > > >> > >> > void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
> > > >> > >> >                                        boolean unload,
> > > >> > >> >                                        AtomicInteger counter,
> > > >> > >> >
> CompletableFuture<Void>
> > > >> > >> > completionFuture,
> > > >> > >> >
> > > >> NamespaceBundleSplitAlgorithm
> > > >> > >> > splitAlgorithm,
> > > >> > >> >                                        String topic) {
> > > >> > >> > ```
> > > >> > >> >
> > > >> > >> > Also, we change the REST api and broker.conf
> > > >> > >> >
> > > >> > >> > ```java
> > > >> > >> > public void splitNamespaceBundle(
> > > >> > >> >             @Suspended final AsyncResponse asyncResponse,
> > > >> > >> >             @PathParam("property") String property,
> > > >> > >> >             @PathParam("cluster") String cluster,
> > > >> > >> >             @PathParam("namespace") String namespace,
> > > >> > >> >             @PathParam("bundle") String bundleRange,
> > > >> > >> >             @QueryParam("authoritative")
> @DefaultValue("false")
> > > >> boolean
> > > >> > >> > authoritative,
> > > >> > >> >             @QueryParam("unload") @DefaultValue("false")
> > boolean
> > > >> unload,
> > > >> > >> >             @QueryParam("topic") @DefaultValue("") String
> > topic) {}
> > > >> > >> > ```
> > > >> > >> >
> > > >> > >> > ```shell
> > > >> > >> >
> > > >> > >>
> > > >>
> >
> supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
> > > >> > >> > ```
> > > >> > >> >
> > > >> > >>
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>
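
The second algorithm in the quoted proposal (split at the hashcode point of each
`outstanding partition`) can be illustrated with a minimal sketch. This is not
the code from the linked pull request: the class and method names
(`HashPointSplitSketch`, `isolatingBoundaries`, `bundleRanges`) are hypothetical,
bundle ranges are assumed to be half-open `[lower, upper)`, and the small hash
values are the ones used in the proposal's example rather than real 32-bit
hashes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper, for illustration only; not part of Pulsar.
final class HashPointSplitSketch {

    // Returns the sorted boundaries that put every given hash position h
    // into its own one-slot bundle [h, h + 1) inside [lower, upper).
    static List<Long> isolatingBoundaries(long lower, long upper, List<Long> positions) {
        TreeSet<Long> points = new TreeSet<>(); // sorted and de-duplicated
        points.add(lower);
        points.add(upper);
        for (long h : positions) {
            if (h < lower || h >= upper) {
                throw new IllegalArgumentException("position outside bundle: " + h);
            }
            points.add(h);     // start of the isolated bundle
            points.add(h + 1); // end of the isolated bundle
        }
        return new ArrayList<>(points);
    }

    // Formats consecutive boundaries in the lower_upper style used in the thread.
    static List<String> bundleRanges(List<Long> boundaries) {
        List<String> ranges = new ArrayList<>();
        for (int i = 0; i + 1 < boundaries.size(); i++) {
            ranges.add(String.format("0x%02x_0x%02x", boundaries.get(i), boundaries.get(i + 1)));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Bundle 0x00_0x10 with outstanding partitions hashed to 0x03 and 0x07.
        List<Long> boundaries = isolatingBoundaries(0x00L, 0x10L, Arrays.asList(0x03L, 0x07L));
        // Prints [0x00_0x03, 0x03_0x04, 0x04_0x07, 0x07_0x08, 0x08_0x10]
        System.out.println(bundleRanges(boundaries));
    }
}
```

In this example there are four interior split positions (0x03, 0x04, 0x07, 0x08)
and five resulting bundles, which matches Haiting's remark in this thread that
the number of bundles is usually the number of positions plus one. Positions
that sit on the bundle edge or next to each other collapse in the sorted set, so
the count can be lower in those cases.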

Re: [Discuss][PIP-143] Support split paritions belonging to specified topics in a bundle

Posted by PengHui Li <pe...@apache.org>.
> And the (anti-)affinity way needs more discussion or maybe we can introduce
> a new PIP for it.

+1

Thanks,
Penghui

On Mon, Feb 21, 2022 at 6:47 PM Haiting Jiang <ji...@apache.org>
wrote:

> > 2. calculate the position to split this bundle(also need a new API)
> Should this be "positions"? We are going to split one bundle into
> multiple bundles; in most cases, the number of bundles will be the number
> of positions + 1, right?
>
> > And the (anti-)affinity way needs more discussion or maybe we can introduce
> > a new PIP for it.
> +1, this is not in the scope of this PIP.
>
>
> Thanks,
> Haiting
>
> On 2022/02/21 08:57:01 Aloys Zhang wrote:
> > Hi penghui and haiting,
> >
> > I am trying to figure out how the (anti-)affinity works.
> >
> > > if I understand correctly, it looks like if we have a partitioned topic
> > > with 10
> > > partitions under a namespace with 16 bundles, if applies the
> > > anti-affinity policy,
> > > partition-0 map to bundle 0, partition-1 map to bundle 1, and so on.
> > > Of course, it is not necessary for every partitioned topic to start from
> > > bundle 0,
> > > we can use the partition-0 hash to determine the start bundle index.
> >
> > As penghui described, I think this is a mechanism for assigning topics to a
> > bundle that controls how a topic is mapped to a bundle.
> >
> > > IMO, this affinity serves the purpose of isolating an abnormal topic to
> > > some spare
> > > brokers.  These brokers host these kind of topics only. Here are some
> > > cases :
> >
> > And as Haiting mentioned here, it's more like an isolation policy that
> > decides which brokers can own which topics.
> > So, I am a little confused about how the (anti-)affinity works now.
> >
> > Back to this PIP, which aims to solve the problem that a small number of
> > topics in a bundle have a load that exceeds the average.
> > So, we can
> > 1. get the positions of the topics we are interested in (needs a new API)
> > 2. calculate the position to split this bundle (also needs a new API)
> > I think this is enough to solve the problem.
> >
> > And the (anti-)affinity way needs more discussion, or maybe we can
> > introduce a new PIP for it.
> > What do you think? @penghui @haiting
> >
> >
> > Thanks,
> > Aloys
> >
> > On Mon, Feb 21, 2022 at 14:39, Aloys Zhang <al...@apache.org> wrote:
> >
> > > Hi, penghui
> > >
> > > > The new API does not necessarily have to query by topic one by one,
> > > > we have listed all the "topic -> position" of a bundle?
> > >
> > > I see. Once we have all the positions of the topics we want to split in a
> > > bundle, it's quite easy for us to decide how to do it.
> > >
> > > On Sun, Feb 20, 2022 at 12:05, Haiting Jiang <ji...@apache.org> wrote:
> > >
> > >> > Do you have an example for affinity? I don't fully understand how
> this
> > >> is
> > >> > used
> > >> > in practice.
> > >>
> > >> IMO, this affinity serves the purpose of isolating an abnormal topic
> > >> on some spare brokers. These brokers host only this kind of topic.
> > >> Here are some cases:
> > >>
> > >> 1. A topic may periodically have unexpected short traffic spikes,
> > >> causing broker overload and a negative impact on other topics.
> > >> Until we have a more proper solution, we can always isolate these
> > >> topics first and keep the service recovery time as short as possible.
> > >>
> > >> 2. Some users may encounter bugs in brokers. We can isolate the
> > >> topic on exclusive brokers and use a more radical approach to locate
> > >> the bug, like enabling debug-level logs or even adding a temporary
> > >> code patch.
> > >>
> > >> 3. A user may already have configured a failure domain and an
> > >> anti-affinity namespace, but with business logic changes, some topics
> > >> may need to migrate from one namespace to another. It will take the
> > >> user some time to change the client-side config.
> > >> In the meantime, we can isolate the topic first.
> > >>
> > >> Thanks,
> > >> Haiting
> > >>
> > >> On 2022/02/18 15:26:09 PengHui Li wrote:
> > >> > Hi Haiting,
> > >> >
> > >> > > I think this approach have more potential with abnormal topic
> > >> isolation.
> > >> > If we can introduce
> > >> > some kind of bundle isolation strategy, (like broker-bundle
> affinity and
> > >> > anti-affinity mechanism), we can easily isolate some unexpected
> traffic
> > >> to
> > >> > some empty brokers.
> > >> > IMO, this would improve the stability of broker cluster.
> > >> >
> > >> > if I understand correctly, it looks like if we have a partitioned
> topic
> > >> > with 10
> > >> > partitions under a namespace with 16 bundles, if applies the
> > >> anti-affinity
> > >> > policy,
> > >> > partition-0 map to bundle 0, partition-1 map to bundle 1, and so on.
> > >> > Of course, it is not necessary for every partitioned topic to start
> from
> > >> > bundle 0,
> > >> > we can use the partition-0 hash to determine the start bundle index.
> > >> >
> > >> > Do you have an example for affinity? I don't fully understand how
> this
> > >> is
> > >> > used
> > >> > in practice.
> > >> >
> > >> > Best,
> > >> > Penghui
> > >> >
> > >> > On Fri, Feb 18, 2022 at 11:16 PM PengHui Li <pe...@apache.org>
> wrote:
> > >> >
> > >> > > Hi Aloys,
> > >> > >
> > >> > > >  Do you mean that
> > >> > > 1. First, add a new API, maybe `getHashPositioin`,  to get the
> hash
> > >> > > position in a bundle
> > >> > > 2. Then use this position to split the overloaded bundle
> > >> > > If so, when we split a bundle with multi partitions of a topic, we
> > >> need to
> > >> > > call the `getHashPositioin` multi times to get the middle
> position of
> > >> all
> > >> > > these positions.
> > >> > >
> > >> > > Yes, this is what I mean. In this way, users can choose to assign
> > >> > > 1 topic or 3 topics to one bundle. This is more like increasing the
> > >> > > transparency of the topics in the bundle: you can see all the
> > >> > > positions of the topics, so planning for bundle splitting becomes
> > >> > > more flexible.
> > >> > >
> > >> > > The new API does not necessarily have to query by topic one by one,
> > >> > > we have listed all the "topic -> position" of a bundle?
> > >> > >
> > >> > > Thanks,
> > >> > > Penghui
> > >> > >
> > >> > > On Fri, Feb 18, 2022 at 4:51 PM Haiting Jiang <
> > >> jianghaiting@apache.org>
> > >> > > wrote:
> > >> > >
> > >> > >> Hi Aloys,
> > >> > >> +1 for this great PIP.
> > >> > >>
> > >> > >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b
> > >> > >> ${bundle_range}`
> > >> > >> > will add a new parameter "--topic" or "-t" for  `outstanding
> topic`
> > >> > >> name.
> > >> > >>
> > >> > >> Do we have limitation on this "topic" parameter. Can this be a
> > >> > >> partitioned topic?
> > >> > >> If so, will this new algorithm split the bundle into more than 2
> > >> bundles?
> > >> > >> like each bundle for
> > >> > >> one partition.
> > >> > >>
> > >> > >> > This algorithm has a disadvantage, it can only deal
> > >> > >> > with one `outstanding topic`.
> > >> > >>
> > >> > >> For this disadvantage, I think it can be solved by extends the
> > >> "topic"
> > >> > >> parameter from one topic to a topic list.
> > >> > >>
> > >> > >> > The other algorithm is to split the bundle at the hashcode
> point
> > >> of the
> > >> > >> > `outstanding partition` which will split the bundle into three
> > >> bundles
> > >> > >> once
> > >> > >> > a time. The middle one contains the only point the hashcode of
> the
> > >> > >> > `outstanding partition, the left one is less than the
> hashcode, the
> > >> > >> right
> > >> > >> > one is more than the hashcode.
> > >> > >> > E.g. if we have a bundle 0x00_0x10 contains two `outstanding
> > >> partition`
> > >> > >> > (partition-x and partition-y) whose hashcode is 0x03 and 0x07,
> this
> > >> > >> > algorithm  is going to split bundle the bundle into five new
> > >> bundles,
> > >> > >> > 0x00_0x03, 0x03_0x04( for parition-x), 0x04_0x07, 0x07_0x08(
> for
> > >> > >> > partition-y), 0x08_0x10.
> > >> > >>
> > >> > >> I think this approach have more potential with abnormal topic
> > >> isolation.
> > >> > >> If we can introduce
> > >> > >> some kind of bundle isolation strategy, (like broker-bundle
> affinity
> > >> and
> > >> > >> anti-affinity mechanism), we can easily isolate some unexpected
> > >> traffic to
> > >> > >> some empty brokers.
> > >> > >> IMO, this would improve the stability of broker cluster.
> > >> > >>
> > >> > >> Thanks,
> > >> > >> Haiting
> > >> > >>
> > >> > >> On 2022/02/17 15:47:15 Aloys Zhang wrote:
> > >> > >> > Hi Pulsar Community,
> > >> > >> >
> > >> > >> > This is a PIP discussion on how to support split partitions
> > >> belonging to
> > >> > >> > specified topics in a bundle.
> > >> > >> >
> > >> > >> > The issue can be found:
> > >> https://github.com/apache/pulsar/issues/13761
> > >> > >> >
> > >> > >> > I copy the content here for convenience, any suggestions are
> > >> welcome and
> > >> > >> > appreciated.
> > >> > >> >
> > >> > >> >
> > >> > >> > ## Motivation
> > >> > >> >
> > >> > >> > As we all know, a namespace bundle may contain lots of
> partitions
> > >> > >> belonging
> > >> > >> > to different topics.
> > >> > >> > The throughput of these topics may vary greatly. Some topics
> may
> > >> with
> > >> > >> very
> > >> > >> > high rate/throughput while other topics have a very low
> > >> rate/throughput.
> > >> > >> >
> > >> > >> > These partitions with high rate/throughput can cause broker
> > >> overload and
> > >> > >> > bundle unloading.
> > >> > >> > At this point, if we split bundle manually with
> > >> `range_equally_divide`
> > >> > >> or
> > >> > >> > `topic_count_equally_divide` split algorithm, there may need
> many
> > >> times
> > >> > >> > split before these high rate/through partitions assigned to
> > >> different
> > >> > >> new
> > >> > >> > bundles.
> > >> > >> >
> > >> > >> > For convenience, we call these high throughput topics
> `outstanding
> > >> > >> topic`
> > >> > >> > and their partitions `outstanding partition` in this PIP.
> > >> > >> >
> > >> > >> > ## Goal
> > >> > >> >
> > >> > >> > Our goal is to make it easier to split `outstanding partition`
> > >> into new
> > >> > >> > bundles.
> > >> > >> >
> > >> > >> > There are two alternative ways to achieve this. Either of them
> > >> will add
> > >> > >> a
> > >> > >> > new algorithm for bundle split. The difference is how the new
> > >> bundle
> > >> > >> split
> > >> > >> > algorithm is implemented.
> > >> > >> >
> > >> > >> > One algorithm is to split bundle by `outstanding topic` which
> will
> > >> split
> > >> > >> > the bundle into two new bundles and each new bundle contains an
> > >> equally
> > >> > >> > `outstanding partition` once a time.
> > >> > >> > E.g, a bundle contains lots of topic partitions, and only one
> > >> > >> `outstanding
> > >> > >> > topic`(T) with 2  `outstanding partition` (T-partition-n,
> > >> > >> Tpartition-n+1).
> > >> > >> > This algorithm split this bundle at the middle point of these
> two
> > >> > >> > partition's hashcode.  This algorithm has a disadvantage, it
> can
> > >> only
> > >> > >> deal
> > >> > >> > with one `outstanding topic`.
> > >> > >> >
> > >> > >> > So we raised up another algorithm.
> > >> > >> >
> > >> > >> > The other algorithm is to split the bundle at the hashcode
> point
> > >> of the
> > >> > >> > `outstanding partition` which will split the bundle into three
> > >> bundles
> > >> > >> once
> > >> > >> > a time. The middle one contains the only point the hashcode of
> the
> > >> > >> > `outstanding partition, the left one is less than the
> hashcode, the
> > >> > >> right
> > >> > >> > one is more than the hashcode.
> > >> > >> > E.g. if we have a bundle 0x00_0x10 contains two `outstanding
> > >> partition`
> > >> > >> > (partition-x and partition-y) whose hashcode is 0x03 and 0x07,
> this
> > >> > >> > algorithm  is going to split bundle the bundle into five new
> > >> bundles,
> > >> > >> > 0x00_0x03, 0x03_0x04( for parition-x), 0x04_0x07, 0x07_0x08(
> for
> > >> > >> > partition-y), 0x08_0x10.
> > >> > >> >
> > >> > >> > ## API Changes
> > >> > >> >
> > >> > >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b
> > >> > >> ${bundle_range}`
> > >> > >> > will add a new parameter "--topic" or "-t" for  `outstanding
> topic`
> > >> > >> name.
> > >> > >> >
> > >> > >> > The split interface changed from
> > >> > >> >
> > >> > >> > ```JAVA
> > >> > >> > void splitNamespaceBundle(String namespace, String bundle,
> boolean
> > >> > >> > unloadSplitBundles, String splitAlgorithmName)throws
> > >> > >> PulsarAdminException;
> > >> > >> > ```
> > >> > >> >
> > >> > >> > to
> > >> > >> >
> > >> > >> > ```java
> > >> > >> > void splitNamespaceBundle(String namespace, String bundle,
> boolean
> > >> > >> > unloadSplitBundles,
> > >> > >> >                               String splitAlgorithmName, String
> > >> topic)
> > >> > >> > throws PulsarAdminException;
> > >> > >> > ```
> > >> > >> >
> > >> > >> > ## Implementation
> > >> > >> >
> > >> > >> > There are changes both from the Admin CLI and the broker side.
> > >> > >> >
> > >> > >> > First, Admin CLI for split bundle should support to specify the
> > >> > >> > `outstanding topic`,
> > >> > >> >
> > >> > >> > ```java
> > >> > >> > /**
> > >> > >> >      * Split namespace bundle.
> > >> > >> >      *
> > >> > >> >      * @param namespace
> > >> > >> >      * @param bundle range of bundle to split
> > >> > >> >      * @param unloadSplitBundles
> > >> > >> >      * @param splitAlgorithmName
> > >> > >> >      * @param topic
> > >> > >> >      * @throws PulsarAdminException
> > >> > >> >      */
> > >> > >> >     void splitNamespaceBundle(String namespace, String bundle,
> > >> boolean
> > >> > >> > unloadSplitBundles,
> > >> > >> >                               String splitAlgorithmName, String
> > >> topic)
> > >> > >> > throws PulsarAdminException;
> > >> > >> >
> > >> > >> > ```
> > >> > >> >
> > >> > >> > ```java
> > >> > >> > /**
> > >> > >> >      * Split namespace bundle asynchronously.
> > >> > >> >      *
> > >> > >> >      * @param namespace
> > >> > >> >      * @param bundle range of bundle to split
> > >> > >> >      * @param unloadSplitBundles
> > >> > >> >      * @param splitAlgorithmName
> > >> > >> >      */
> > >> > >> >     CompletableFuture<Void> splitNamespaceBundleAsync(
> > >> > >> >             String namespace, String bundle, boolean
> > >> unloadSplitBundles,
> > >> > >> > String splitAlgorithmName, String topic);
> > >> > >> > ```
> > >> > >> >
> > >> > >> > And for the broker side, first encapsulates the parameters for
> > >> bundle
> > >> > >> split
> > >> > >> > into a new class `BundleSplitOption`
> > >> > >> >
> > >> > >> > ```java
> > >> > >> > public class BundleSplitOption {
> > >> > >> >     private NamespaceService service;
> > >> > >> >     private NamespaceBundle bundle;
> > >> > >> >     private String topic;
> > >> > >> > }
> > >> > >> > ```
> > >> > >> >
> > >> > >> > add a new split algorithm
> > >> > >> >
> > >> > >> > ```java
> > >> > >> > public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm
> > >> > >> >         implements NamespaceBundleSplitAlgorithm {
> > >> > >> >     @Override
> > >> > >> >     public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
> > >> > >> >         // ... (body elided in the original post)
> > >> > >> >     }
> > >> > >> > }
> > >> > >> > ```
> > >> > >> >
> > >> > >> > add the new algorithm to `NamespaceBundleSplitAlgorithm`
> > >> > >> >
> > >> > >> > ```JAVA
> > >> > >> > String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = "specified_topic_count_equally_divide";
> > >> > >> >
> > >> > >> > List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
> > >> > >> >         TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
> > >> > >> >
> > >> > >> > NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
> > >> > >> >         new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
> > >> > >> > ```
> > >> > >> >
> > >> > >> > modify the `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` for
> > >> > >> > [NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1)
> > >> > >> >
> > >> > >> >
> > >> > >> > ```java
> > >> > >> > public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
> > >> > >> >         NamespaceBundleSplitAlgorithm splitAlgorithm, String topic) {
> > >> > >> >
> > >> > >> >     final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
> > >> > >> >     final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
> > >> > >> >     splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, topic);
> > >> > >> >
> > >> > >> >     return unloadFuture;
> > >> > >> > }
> > >> > >> > ```
> > >> > >> >
> > >> > >> > ```java
> > >> > >> > void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
> > >> > >> >                                    boolean unload,
> > >> > >> >                                    AtomicInteger counter,
> > >> > >> >                                    CompletableFuture<Void> completionFuture,
> > >> > >> >                                    NamespaceBundleSplitAlgorithm splitAlgorithm,
> > >> > >> >                                    String topic) {
> > >> > >> > ```
> > >> > >> >
> > >> > >> > We also change the REST API and broker.conf:
> > >> > >> >
> > >> > >> > ```java
> > >> > >> > public void splitNamespaceBundle(
> > >> > >> >             @Suspended final AsyncResponse asyncResponse,
> > >> > >> >             @PathParam("property") String property,
> > >> > >> >             @PathParam("cluster") String cluster,
> > >> > >> >             @PathParam("namespace") String namespace,
> > >> > >> >             @PathParam("bundle") String bundleRange,
> > >> > >> >             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
> > >> > >> >             @QueryParam("unload") @DefaultValue("false") boolean unload,
> > >> > >> >             @QueryParam("topic") @DefaultValue("") String topic) {}
> > >> > >> > ```
> > >> > >> >
> > >> > >> > ```shell
> > >> > >> > supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
> > >> > >> > ```

Re: [Discuss][PIP-143] Support split paritions belonging to specified topics in a bundle

Posted by Haiting Jiang <ji...@apache.org>.
> 2. calculate the position to split this bundle(also need a new API)
Should this be "positions"? We are going to split one bundle into multiple bundles, so
in most cases the number of bundles will be the number of positions + 1, right?
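
To make the count concrete, here is a minimal Java sketch. It is not the PIP's implementation: the class and method names are hypothetical, and it only illustrates the arithmetic. Given a bundle range and a list of split positions, it returns the resulting sub-ranges, so k distinct positions strictly inside the bundle give k + 1 bundles. The sample values reuse the 0x00_0x10 example from the proposal, with boundaries at 0x03, 0x04, 0x07 and 0x08.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper, for illustration only; not part of Pulsar.
class SplitRangesSketch {

    /** Returns sub-ranges formatted like "0x00_0x03" for the bundle lower_upper. */
    static List<String> ranges(long lower, long upper, List<Long> positions) {
        // Sort and de-duplicate, keeping only positions strictly inside the bundle.
        TreeSet<Long> cuts = new TreeSet<>();
        for (long p : positions) {
            if (p > lower && p < upper) {
                cuts.add(p);
            }
        }
        List<String> result = new ArrayList<>();
        long start = lower;
        for (long cut : cuts) {
            result.add(String.format("0x%02x_0x%02x", start, cut));
            start = cut;
        }
        // The last range runs from the final cut to the upper bound.
        result.add(String.format("0x%02x_0x%02x", start, upper));
        return result;
    }

    public static void main(String[] args) {
        // Four positions inside the bundle produce five sub-ranges.
        System.out.println(ranges(0x00, 0x10, List.of(0x03L, 0x04L, 0x07L, 0x08L)));
    }
}
```

Positions equal to a bundle boundary or outside the bundle are ignored here, which is why the "+ 1" only holds "in most cases".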

> And the (anti-)affinity way needs more discussion or maybe we can introduce
> a new PIP for it.
+1, this is not in the scope of this PIP.


Thanks,
Haiting

On 2022/02/21 08:57:01 Aloys Zhang wrote:
> Hi penghui and haiting,
> 
> I try to figure out how the (anti-)affinity works.
> 
> > if I understand correctly, it looks like if we have a partitioned topic with 10
> > partitions under a namespace with 16 bundles, if applies the anti-affinity policy,
> > partition-0 map to bundle 0, partition-1 map to bundle 1, and so on.
> > Of course, it is not necessary for every partitioned topic to start from bundle 0,
> > we can use the partition-0 hash to determine the start bundle index.
> 
> As penghui described, I think this is a mechanism for assigning topics to a
> bundle that controls how a topic is mapped to a bundle.
> 
> > IMO, this affinity serves the purpose of isolating an abnormal topic to some spare
> > brokers.  These brokers host these kind of topics only. Here are some cases :
> 
> And as haiting mentioned here, It's more like an isolation policy that
> decides topics can be owned by which broker.
> So, I am a little confused about how the (anti-)affinity works now.
> 
> Back to this PIP which aims to solve the problem that a small number of
> topics in a bundle have a load that exceeds the average.
> So, we can
> 1. get the positions for the topics with we are interested( need a new API)
> 2. calculate the position to split this bundle(also need a new API)
> I think this way is enough for solving the problem.
> 
> And the (anti-)affinity way needs more discussion or maybe we can introduce
> a new PIP for it.
> What do you think?@penghui @haiting
> 
> 
> Thanks,
> Aloys
> 

Re: [Discuss][PIP-143] Support split paritions belonging to specified topics in a bundle

Posted by Aloys Zhang <al...@apache.org>.
Hi Penghui and Haiting,

I have been trying to figure out how the (anti-)affinity works.

> if I understand correctly, it looks like if we have a partitioned topic with 10
> partitions under a namespace with 16 bundles, if applies the anti-affinity policy,
> partition-0 map to bundle 0, partition-1 map to bundle 1, and so on.
> Of course, it is not necessary for every partitioned topic to start from bundle 0,
> we can use the partition-0 hash to determine the start bundle index.

As Penghui described it, I think this is a mechanism that controls how the
partitions of a topic are mapped to bundles.
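
A minimal Java sketch of the mapping as I read the quoted description; this is not an existing Pulsar policy, and the class and method names are hypothetical. Partition i goes to bundle (start + i) modulo the number of bundles, and the start index is derived from the hash of partition-0.

```java
// Hypothetical sketch of the round-robin placement described in the quote above.
// Not an existing Pulsar policy; names are illustrative only.
class AntiAffinitySketch {

    /** Start bundle index derived from the (non-negative) hash of partition-0. */
    static int startIndex(long partitionZeroHash, int numBundles) {
        return (int) Math.floorMod(partitionZeroHash, (long) numBundles);
    }

    /** Bundle index for a given partition, wrapping around the bundle count. */
    static int bundleIndex(int startIndex, int partition, int numBundles) {
        return (startIndex + partition) % numBundles;
    }

    public static void main(String[] args) {
        int numBundles = 16;
        int start = startIndex(35L, numBundles); // 35 is an arbitrary sample hash
        for (int partition = 0; partition < 10; partition++) {
            System.out.println("partition-" + partition + " -> bundle "
                    + bundleIndex(start, partition, numBundles));
        }
    }
}
```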

> IMO, this affinity serves the purpose of isolating an abnormal topic to some spare
> brokers.  These brokers host these kind of topics only. Here are some cases :

But as Haiting described it here, it is more like an isolation policy that
decides which brokers can own a topic.
So I am still a little confused about how the (anti-)affinity would work.

Back to this PIP, which aims to solve the problem that a small number of
topics in a bundle carry a load well above the average.
To solve it, we can
1. get the positions of the topics we are interested in (needs a new API)
2. calculate the positions at which to split the bundle (also needs a new API)
I think this is enough to solve the problem.
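
The two steps above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the API this PIP will add: the class and method names are hypothetical, the topic names are made up, and the sketch assumes that a topic's position is the unsigned CRC32 of its full name and that bundle ranges are lower-inclusive. Both assumptions should be checked against the broker code.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.CRC32;

// Hypothetical sketch; names are illustrative and not part of Pulsar.
class TopicPositionSketch {

    /** Assumed hash: unsigned CRC32 of the topic name (verify against the broker). */
    static long position(String topic) {
        CRC32 crc = new CRC32();
        crc.update(topic.getBytes(StandardCharsets.UTF_8));
        return crc.getValue(); // value is in the range 0 .. 0xffffffff
    }

    /** Step 1: list "topic -> position" for the topics of interest. */
    static Map<String, Long> positions(List<String> topics) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String topic : topics) {
            result.put(topic, position(topic));
        }
        return result;
    }

    /**
     * Step 2: pick a split position between two hash positions.
     * Assuming lower-inclusive ranges, any boundary b with min < b <= max
     * separates the two; this picks one near the middle.
     * If both positions are equal they cannot be separated.
     */
    static long splitBetween(long a, long b) {
        long lo = Math.min(a, b);
        long hi = Math.max(a, b);
        return lo + (hi - lo + 1) / 2;
    }

    public static void main(String[] args) {
        Map<String, Long> pos = positions(List.of(
                "persistent://public/default/T-partition-0",
                "persistent://public/default/T-partition-1"));
        pos.forEach((topic, p) -> System.out.printf("%s -> 0x%08x%n", topic, p));

        Long[] values = pos.values().toArray(new Long[0]);
        System.out.printf("split at 0x%08x%n", splitBetween(values[0], values[1]));
    }
}
```

Listing all positions first, as Penghui suggested, leaves the choice of split points to the operator instead of baking one strategy into the broker.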

The (anti-)affinity approach needs more discussion; maybe we can introduce
a new PIP for it.
What do you think? @penghui @haiting


Thanks,
Aloys

On Mon, Feb 21, 2022 at 2:39 PM Aloys Zhang <al...@apache.org> wrote:

> Hi, penghui
>
> > The new API does not necessarily have to query by topic one by one,
> > we have listed all the "topic -> position" of a bundle?
>
> I see. Once we have all the positions of the topics we want to split in a
> bundle, it is quite easy for us to decide how to split it.
>
> On Sun, Feb 20, 2022 at 12:05 PM Haiting Jiang <ji...@apache.org> wrote:
>
>> > Do you have an example for affinity? I don't fully understand how this is
>> > used in practice.
>>
>> IMO, this affinity serves the purpose of isolating an abnormal topic on
>> some spare brokers. These brokers host only this kind of topic. Here are
>> some cases:
>>
>> 1. A topic may periodically have unexpected short traffic spikes, causing
>> broker overload and a negative impact on other topics.
>> Until we have a more proper solution, we can always isolate these topics
>> first and keep the service recovery time as short as possible.
>>
>> 2. Some users may hit bugs in brokers. We can isolate the topic on
>> exclusive brokers and use a more radical approach to locate the bug, like
>> enabling debug-level logs or even adding a temporary code patch.
>>
>> 3. Users may already have configured a failure domain and anti-affinity
>> namespaces, but as business logic changes, some topics may need to migrate
>> from one namespace to another. It takes some time for users to change the
>> client-side config. In the meantime, we can isolate the topic first.
>>
>> Thanks,
>> Haiting
>>
>> On 2022/02/18 15:26:09 PengHui Li wrote:
>> > Hi Haiting,
>> >
>> > > I think this approach have more potential with abnormal topic isolation.
>> > > If we can introduce some kind of bundle isolation strategy, (like
>> > > broker-bundle affinity and anti-affinity mechanism), we can easily
>> > > isolate some unexpected traffic to some empty brokers.
>> > > IMO, this would improve the stability of broker cluster.
>> >
>> > if I understand correctly, it looks like if we have a partitioned topic with 10
>> > partitions under a namespace with 16 bundles, if applies the anti-affinity policy,
>> > partition-0 map to bundle 0, partition-1 map to bundle 1, and so on.
>> > Of course, it is not necessary for every partitioned topic to start from bundle 0,
>> > we can use the partition-0 hash to determine the start bundle index.
>> >
>> > Do you have an example for affinity? I don't fully understand how this is
>> > used in practice.
>> >
>> > Best,
>> > Penghui
>> >
>> > On Fri, Feb 18, 2022 at 11:16 PM PengHui Li <pe...@apache.org> wrote:
>> >
>> > > Hi Aloys,
>> > >
>> > > > Do you mean that
>> > > > 1. First, add a new API, maybe `getHashPositioin`, to get the hash
>> > > > position in a bundle
>> > > > 2. Then use this position to split the overloaded bundle
>> > > > If so, when we split a bundle with multi partitions of a topic, we need to
>> > > > call the `getHashPositioin` multi times to get the middle position of all
>> > > > these positions.
>> > >
>> > > Yes, this is what I mean. This way, users can choose to assign 1 topic or
>> > > 3 topics to one bundle. It is more about increasing the transparency of
>> > > the topics in the bundle: you can see all the positions of the topics, so
>> > > planning a bundle split becomes more flexible.
>> > >
>> > > The new API does not necessarily have to query topic by topic;
>> > > could we list all the "topic -> position" pairs of a bundle?
>> > >
>> > > Thanks,
>> > > Penghui
>> > >
>> > > On Fri, Feb 18, 2022 at 4:51 PM Haiting Jiang <jianghaiting@apache.org> wrote:
>> > >
>> > >> Hi Aloys,
>> > >> +1 for this great PIP.
>> > >>
>> > >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
>> > >> > will add a new parameter "--topic" or "-t" for  `outstanding topic` name.
>> > >>
>> > >> Do we have any limitation on this "topic" parameter? Can it be a
>> > >> partitioned topic?
>> > >> If so, will this new algorithm split the bundle into more than 2 bundles,
>> > >> e.g. one bundle for each partition?
>> > >>
>> > >> > This algorithm has a disadvantage, it can only deal
>> > >> > with one `outstanding topic`.
>> > >>
>> > >> I think this disadvantage can be solved by extending the "topic"
>> > >> parameter from one topic to a topic list.
>> > >>
>> > >> > The other algorithm is to split the bundle at the hashcode point of the
>> > >> > `outstanding partition`, which splits the bundle into three bundles at
>> > >> > a time. The middle one contains only the hashcode point of the
>> > >> > `outstanding partition`, the left one covers hashcodes less than it, and
>> > >> > the right one covers hashcodes greater than it.
>> > >> > E.g., if a bundle 0x00_0x10 contains two `outstanding partition`s
>> > >> > (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
>> > >> > algorithm is going to split the bundle into five new bundles:
>> > >> > 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
>> > >> > partition-y), 0x08_0x10.
>> > >>
>> > >> I think this approach has more potential for abnormal topic isolation.
>> > >> If we can introduce some kind of bundle isolation strategy (like a
>> > >> broker-bundle affinity and anti-affinity mechanism), we can easily
>> > >> isolate unexpected traffic to some empty brokers.
>> > >> IMO, this would improve the stability of the broker cluster.
>> > >>
>> > >> Thanks,
>> > >> Haiting
>> > >>
>> > >> On 2022/02/17 15:47:15 Aloys Zhang wrote:
>> > >> > Hi Pulsar Community,
>> > >> >
>> > >> > This is a PIP discussion on how to support splitting partitions
>> > >> > belonging to specified topics in a bundle.
>> > >> >
>> > >> > The issue can be found at: https://github.com/apache/pulsar/issues/13761
>> > >> >
>> > >> > I copy the content here for convenience; any suggestions are welcome and
>> > >> > appreciated.
>> > >> >
>> > >> >
>> > >> > ## Motivation
>> > >> >
>> > >> > As we all know, a namespace bundle may contain lots of partitions
>> > >> > belonging to different topics.
>> > >> > The throughput of these topics may vary greatly. Some topics may have a
>> > >> > very high rate/throughput while other topics have a very low one.
>> > >> >
>> > >> > Partitions with a high rate/throughput can cause broker overload and
>> > >> > bundle unloading.
>> > >> > At this point, if we split the bundle manually with the
>> > >> > `range_equally_divide` or `topic_count_equally_divide` split algorithm,
>> > >> > it may take many splits before these high rate/throughput partitions
>> > >> > are assigned to different new bundles.
>> > >> >
>> > >> > For convenience, we call these high throughput topics `outstanding topic`
>> > >> > and their partitions `outstanding partition` in this PIP.
>> > >> >
>> > >> > ## Goal
>> > >> >
>> > >> > Our goal is to make it easier to split `outstanding partition` into new
>> > >> > bundles.
>> > >> >
>> > >> > There are two alternative ways to achieve this. Either of them will add a
>> > >> > new algorithm for bundle split. The difference is how the new bundle split
>> > >> > algorithm is implemented.
>> > >> >
>> > >> > One algorithm is to split the bundle by `outstanding topic`: it splits
>> > >> > the bundle into two new bundles, each containing an equal number of
>> > >> > `outstanding partition`s, one split at a time.
>> > >> > E.g., a bundle contains lots of topic partitions, and only one
>> > >> > `outstanding topic` (T) with 2 `outstanding partition`s (T-partition-n,
>> > >> > T-partition-n+1).
>> > >> > This algorithm splits the bundle at the middle point of these two
>> > >> > partitions' hashcodes. This algorithm has a disadvantage: it can only
>> > >> > deal with one `outstanding topic`.
>> > >> >
>> > >> > So we came up with another algorithm.
>> > >> >
>> > >> > The other algorithm is to split the bundle at the hashcode point of the
>> > >> > `outstanding partition`, which splits the bundle into three bundles at
>> > >> > a time. The middle one contains only the hashcode point of the
>> > >> > `outstanding partition`, the left one covers hashcodes less than it, and
>> > >> > the right one covers hashcodes greater than it.
>> > >> > E.g., if a bundle 0x00_0x10 contains two `outstanding partition`s
>> > >> > (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
>> > >> > algorithm is going to split the bundle into five new bundles:
>> > >> > 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
>> > >> > partition-y), 0x08_0x10.
>> > >> >
>> > >> > ## API Changes
>> > >> >
>> > >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b
>> > >> ${bundle_range}`
>> > >> > will add a new parameter "--topic" or "-t" for  `outstanding topic`
>> > >> name.
>> > >> >
>> > >> > The split interface changed from
>> > >> >
>> > >> > ```JAVA
>> > >> > void splitNamespaceBundle(String namespace, String bundle, boolean
>> > >> > unloadSplitBundles, String splitAlgorithmName)throws
>> > >> PulsarAdminException;
>> > >> > ```
>> > >> >
>> > >> > to
>> > >> >
>> > >> > ```java
>> > >> > void splitNamespaceBundle(String namespace, String bundle, boolean
>> > >> > unloadSplitBundles,
>> > >> >                               String splitAlgorithmName, String
>> topic)
>> > >> > throws PulsarAdminException;
>> > >> > ```
>> > >> >
>> > >> > ## Implementation
>> > >> >
>> > >> > There are changes both from the Admin CLI and the broker side.
>> > >> >
>> > >> > First, Admin CLI for split bundle should support to specify the
>> > >> > `outstanding topic`,
>> > >> >
>> > >> > ```java
>> > >> > /**
>> > >> >      * Split namespace bundle.
>> > >> >      *
>> > >> >      * @param namespace
>> > >> >      * @param bundle range of bundle to split
>> > >> >      * @param unloadSplitBundles
>> > >> >      * @param splitAlgorithmName
>> > >> >      * @param topic
>> > >> >      * @throws PulsarAdminException
>> > >> >      */
>> > >> >     void splitNamespaceBundle(String namespace, String bundle,
>> boolean
>> > >> > unloadSplitBundles,
>> > >> >                               String splitAlgorithmName, String
>> topic)
>> > >> > throws PulsarAdminException;
>> > >> >
>> > >> > ```
>> > >> >
>> > >> > ```java
>> > >> > /**
>> > >> >      * Split namespace bundle asynchronously.
>> > >> >      *
>> > >> >      * @param namespace
>> > >> >      * @param bundle range of bundle to split
>> > >> >      * @param unloadSplitBundles
>> > >> >      * @param splitAlgorithmName
>> > >> >      */
>> > >> >     CompletableFuture<Void> splitNamespaceBundleAsync(
>> > >> >             String namespace, String bundle, boolean
>> unloadSplitBundles,
>> > >> > String splitAlgorithmName, String topic);
>> > >> > ```
>> > >> >
>> > >> > And for the broker side, first encapsulates the parameters for
>> bundle
>> > >> split
>> > >> > into a new class `BundleSplitOption`
>> > >> >
>> > >> > ```java
>> > >> > public class BundleSplitOption {
>> > >> >     private NamespaceService service;
>> > >> >     private NamespaceBundle bundle;
>> > >> >     private String topic;
>> > >> > }
>> > >> > ```
>> > >> >
>> > >> > add a new split algorithm
>> > >> >
>> > >> > ```java
>> > >> > ublic class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm
>> > >> implements
>> > >> > NamespaceBundleSplitAlgorithm {
>> > >> >     @Override
>> > >> >     public CompletableFuture<List<Long>>
>> > >> getSplitBoundary(BundleSplitOption
>> > >> > bundleSplitOption) {
>> > >> >
>> > >> >         });
>> > >> >     }
>> > >> > }
>> > >> > ```
>> > >> >
>> > >> > add the new algorithm to `NamespaceBundleSplitAlgorithm`
>> > >> >
>> > >> > ```JAVA
>> > >> > String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE =
>> > >> > "specified_topic_count_equally_divide";
>> > >> >
>> > >> > List<String> AVAILABLE_ALGORITHMS =
>> > >> > Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
>> > >> >             TOPIC_COUNT_EQUALLY_DIVIDE,
>> > >> > SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
>> > >> >
>> > >> >  NamespaceBundleSplitAlgorithm
>> > >> SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
>> > >> >             new
>> SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
>> > >> > ```
>> > >> >
>> > >> > modify the `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry`
>> for
>> > >> >  [[NamespaceService.java](
>> > >> >
>> > >>
>> https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1)](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1
>> > >> )
>> > >> >
>> > >> >
>> > >> > ```java
>> > >> > public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle
>> bundle,
>> > >> > boolean unload,
>> > >> >
>> > >> >  NamespaceBundleSplitAlgorithm splitAlgorithm, String topic) {
>> > >> >
>> > >> >         final CompletableFuture<Void> unloadFuture = new
>> > >> > CompletableFuture<>();
>> > >> >         final AtomicInteger counter = new
>> > >> > AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
>> > >> >         splitAndOwnBundleOnceAndRetry(bundle, unload, counter,
>> > >> > unloadFuture, splitAlgorithm, topic);
>> > >> >
>> > >> >         return unloadFuture;
>> > >> >     }
>> > >> > ```
>> > >> >
>> > >> > ```java
>> > >> > void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
>> > >> >                                        boolean unload,
>> > >> >                                        AtomicInteger counter,
>> > >> >                                        CompletableFuture<Void>
>> > >> > completionFuture,
>> > >> >
>> NamespaceBundleSplitAlgorithm
>> > >> > splitAlgorithm,
>> > >> >                                        String topic) {
>> > >> > ```
>> > >> >
>> > >> > Also, we change the REST api and broker.conf
>> > >> >
>> > >> > ```java
>> > >> > public void splitNamespaceBundle(
>> > >> >             @Suspended final AsyncResponse asyncResponse,
>> > >> >             @PathParam("property") String property,
>> > >> >             @PathParam("cluster") String cluster,
>> > >> >             @PathParam("namespace") String namespace,
>> > >> >             @PathParam("bundle") String bundleRange,
>> > >> >             @QueryParam("authoritative") @DefaultValue("false")
>> boolean
>> > >> > authoritative,
>> > >> >             @QueryParam("unload") @DefaultValue("false") boolean
>> unload,
>> > >> >             @QueryParam("topic") @DefaultValue("") String topic) {}
>> > >> > ```
>> > >> >
>> > >> > ```shell
>> > >> >
>> > >>
>> supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
>> > >> > ```
>> > >> >
>> > >>
>> > >
>> >
>>
>

Re: [Discuss][PIP-143] Support split paritions belonging to specified topics in a bundle

Posted by Aloys Zhang <al...@apache.org>.
Hi, penghui

> The new API does not necessarily have to query by topic one by one,
> we have listed all the "topic -> position" of a bundle?

I see. Once we have the positions of all the topics we want to split in a
bundle, it is quite easy to decide how to split it.
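
To make that concrete, here is a rough sketch of how split points could be
derived once the hash positions are known. It follows the second algorithm
from the PIP (isolate each `outstanding partition` in its own one-slot
bundle). All names below (`SplitBoundarySketch`, `isolate`, `ranges`) are
made up for illustration and are not Pulsar APIs; I also assume bundle ranges
are half-open, i.e. [start, end).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

// Illustrative only: none of these names exist in Pulsar.
final class SplitBoundarySketch {

    /**
     * Returns the sorted split points that isolate each hash position h
     * into its own range [h, h + 1) inside the bundle [start, end).
     */
    static List<Long> isolate(long start, long end, Collection<Long> positions) {
        TreeSet<Long> boundaries = new TreeSet<>();
        for (long h : positions) {
            if (h < start || h >= end) {
                throw new IllegalArgumentException("position outside bundle: " + h);
            }
            // The bundle's own edges are not new split points.
            if (h > start) {
                boundaries.add(h);
            }
            if (h + 1 < end) {
                boundaries.add(h + 1);
            }
        }
        return new ArrayList<>(boundaries);
    }

    /** Formats the resulting ranges in the 0xAA_0xBB style used in this thread. */
    static List<String> ranges(long start, long end, List<Long> boundaries) {
        List<String> out = new ArrayList<>();
        long lower = start;
        for (long b : boundaries) {
            out.add(String.format("0x%02x_0x%02x", lower, b));
            lower = b;
        }
        out.add(String.format("0x%02x_0x%02x", lower, end));
        return out;
    }

    public static void main(String[] args) {
        // The example from the PIP: bundle 0x00_0x10, positions 0x03 and 0x07.
        List<Long> b = isolate(0x00L, 0x10L, List.of(0x03L, 0x07L));
        System.out.println(ranges(0x00L, 0x10L, b));
    }
}
```

For the PIP example this yields the same five bundles listed in the proposal:
0x00_0x03, 0x03_0x04, 0x04_0x07, 0x07_0x08, 0x08_0x10. Adjacent positions
share a boundary, so the sorted set keeps the list free of duplicates.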


Re: [Discuss][PIP-143] Support split paritions belonging to specified topics in a bundle

Posted by Haiting Jiang <ji...@apache.org>.
> Do you have an example for affinity? I don't fully understand how this is
> used
> in practice.

IMO, this affinity serves to isolate an abnormal topic on some spare
brokers that host only this kind of topic. Here are some cases:

1. A topic may periodically have unexpected short traffic spikes that
overload the broker and negatively affect other topics.
Until we have a better solution, we can always isolate these topics first
and keep the service recovery time as short as possible.

2. Some users may hit bugs in brokers. We can isolate the topic on
exclusive brokers and use more radical approaches to locate the bug, such as
enabling debug-level logs or even adding a temporary code patch.

3. A user may already have configured failure domains and anti-affinity
namespaces, but after business logic changes, some topics may need to migrate
from one namespace to another. It takes time for the user to change the
client-side config. In the meantime, we can isolate the topic first.

Thanks,
Haiting

On 2022/02/18 15:26:09 PengHui Li wrote:
> Hi Haiting,
> 
> > I think this approach have more potential with abnormal topic isolation.
> If we can introduce
> some kind of bundle isolation strategy, (like broker-bundle affinity and
> anti-affinity mechanism), we can easily isolate some unexpected traffic to
> some empty brokers.
> IMO, this would improve the stability of broker cluster.
> 
> if I understand correctly, it looks like if we have a partitioned topic
> with 10
> partitions under a namespace with 16 bundles, if applies the anti-affinity
> policy,
> partition-0 map to bundle 0, partition-1 map to bundle 1, and so on.
> Of course, it is not necessary for every partitioned topic to start from
> bundle 0,
> we can use the partition-0 hash to determine the start bundle index.
> 
> Do you have an example for affinity? I don't fully understand how this is
> used
> in practice.
> 
> Best,
> Penghui
> 
> On Fri, Feb 18, 2022 at 11:16 PM PengHui Li <pe...@apache.org> wrote:
> 
> > Hi Aloys,
> >
> > >  Do you mean that
> > 1. First, add a new API, maybe `getHashPositioin`,  to get the hash
> > position in a bundle
> > 2. Then use this position to split the overloaded bundle
> > If so, when we split a bundle with multi partitions of a topic, we need to
> > call the `getHashPositioin` multi times to get the middle position of all
> > these positions.
> >
> > Yes, this want I mean. In this way, users can control to assign 1 topic or
> > 3 topics to one bundle. This is more like increasing the transparency of
> > the topic in the bundle, you can all the positions of the topics, so how
> > planning for bundle splitting becomes more flexible.
> >
> > The new API does not necessarily have to query by topic one by one,
> > we have listed all the "topic -> position" of a bundle?
> >
> > Thanks,
> > Penghui
> >
> > On Fri, Feb 18, 2022 at 4:51 PM Haiting Jiang <ji...@apache.org>
> > wrote:
> >
> >> Hi Aloys,
> >> +1 for this great PIP.
> >>
> >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b
> >> ${bundle_range}`
> >> > will add a new parameter "--topic" or "-t" for  `outstanding topic`
> >> name.
> >>
> >> Do we have limitation on this "topic" parameter. Can this be a
> >> partitioned topic?
> >> If so, will this new algorithm split the bundle into more than 2 bundles?
> >> like each bundle for
> >> one partition.
> >>
> >> > This algorithm has a disadvantage, it can only deal
> >> > with one `outstanding topic`.
> >>
> >> For this disadvantage, I think it can be solved by extends the "topic"
> >> parameter from one topic to a topic list.
> >>
> >> > The other algorithm is to split the bundle at the hashcode point of the
> >> > `outstanding partition` which will split the bundle into three bundles
> >> once
> >> > a time. The middle one contains the only point the hashcode of the
> >> > `outstanding partition, the left one is less than the hashcode, the
> >> right
> >> > one is more than the hashcode.
> >> > E.g. if we have a bundle 0x00_0x10 contains two `outstanding partition`
> >> > (partition-x and partition-y) whose hashcode is 0x03 and 0x07, this
> >> > algorithm  is going to split bundle the bundle into five new bundles,
> >> > 0x00_0x03, 0x03_0x04( for parition-x), 0x04_0x07, 0x07_0x08( for
> >> > partition-y), 0x08_0x10.
> >>
> >> I think this approach have more potential with abnormal topic isolation.
> >> If we can introduce
> >> some kind of bundle isolation strategy, (like broker-bundle affinity and
> >> anti-affinity mechanism), we can easily isolate some unexpected traffic to
> >> some empty brokers.
> >> IMO, this would improve the stability of broker cluster.
> >>
> >> Thanks,
> >> Haiting
> >>
> >> On 2022/02/17 15:47:15 Aloys Zhang wrote:
> >> > Hi Pulsar Community,
> >> >
> >> > This is a PIP discussion on how to support split partitions belonging to
> >> > specified topics in a bundle.
> >> >
> >> > The issue can be found: https://github.com/apache/pulsar/issues/13761
> >> >
> >> > I copy the content here for convenience, any suggestions are welcome and
> >> > appreciated.
> >> >
> >> >
> >> > ## Motivation
> >> >
> >> > As we all know, a namespace bundle may contain lots of partitions
> >> belonging
> >> > to different topics.
> >> > The throughput of these topics may vary greatly. Some topics may with
> >> very
> >> > high rate/throughput while other topics have a very low rate/throughput.
> >> >
> >> > These partitions with high rate/throughput can cause broker overload and
> >> > bundle unloading.
> >> > At this point, if we split bundle manually with `range_equally_divide`
> >> or
> >> > `topic_count_equally_divide` split algorithm, there may need many times
> >> > split before these high rate/through partitions assigned to different
> >> new
> >> > bundles.
> >> >
> >> > For convenience, we call these high throughput topics `outstanding
> >> topic`
> >> > and their partitions `outstanding partition` in this PIP.
> >> >
> >> > ## Goal
> >> >
> >> > Our goal is to make it easier to split `outstanding partition` into new
> >> > bundles.
> >> >
> >> > There are two alternative ways to achieve this. Either of them will add
> >> a
> >> > new algorithm for bundle split. The difference is how the new bundle
> >> split
> >> > algorithm is implemented.
> >> >
> >> > One algorithm is to split bundle by `outstanding topic` which will split
> >> > the bundle into two new bundles and each new bundle contains an equally
> >> > `outstanding partition` once a time.
> >> > E.g, a bundle contains lots of topic partitions, and only one
> >> `outstanding
> >> > topic`(T) with 2  `outstanding partition` (T-partition-n,
> >> Tpartition-n+1).
> >> > This algorithm split this bundle at the middle point of these two
> >> > partition's hashcode.  This algorithm has a disadvantage, it can only
> >> deal
> >> > with one `outstanding topic`.
> >> >
> >> > So we raised up another algorithm.
> >> >
> >> > The other algorithm is to split the bundle at the hashcode point of the
> >> > `outstanding partition` which will split the bundle into three bundles
> >> once
> >> > a time. The middle one contains the only point the hashcode of the
> >> > `outstanding partition, the left one is less than the hashcode, the
> >> right
> >> > one is more than the hashcode.
> >> > E.g. if we have a bundle 0x00_0x10 contains two `outstanding partition`
> >> > (partition-x and partition-y) whose hashcode is 0x03 and 0x07, this
> >> > algorithm  is going to split bundle the bundle into five new bundles,
> >> > 0x00_0x03, 0x03_0x04( for parition-x), 0x04_0x07, 0x07_0x08( for
> >> > partition-y), 0x08_0x10.
> >> >
> >> > ## API Changes
> >> >
> >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b
> >> ${bundle_range}`
> >> > will add a new parameter "--topic" or "-t" for  `outstanding topic`
> >> name.
> >> >
> >> > The split interface changed from
> >> >
> >> > ```JAVA
> >> > void splitNamespaceBundle(String namespace, String bundle, boolean
> >> > unloadSplitBundles, String splitAlgorithmName)throws
> >> PulsarAdminException;
> >> > ```
> >> >
> >> > to
> >> >
> >> > ```java
> >> > void splitNamespaceBundle(String namespace, String bundle, boolean
> >> > unloadSplitBundles,
> >> >                               String splitAlgorithmName, String topic)
> >> > throws PulsarAdminException;
> >> > ```
> >> >
> >> > ## Implementation
> >> >
> >> > There are changes both from the Admin CLI and the broker side.
> >> >
> >> > First, Admin CLI for split bundle should support to specify the
> >> > `outstanding topic`,
> >> >
> >> > ```java
> >> > /**
> >> >      * Split namespace bundle.
> >> >      *
> >> >      * @param namespace
> >> >      * @param bundle range of bundle to split
> >> >      * @param unloadSplitBundles
> >> >      * @param splitAlgorithmName
> >> >      * @param topic
> >> >      * @throws PulsarAdminException
> >> >      */
> >> >     void splitNamespaceBundle(String namespace, String bundle, boolean
> >> > unloadSplitBundles,
> >> >                               String splitAlgorithmName, String topic)
> >> > throws PulsarAdminException;
> >> >
> >> > ```
> >> >
> >> > ```java
> >> > /**
> >> >      * Split namespace bundle asynchronously.
> >> >      *
> >> >      * @param namespace
> >> >      * @param bundle range of bundle to split
> >> >      * @param unloadSplitBundles
> >> >      * @param splitAlgorithmName
> >> >      */
> >> >     CompletableFuture<Void> splitNamespaceBundleAsync(
> >> >             String namespace, String bundle, boolean unloadSplitBundles,
> >> > String splitAlgorithmName, String topic);
> >> > ```
> >> >
> >> > And for the broker side, first encapsulates the parameters for bundle
> >> split
> >> > into a new class `BundleSplitOption`
> >> >
> >> > ```java
> >> > public class BundleSplitOption {
> >> >     private NamespaceService service;
> >> >     private NamespaceBundle bundle;
> >> >     private String topic;
> >> > }
> >> > ```
> >> >
> >> > add a new split algorithm
> >> >
> >> > ```java
> >> > ublic class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm
> >> implements
> >> > NamespaceBundleSplitAlgorithm {
> >> >     @Override
> >> >     public CompletableFuture<List<Long>>
> >> getSplitBoundary(BundleSplitOption
> >> > bundleSplitOption) {
> >> >
> >> >         });
> >> >     }
> >> > }
> >> > ```
> >> >
> >> > add the new algorithm to `NamespaceBundleSplitAlgorithm`
> >> >
> >> > ```JAVA
> >> > String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE =
> >> > "specified_topic_count_equally_divide";
> >> >
> >> > List<String> AVAILABLE_ALGORITHMS =
> >> > Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
> >> >             TOPIC_COUNT_EQUALLY_DIVIDE,
> >> > SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
> >> >
> >> >  NamespaceBundleSplitAlgorithm
> >> SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
> >> >             new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
> >> > ```
> >> >
> >> > Modify `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` in
> >> > NamespaceService.java:
> >> > https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1
> >> >
> >> >
> >> > ```java
> >> > public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle,
> >> > boolean unload,
> >> >
> >> >  NamespaceBundleSplitAlgorithm splitAlgorithm, String topic) {
> >> >
> >> >         final CompletableFuture<Void> unloadFuture = new
> >> > CompletableFuture<>();
> >> >         final AtomicInteger counter = new
> >> > AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
> >> >         splitAndOwnBundleOnceAndRetry(bundle, unload, counter,
> >> > unloadFuture, splitAlgorithm, topic);
> >> >
> >> >         return unloadFuture;
> >> >     }
> >> > ```
> >> >
> >> > ```java
> >> > void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
> >> >                                        boolean unload,
> >> >                                        AtomicInteger counter,
> >> >                                        CompletableFuture<Void>
> >> > completionFuture,
> >> >                                        NamespaceBundleSplitAlgorithm
> >> > splitAlgorithm,
> >> >                                        String topic) {
> >> > ```
> >> >
> >> > Also, we change the REST api and broker.conf
> >> >
> >> > ```java
> >> > public void splitNamespaceBundle(
> >> >             @Suspended final AsyncResponse asyncResponse,
> >> >             @PathParam("property") String property,
> >> >             @PathParam("cluster") String cluster,
> >> >             @PathParam("namespace") String namespace,
> >> >             @PathParam("bundle") String bundleRange,
> >> >             @QueryParam("authoritative") @DefaultValue("false") boolean
> >> > authoritative,
> >> >             @QueryParam("unload") @DefaultValue("false") boolean unload,
> >> >             @QueryParam("topic") @DefaultValue("") String topic) {}
> >> > ```
> >> >
> >> > ```shell
> >> > supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
> >> > ```
> >> >
> >>
> >
> 

Re: [Discuss][PIP-143] Support split paritions belonging to specified topics in a bundle

Posted by PengHui Li <pe...@apache.org>.
Hi Haiting,

> I think this approach has more potential for abnormal topic isolation.
> If we can introduce some kind of bundle isolation strategy (like a
> broker-bundle affinity and anti-affinity mechanism), we can easily isolate
> some unexpected traffic to some empty brokers.
> IMO, this would improve the stability of the broker cluster.

If I understand correctly, suppose we have a partitioned topic with 10
partitions under a namespace with 16 bundles. If we apply the anti-affinity
policy, partition-0 maps to bundle 0, partition-1 maps to bundle 1, and so on.
Of course, not every partitioned topic has to start from bundle 0;
we can use the partition-0 hash to determine the start bundle index.
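
To make the mapping above concrete, here is a minimal sketch. It is not
Pulsar code: the class `AntiAffinitySketch`, the method `assignBundles`, and
the rule "start index = partition-0 hash modulo bundle count" are assumptions
made only to illustrate the idea.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Pulsar: spreads partitions over consecutive bundles. */
final class AntiAffinitySketch {

    /**
     * Returns the bundle index for each partition. Partition i goes to
     * (startIndex + i) mod bundleCount, where startIndex is derived from
     * the hash of partition-0 (an assumed rule, for illustration only).
     */
    static List<Integer> assignBundles(long partitionZeroHash, int partitions, int bundleCount) {
        int startIndex = (int) Long.remainderUnsigned(partitionZeroHash, bundleCount);
        List<Integer> assignment = new ArrayList<>(partitions);
        for (int i = 0; i < partitions; i++) {
            // Wrap around so a topic starting near the last bundle still spreads out.
            assignment.add((startIndex + i) % bundleCount);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 10 partitions, 16 bundles, partition-0 hash assumed to be 35 (35 mod 16 = 3).
        System.out.println(assignBundles(35L, 10, 16));
    }
}
```

With 10 partitions and 16 bundles no two partitions of the topic share a
bundle. With more partitions than bundles the indices wrap around, so some
bundles would hold more than one partition of the same topic.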

Do you have an example for affinity? I don't fully understand how it would be
used in practice.

Best,
Penghui

On Fri, Feb 18, 2022 at 11:16 PM PengHui Li <pe...@apache.org> wrote:

> Hi Aloys,
>
> > Do you mean that
> > 1. First, add a new API, maybe `getHashPosition`, to get the hash
> > position in a bundle
> > 2. Then use this position to split the overloaded bundle
> > If so, when we split a bundle with multiple partitions of a topic, we need
> > to call `getHashPosition` multiple times to get the middle position of all
> > these positions.
>
> Yes, this is what I mean. In this way, users can choose to assign 1 topic or
> 3 topics to one bundle. This is more like increasing the transparency of
> the topics in the bundle: you can see all the positions of the topics, so
> planning for bundle splitting becomes more flexible.
>
> The new API does not necessarily have to query topic by topic;
> could it list all the "topic -> position" mappings of a bundle?
>
> Thanks,
> Penghui
>
> On Fri, Feb 18, 2022 at 4:51 PM Haiting Jiang <ji...@apache.org>
> wrote:
>
>> Hi Aloys,
>> +1 for this great PIP.
>>
>> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b
>> > ${bundle_range}`
>> > will add a new parameter "--topic" or "-t" for the `outstanding topic`
>> > name.
>>
>> Do we have any limitation on this "topic" parameter? Can it be a
>> partitioned topic?
>> If so, will this new algorithm split the bundle into more than 2 bundles,
>> e.g. one bundle for each partition?
>>
>> > This algorithm has a disadvantage, it can only deal
>> > with one `outstanding topic`.
>>
>> I think this disadvantage can be solved by extending the "topic"
>> parameter from one topic to a topic list.
>>
>> > The other algorithm is to split the bundle at the hashcode point of the
>> > `outstanding partition`, which splits the bundle into three bundles at
>> > a time. The middle one contains only the hashcode point of the
>> > `outstanding partition`, the left one covers the hashes below it, and
>> > the right one covers the hashes above it.
>> > E.g. if a bundle 0x00_0x10 contains two `outstanding partition`s
>> > (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
>> > algorithm splits the bundle into five new bundles:
>> > 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
>> > partition-y), 0x08_0x10.
>>
>> I think this approach has more potential for abnormal topic isolation.
>> If we can introduce some kind of bundle isolation strategy (like a
>> broker-bundle affinity and anti-affinity mechanism), we can easily isolate
>> some unexpected traffic to some empty brokers.
>> IMO, this would improve the stability of the broker cluster.
>>
>> Thanks,
>> Haiting
>>

Re: [Discuss][PIP-143] Support split paritions belonging to specified topics in a bundle

Posted by PengHui Li <pe...@apache.org>.
Hi Aloys,

> Do you mean that
> 1. First, add a new API, maybe `getHashPosition`, to get the hash
> position in a bundle
> 2. Then use this position to split the overloaded bundle
> If so, when we split a bundle with multiple partitions of a topic, we need
> to call `getHashPosition` multiple times to get the middle position of all
> these positions.

Yes, this is what I mean. In this way, users can choose to assign 1 topic or
3 topics to one bundle. This is more like increasing the transparency of
the topics in the bundle: you can see all the positions of the topics, so
planning for bundle splitting becomes more flexible.

The new API does not necessarily have to query topic by topic;
could it list all the "topic -> position" mappings of a bundle?
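
As a rough illustration of how a user might plan a split from such a
"topic -> position" listing, here is a minimal sketch. Everything in it is
hypothetical: `BoundaryPlannerSketch` and `boundaryAfter` are not Pulsar
APIs, the positions are made-up values, and it assumes a bundle covers the
half-open range [start, end), so a boundary b yields [start, b) and [b, end).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical planner, not a Pulsar API: picks a split boundary from listed positions. */
final class BoundaryPlannerSketch {

    /**
     * Returns a boundary that keeps the leftCount lowest positions in the
     * left bundle and all remaining positions in the right bundle.
     */
    static long boundaryAfter(Map<String, Long> positions, int leftCount) {
        List<Long> sorted = new ArrayList<>(positions.values());
        Collections.sort(sorted);
        if (leftCount <= 0 || leftCount >= sorted.size()) {
            throw new IllegalArgumentException("leftCount must leave topics on both sides");
        }
        long left = sorted.get(leftCount - 1);
        long right = sorted.get(leftCount);
        if (left == right) {
            throw new IllegalArgumentException("positions collide and cannot be separated");
        }
        // Midpoint rounded up, so that left < boundary <= right always holds.
        return left + (right - left + 1) / 2;
    }

    public static void main(String[] args) {
        Map<String, Long> positions = new LinkedHashMap<>();
        positions.put("topic-a-partition-0", 0x03L);
        positions.put("topic-b-partition-0", 0x07L);
        positions.put("topic-c-partition-0", 0x0cL);
        System.out.println(boundaryAfter(positions, 1)); // one topic on the left side
        System.out.println(boundaryAfter(positions, 2)); // two topics on the left side
    }
}
```

The chosen value would then be passed as the boundary of a split request,
which is the part that needs a new parameter on the split API.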

Thanks,
Penghui

On Fri, Feb 18, 2022 at 4:51 PM Haiting Jiang <ji...@apache.org>
wrote:

> Hi Aloys,
> +1 for this great PIP.
>
> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b
> > ${bundle_range}`
> > will add a new parameter "--topic" or "-t" for the `outstanding topic` name.
>
> Do we have any limitation on this "topic" parameter? Can it be a
> partitioned topic?
> If so, will this new algorithm split the bundle into more than 2 bundles,
> e.g. one bundle for each partition?
>
> > This algorithm has a disadvantage, it can only deal
> > with one `outstanding topic`.
>
> I think this disadvantage can be solved by extending the "topic"
> parameter from one topic to a topic list.
>
> > The other algorithm is to split the bundle at the hashcode point of the
> > `outstanding partition`, which splits the bundle into three bundles at
> > a time. The middle one contains only the hashcode point of the
> > `outstanding partition`, the left one covers the hashes below it, and
> > the right one covers the hashes above it.
> > E.g. if a bundle 0x00_0x10 contains two `outstanding partition`s
> > (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
> > algorithm splits the bundle into five new bundles:
> > 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
> > partition-y), 0x08_0x10.
>
> I think this approach has more potential for abnormal topic isolation.
> If we can introduce some kind of bundle isolation strategy (like a
> broker-bundle affinity and anti-affinity mechanism), we can easily isolate
> some unexpected traffic to some empty brokers.
> IMO, this would improve the stability of the broker cluster.
>
> Thanks,
> Haiting
>

Re: [Discuss][PIP-143] Support split paritions belonging to specified topics in a bundle

Posted by Haiting Jiang <ji...@apache.org>.
Hi Aloys,
+1 for this great PIP.

> The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
> will add a new parameter "--topic" or "-t" for the `outstanding topic` name.

Do we have any limitation on this "topic" parameter? Can it be a partitioned topic?
If so, will this new algorithm split the bundle into more than 2 bundles, e.g. one
bundle for each partition?

> This algorithm has a disadvantage, it can only deal
> with one `outstanding topic`.

I think this disadvantage can be solved by extending the "topic" parameter from one topic to a topic list.

> The other algorithm is to split the bundle at the hashcode point of the
> `outstanding partition`, which splits the bundle into three bundles at
> a time. The middle one contains only the hashcode point of the
> `outstanding partition`, the left one covers the hashes below it, and
> the right one covers the hashes above it.
> E.g. if a bundle 0x00_0x10 contains two `outstanding partition`s
> (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
> algorithm splits the bundle into five new bundles:
> 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
> partition-y), 0x08_0x10.

I think this approach has more potential for abnormal topic isolation. If we can introduce
some kind of bundle isolation strategy (like a broker-bundle affinity and anti-affinity mechanism), we can easily isolate some unexpected traffic to some empty brokers.
IMO, this would improve the stability of the broker cluster.
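
For reference, the five-bundle example quoted above can be reproduced with a
small sketch. This is only an illustration under assumptions, not the PIP's
implementation: `HashPointSplitSketch` and `split` are hypothetical names, a
bundle is taken to cover the half-open range [start, end), and the two-digit
hex formatting simply follows the example in the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Illustrative only; not the PIP's implementation. */
final class HashPointSplitSketch {

    /**
     * Splits the range [start, end) so that every given hash lands in its
     * own one-point bundle [hash, hash + 1).
     */
    static List<String> split(long start, long end, long... hashes) {
        TreeSet<Long> boundaries = new TreeSet<>();
        boundaries.add(start);
        boundaries.add(end);
        for (long hash : hashes) {
            if (hash < start || hash >= end) {
                throw new IllegalArgumentException("hash outside bundle: " + hash);
            }
            boundaries.add(hash);     // lower edge of the isolating bundle
            boundaries.add(hash + 1); // upper edge of the isolating bundle
        }
        // Consecutive boundaries form the new bundle ranges.
        List<String> bundles = new ArrayList<>();
        Long previous = null;
        for (Long boundary : boundaries) {
            if (previous != null) {
                bundles.add(String.format("0x%02x_0x%02x", previous, boundary));
            }
            previous = boundary;
        }
        return bundles;
    }

    public static void main(String[] args) {
        // Bundle 0x00_0x10 with outstanding partitions hashed to 0x03 and 0x07.
        System.out.println(split(0x00, 0x10, 0x03, 0x07));
    }
}
```

Each isolated bundle holds exactly one hash point, which is what would make
it a natural unit for a broker-level isolation policy.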

Thanks,
Haiting

Re: [Discuss][PIP-143] Support split paritions belonging to specified topics in a bundle

Posted by Aloys Zhang <al...@apache.org>.
Hi Penghui,


> I am considering if we can add a boundary param for split bundle API,
> The boundary must be between the start and the end of the bundle.
> looks like the followings:
>
> ```java
> void splitNamespaceBundle(String namespace, String bundle, boolean
> unloadSplitBundles,
>                               String splitAlgorithmName, long boundary)
> throws PulsarAdminException;
> ```
>
> And, provide a new API to get the position in the bundle for a topic which
> can be used to determine the split boundary of the bundle when splitting
> a bundle.
>

Do you mean the following?
1. First, add a new API, maybe `getHashPosition`, to get the hash
position of a topic in a bundle.
2. Then use this position to split the overloaded bundle.
If so, when we split a bundle that contains multiple partitions of a topic,
we need to call `getHashPosition` multiple times and then take the middle
position of all the returned positions.
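
To make the "middle position" step concrete, here is a minimal sketch of how
a caller could pick one boundary from several partition hash positions. The
class and method names are hypothetical and not part of Pulsar; it only
assumes that each position is a non-negative long inside the bundle range.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper for illustration only; not part of Pulsar. */
final class MiddlePositionSketch {

    /**
     * Returns a boundary lying between the two middle hash positions, so that
     * roughly half of the given positions fall on each side of it.
     */
    static long middleBoundary(List<Long> positions) {
        if (positions.size() < 2) {
            throw new IllegalArgumentException("need at least two positions");
        }
        List<Long> sorted = new ArrayList<>(positions);
        Collections.sort(sorted);
        int mid = sorted.size() / 2;
        long left = sorted.get(mid - 1);
        long right = sorted.get(mid);
        // Midpoint computed without overflowing long.
        return left + (right - left) / 2;
    }
}
```

For the two partitions with hash 0x03 and 0x07 from the proposal, this gives
0x05. If the two middle positions are equal, no boundary can separate them,
so a real implementation would have to handle that case.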

> Looks like currently we only have a bundle assign allocation strategy
> based on the topic name hash, maybe we can also consider taking advantage
> of other characteristics of a topic to choose a different bundle.
>
That makes sense to me; it sounds like a re-hash. It seems to be a new way
to assign topics to a bundle rather than a bundle split algorithm. I think
we can raise a separate feature request or PIP for this idea.



PengHui Li <pe...@apache.org> wrote on Friday, February 18, 2022 at 08:54:

> Hi Aloys,
>
> Thanks for the great proposal.
>
> I am considering if we can add a boundary param for split bundle API,
> The boundary must be between the start and the end of the bundle.
> looks like the followings:
>
> ```java
> void splitNamespaceBundle(String namespace, String bundle, boolean
> unloadSplitBundles,
>                               String splitAlgorithmName, long boundary)
> throws PulsarAdminException;
> ```
>
> And, provide a new API to get the position in the bundle for a topic which
> can be used to determine the split boundary of the bundle when splitting
> a bundle.
>
> Looks like currently we only have a bundle assign allocation strategy
> based on the topic name hash, maybe we can also consider taking advantage
> of
> other characteristics of a topic to choose a different bundle. Just a rough
> idea.
> This may be beyond the scope of this proposal.
>
> Thanks,
> Penghui
>
> On Thu, Feb 17, 2022 at 11:47 PM Aloys Zhang <al...@apache.org>
> wrote:
>
> > Hi Pulsar Community,
> >
> > This is a PIP discussion on how to support split partitions belonging to
> > specified topics in a bundle.
> >
> > The issue can be found: https://github.com/apache/pulsar/issues/13761
> >
> > I copy the content here for convenience, any suggestions are welcome and
> > appreciated.
> >
> >
> > ## Motivation
> >
> > As we all know, a namespace bundle may contain lots of partitions
> belonging
> > to different topics.
> > The throughput of these topics may vary greatly. Some topics may with
> very
> > high rate/throughput while other topics have a very low rate/throughput.
> >
> > These partitions with high rate/throughput can cause broker overload and
> > bundle unloading.
> > At this point, if we split bundle manually with `range_equally_divide` or
> > `topic_count_equally_divide` split algorithm, there may need many times
> > split before these high rate/through partitions assigned to different new
> > bundles.
> >
> > For convenience, we call these high throughput topics `outstanding topic`
> > and their partitions `outstanding partition` in this PIP.
> >
> > ## Goal
> >
> > Our goal is to make it easier to split `outstanding partition` into new
> > bundles.
> >
> > There are two alternative ways to achieve this. Either of them will add a
> > new algorithm for bundle split. The difference is how the new bundle
> split
> > algorithm is implemented.
> >
> > One algorithm is to split bundle by `outstanding topic` which will split
> > the bundle into two new bundles and each new bundle contains an equally
> > `outstanding partition` once a time.
> > E.g, a bundle contains lots of topic partitions, and only one
> `outstanding
> > topic`(T) with 2  `outstanding partition` (T-partition-n,
> Tpartition-n+1).
> > This algorithm split this bundle at the middle point of these two
> > partition's hashcode.  This algorithm has a disadvantage, it can only
> deal
> > with one `outstanding topic`.
> >
> > So we raised up another algorithm.
> >
> > The other algorithm is to split the bundle at the hashcode point of the
> > `outstanding partition` which will split the bundle into three bundles
> once
> > a time. The middle one contains the only point the hashcode of the
> > `outstanding partition, the left one is less than the hashcode, the right
> > one is more than the hashcode.
> > E.g. if we have a bundle 0x00_0x10 contains two `outstanding partition`
> > (partition-x and partition-y) whose hashcode is 0x03 and 0x07, this
> > algorithm  is going to split bundle the bundle into five new bundles,
> > 0x00_0x03, 0x03_0x04( for parition-x), 0x04_0x07, 0x07_0x08( for
> > partition-y), 0x08_0x10.
> >
> > ## API Changes
> >
> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b
> ${bundle_range}`
> > will add a new parameter "--topic" or "-t" for  `outstanding topic` name.
> >
> > The split interface changed from
> >
> > ```JAVA
> > void splitNamespaceBundle(String namespace, String bundle, boolean
> > unloadSplitBundles, String splitAlgorithmName)throws
> PulsarAdminException;
> > ```
> >
> > to
> >
> > ```java
> > void splitNamespaceBundle(String namespace, String bundle, boolean
> > unloadSplitBundles,
> >                               String splitAlgorithmName, String topic)
> > throws PulsarAdminException;
> > ```
> >
> > ## Implementation
> >
> > There are changes both from the Admin CLI and the broker side.
> >
> > First, Admin CLI for split bundle should support to specify the
> > `outstanding topic`,
> >
> > ```java
> > /**
> >      * Split namespace bundle.
> >      *
> >      * @param namespace
> >      * @param bundle range of bundle to split
> >      * @param unloadSplitBundles
> >      * @param splitAlgorithmName
> >      * @param topic
> >      * @throws PulsarAdminException
> >      */
> >     void splitNamespaceBundle(String namespace, String bundle, boolean
> > unloadSplitBundles,
> >                               String splitAlgorithmName, String topic)
> > throws PulsarAdminException;
> >
> > ```
> >
> > ```java
> > /**
> >      * Split namespace bundle asynchronously.
> >      *
> >      * @param namespace
> >      * @param bundle range of bundle to split
> >      * @param unloadSplitBundles
> >      * @param splitAlgorithmName
> >      */
> >     CompletableFuture<Void> splitNamespaceBundleAsync(
> >             String namespace, String bundle, boolean unloadSplitBundles,
> > String splitAlgorithmName, String topic);
> > ```
> >
> > And for the broker side, first encapsulates the parameters for bundle
> split
> > into a new class `BundleSplitOption`
> >
> > ```java
> > public class BundleSplitOption {
> >     private NamespaceService service;
> >     private NamespaceBundle bundle;
> >     private String topic;
> > }
> > ```
> >
> > add a new split algorithm
> >
> > ```java
> > public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm
> implements
> > NamespaceBundleSplitAlgorithm {
> >     @Override
> >     public CompletableFuture<List<Long>>
> getSplitBoundary(BundleSplitOption
> > bundleSplitOption) {
> >
> >         });
> >     }
> > }
> > ```
> >
> > add the new algorithm to `NamespaceBundleSplitAlgorithm`
> >
> > ```JAVA
> > String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE =
> > "specified_topic_count_equally_divide";
> >
> > List<String> AVAILABLE_ALGORITHMS =
> > Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
> >             TOPIC_COUNT_EQUALLY_DIVIDE,
> > SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
> >
> >  NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO
> =
> >             new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
> > ```
> >
> > modify the `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` for
> >  [[NamespaceService.java](
> >
> >
> https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1)](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1
> > )
> >
> >
> > ```java
> > public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle,
> > boolean unload,
> >
> >  NamespaceBundleSplitAlgorithm splitAlgorithm, String topic) {
> >
> >         final CompletableFuture<Void> unloadFuture = new
> > CompletableFuture<>();
> >         final AtomicInteger counter = new
> > AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
> >         splitAndOwnBundleOnceAndRetry(bundle, unload, counter,
> > unloadFuture, splitAlgorithm, topic);
> >
> >         return unloadFuture;
> >     }
> > ```
> >
> > ```java
> > void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
> >                                        boolean unload,
> >                                        AtomicInteger counter,
> >                                        CompletableFuture<Void>
> > completionFuture,
> >                                        NamespaceBundleSplitAlgorithm
> > splitAlgorithm,
> >                                        String topic) {
> > ```
> >
> > Also, we change the REST api and broker.conf
> >
> > ```java
> > public void splitNamespaceBundle(
> >             @Suspended final AsyncResponse asyncResponse,
> >             @PathParam("property") String property,
> >             @PathParam("cluster") String cluster,
> >             @PathParam("namespace") String namespace,
> >             @PathParam("bundle") String bundleRange,
> >             @QueryParam("authoritative") @DefaultValue("false") boolean
> > authoritative,
> >             @QueryParam("unload") @DefaultValue("false") boolean unload,
> >             @QueryParam("topic") @DefaultValue("") String topic) {}
> > ```
> >
> > ```shell
> >
> >
> supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
> > ```
> >
>

Re: [Discuss][PIP-143] Support split paritions belonging to specified topics in a bundle

Posted by PengHui Li <pe...@apache.org>.
Hi Aloys,

Thanks for the great proposal.

I am wondering whether we can add a boundary param to the split bundle API.
The boundary must lie between the start and the end of the bundle.
It would look like the following:

```java
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName, long boundary)
        throws PulsarAdminException;
```

We could also provide a new API that returns the position of a topic in the
bundle, which can then be used to determine the split boundary when
splitting a bundle.
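
As a rough illustration of the two pieces above, the sketch below computes a
hash position for a topic name and checks that a boundary lies strictly
inside a bundle range such as `0x00000000_0x40000000`. All names here are
hypothetical. Using CRC32 over the UTF-8 topic name is an assumption made
for the example; the hash the broker actually uses may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical helper for illustration only; not part of Pulsar. */
final class BoundarySketch {

    /** Assumed hash: CRC32 of the topic name, as an unsigned 32-bit value. */
    static long hashPosition(String topicName) {
        CRC32 crc = new CRC32();
        crc.update(topicName.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    /** True if boundary lies strictly between the bundle's start and end. */
    static boolean isInsideBundle(String bundleRange, long boundary) {
        String[] parts = bundleRange.split("_");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected <start>_<end>: " + bundleRange);
        }
        long start = Long.decode(parts[0]); // accepts the 0x prefix
        long end = Long.decode(parts[1]);
        return boundary > start && boundary < end;
    }
}
```

A caller would first look up the position of the hot topic, choose a
boundary next to it, and reject the split request up front if
`isInsideBundle` returns false.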

It looks like we currently only have a bundle assignment strategy based on
the topic name hash. Maybe we can also consider using other characteristics
of a topic to choose a different bundle. This is just a rough idea and may
be beyond the scope of this proposal.

Thanks,
Penghui

On Thu, Feb 17, 2022 at 11:47 PM Aloys Zhang <al...@apache.org> wrote:

> Hi Pulsar Community,
>
> This is a PIP discussion on how to support split partitions belonging to
> specified topics in a bundle.
>
> The issue can be found: https://github.com/apache/pulsar/issues/13761
>
> I copy the content here for convenience, any suggestions are welcome and
> appreciated.
>
>
> ## Motivation
>
> As we all know, a namespace bundle may contain lots of partitions belonging
> to different topics.
> The throughput of these topics may vary greatly. Some topics may with very
> high rate/throughput while other topics have a very low rate/throughput.
>
> These partitions with high rate/throughput can cause broker overload and
> bundle unloading.
> At this point, if we split bundle manually with `range_equally_divide` or
> `topic_count_equally_divide` split algorithm, there may need many times
> split before these high rate/through partitions assigned to different new
> bundles.
>
> For convenience, we call these high throughput topics `outstanding topic`
> and their partitions `outstanding partition` in this PIP.
>
> ## Goal
>
> Our goal is to make it easier to split `outstanding partition` into new
> bundles.
>
> There are two alternative ways to achieve this. Either of them will add a
> new algorithm for bundle split. The difference is how the new bundle split
> algorithm is implemented.
>
> One algorithm is to split bundle by `outstanding topic` which will split
> the bundle into two new bundles and each new bundle contains an equally
> `outstanding partition` once a time.
> E.g, a bundle contains lots of topic partitions, and only one `outstanding
> topic`(T) with 2  `outstanding partition` (T-partition-n, Tpartition-n+1).
> This algorithm split this bundle at the middle point of these two
> partition's hashcode.  This algorithm has a disadvantage, it can only deal
> with one `outstanding topic`.
>
> So we raised up another algorithm.
>
> The other algorithm is to split the bundle at the hashcode point of the
> `outstanding partition` which will split the bundle into three bundles once
> a time. The middle one contains the only point the hashcode of the
> `outstanding partition, the left one is less than the hashcode, the right
> one is more than the hashcode.
> E.g. if we have a bundle 0x00_0x10 contains two `outstanding partition`
> (partition-x and partition-y) whose hashcode is 0x03 and 0x07, this
> algorithm  is going to split bundle the bundle into five new bundles,
> 0x00_0x03, 0x03_0x04( for parition-x), 0x04_0x07, 0x07_0x08( for
> partition-y), 0x08_0x10.
>
> ## API Changes
>
> The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
> will add a new parameter "--topic" or "-t" for  `outstanding topic` name.
>
> The split interface changed from
>
> ```JAVA
> void splitNamespaceBundle(String namespace, String bundle, boolean
> unloadSplitBundles, String splitAlgorithmName)throws PulsarAdminException;
> ```
>
> to
>
> ```java
> void splitNamespaceBundle(String namespace, String bundle, boolean
> unloadSplitBundles,
>                               String splitAlgorithmName, String topic)
> throws PulsarAdminException;
> ```
>
> ## Implementation
>
> There are changes both from the Admin CLI and the broker side.
>
> First, Admin CLI for split bundle should support to specify the
> `outstanding topic`,
>
> ```java
> /**
>      * Split namespace bundle.
>      *
>      * @param namespace
>      * @param bundle range of bundle to split
>      * @param unloadSplitBundles
>      * @param splitAlgorithmName
>      * @param topic
>      * @throws PulsarAdminException
>      */
>     void splitNamespaceBundle(String namespace, String bundle, boolean
> unloadSplitBundles,
>                               String splitAlgorithmName, String topic)
> throws PulsarAdminException;
>
> ```
>
> ```java
> /**
>      * Split namespace bundle asynchronously.
>      *
>      * @param namespace
>      * @param bundle range of bundle to split
>      * @param unloadSplitBundles
>      * @param splitAlgorithmName
>      */
>     CompletableFuture<Void> splitNamespaceBundleAsync(
>             String namespace, String bundle, boolean unloadSplitBundles,
> String splitAlgorithmName, String topic);
> ```
>
> And for the broker side, first encapsulates the parameters for bundle split
> into a new class `BundleSplitOption`
>
> ```java
> public class BundleSplitOption {
>     private NamespaceService service;
>     private NamespaceBundle bundle;
>     private String topic;
> }
> ```
>
> add a new split algorithm
>
> ```java
> public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm implements
> NamespaceBundleSplitAlgorithm {
>     @Override
>     public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption
> bundleSplitOption) {
>
>         });
>     }
> }
> ```
>
> add the new algorithm to `NamespaceBundleSplitAlgorithm`
>
> ```JAVA
> String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE =
> "specified_topic_count_equally_divide";
>
> List<String> AVAILABLE_ALGORITHMS =
> Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
>             TOPIC_COUNT_EQUALLY_DIVIDE,
> SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
>
>  NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
>             new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
> ```
>
> modify the `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` for
>  [[NamespaceService.java](
>
> https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1)](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1
> )
>
>
> ```java
> public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle,
> boolean unload,
>
>  NamespaceBundleSplitAlgorithm splitAlgorithm, String topic) {
>
>         final CompletableFuture<Void> unloadFuture = new
> CompletableFuture<>();
>         final AtomicInteger counter = new
> AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
>         splitAndOwnBundleOnceAndRetry(bundle, unload, counter,
> unloadFuture, splitAlgorithm, topic);
>
>         return unloadFuture;
>     }
> ```
>
> ```java
> void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
>                                        boolean unload,
>                                        AtomicInteger counter,
>                                        CompletableFuture<Void>
> completionFuture,
>                                        NamespaceBundleSplitAlgorithm
> splitAlgorithm,
>                                        String topic) {
> ```
>
> Also, we change the REST api and broker.conf
>
> ```java
> public void splitNamespaceBundle(
>             @Suspended final AsyncResponse asyncResponse,
>             @PathParam("property") String property,
>             @PathParam("cluster") String cluster,
>             @PathParam("namespace") String namespace,
>             @PathParam("bundle") String bundleRange,
>             @QueryParam("authoritative") @DefaultValue("false") boolean
> authoritative,
>             @QueryParam("unload") @DefaultValue("false") boolean unload,
>             @QueryParam("topic") @DefaultValue("") String topic) {}
> ```
>
> ```shell
>
> supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
> ```
>