You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by Andrey Yegorov <ay...@apache.org> on 2023/01/13 04:35:36 UTC

[DISCUSS] PIP-241: TopicEventListener / topic events for the BrokerService

Hi,

I am starting discussion for PIP-241: TopicEventListener / topic events for
the BrokerService.

PIP issue: https://github.com/apache/pulsar/issues/19224

### Motivation

Some Protocol Handlers may need to know about the topic-specific events to
update internal caches and/or state.

These mechanisms will be useful also for core Pulsar components (like the
Transactions subsystem) and probably we would be able to simplify the
interaction between the internal components in the broker by using an
unified mechanism to handle the lifecycle of topics.

Specific use cases:

KOP keeps some state for the topic and needs to handle such cases as:

- Topic Unloaded: release resources dedicated to the topic
- Topic Loaded: trigger loading of components tied to the partition
(GroupCoordinator, TransactionManager)
- Topic Deleted: remove any persistent state associated to the topic that
is stored in additional side system topics
- Topic Created: the same as “deleted” (ensure that there is no state on
system topics related to the new topic)


### Goal

This PIP defines a set of events needed for the protocol handlers (and for
internal broker components) to get notifications about topic-specific
events as seen by BrokerService. PIP outlines changes needed for protocol
handlers to keep/cache state consistent with BrokerService’s.

The changes should not affect Pulsar running without protocol handlers or
with protocol handlers that do not rely on the new events.


### API Changes

```java
/**
 * Listener for the Topic events.
 */
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface TopicEventsListener {

    /**
     * Types of events currently supported.
     *  create/load/unload/delete
     */
    enum TopicEvent {
        // create events included into load events
        CREATE,
        LOAD,
        UNLOAD,
        DELETE,
    }

    /**
     * Stages of events currently supported.
     *  before starting the event/successful completion/failed completion
     */
    enum EventStage {
        BEFORE,
        SUCCESS,
        FAILURE
    }

    /**
     * Outcome of the listener.
     * Ignored for events at final stages (SUCCESS/FAILURE),
     *
     */
    enum EventProcessingOutcome {
        OK,
        FAILURE,
        NOT_ALLOWED
    }

    /**
     * POJO for event processing result (outcome, message)
     */
    @ToString(includeFieldNames=true)
    @Data(staticConstructor="of")
    class EventProcessingResult {
        private final EventProcessingOutcome outcome;
        private final String message;
    }

    /**
     * Handle topic event.
     * Choice of the thread / maintenance of the thread pool is up to the
event handlers.
     * @param topicName - name of the topic
     * @param event - TopicEvent
     * @param stage - EventStage
     * @param t - exception in case of FAILURE, if present/known
     * @return - EventProcessingResult.
     *      EventProcessingResult.EventProcessingOutcome != OK indicates
request to cancel
     *           event at BEFORE stage.
     */
    EventProcessingResult handleEvent(String topicName, TopicEvent event,
EventStage stage, Throwable t);
}
```

BrokerService:
```java
    public void addTopicEventListener(TopicEventsListener... listeners)

    public void removeTopicEventListener(TopicEventsListener... listeners)
```

### Implementation

See PR for the proposed implementation.
https://github.com/apache/pulsar/pull/19153


### Alternatives

Add new methods to the BrokerInterceptor API

--
Andrey

Re: [DISCUSS] PIP-241: TopicEventListener / topic events for the BrokerService

Posted by Michael Marshall <mm...@apache.org>.
This proposal looks good to me. I left some minor feedback on the PR.

I agree with Enrico that this would be a helpful framework to use
internally in Pulsar.

Thanks,
Michael

On Thu, Jan 19, 2023 at 10:24 AM Andrey Yegorov
<an...@datastax.com> wrote:
>
> Hi Haiting,
>
> Currently there are no plans to use TopicEvent/EventStage outside of the
> TopicEventsListener.
> We can refactor that if the situation changes and if these will be reusable
> (which I doubt at least for the TopicEvent).
> I'd keep it contained in one file/namespace.
>
> On Thu, Jan 19, 2023 at 2:37 AM Haiting Jiang <ji...@gmail.com>
> wrote:
>
> > Hi Andrey,
> >
> > This PIP makes sense to me.
> > A small question: Is it better to define `TopicEvent` and `EventStage`
> > outside of  `TopicEventsListener`?
> >
> > Thanks,
> > Haiting
> >
> >
> > On Tue, Jan 17, 2023 at 3:51 PM Enrico Olivelli <eo...@gmail.com>
> > wrote:
> > >
> > > I support this PIP.
> > > This will allow Protocol Handlers to have better knowledge of what's
> > > happening on the broker.
> > >
> > > I hope that someday we will be able to refactor Broker internals using
> > > this feature and decouple the components.
> > >
> > > Enrico
> > >
> > > Il giorno lun 16 gen 2023 alle ore 10:27 Yunze Xu
> > > <yz...@streamnative.io.invalid> ha scritto:
> > > >
> > > > +1 to me now.
> > > >
> > > > Thanks,
> > > > Yunze
> > > >
> > > > On Sat, Jan 14, 2023 at 1:29 AM Andrey Yegorov
> > > > <an...@datastax.com> wrote:
> > > > >
> > > > > My bad, I pasted the interface code from a branch with experiment to
> > cancel
> > > > > events. This is no longer needed.
> > > > > EventProcessingResult result is irrelevant, I updated the PIP.
> > > > >
> > > > > It is not in the PR.
> > > > >
> > > > > On Fri, Jan 13, 2023 at 4:08 AM Yunze Xu
> > <yz...@streamnative.io.invalid>
> > > > > wrote:
> > > > >
> > > > > > I have some questions about the EventProcessingResult:
> > > > > > 1. What's the difference between FAILURE and NOT_ALLOWED?
> > > > > > 2. If we need to return the `message`, which indicates the error
> > IIUC,
> > > > > > would it be better to replace the returned value with a checked
> > > > > > exception?
> > > > > >
> > > > > > Thanks,
> > > > > > Yunze
> > > > > >
> > > > > > On Fri, Jan 13, 2023 at 12:36 PM Andrey Yegorov <
> > ayegorov@apache.org>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I am starting discussion for PIP-241: TopicEventListener / topic
> > events
> > > > > > for
> > > > > > > the BrokerService.
> > > > > > >
> > > > > > > PIP issue: https://github.com/apache/pulsar/issues/19224
> > > > > > >
> > > > > > > ### Motivation
> > > > > > >
> > > > > > > Some Protocol Handlers may need to know about the topic-specific
> > events
> > > > > > to
> > > > > > > update internal caches and/or state.
> > > > > > >
> > > > > > > These mechanisms will be useful also for core Pulsar components
> > (like the
> > > > > > > Transactions subsystem) and probably we would be able to
> > simplify the
> > > > > > > interaction between the internal components in the broker by
> > using an
> > > > > > > unified mechanism to handle the lifecycle of topics.
> > > > > > >
> > > > > > > Specific use cases:
> > > > > > >
> > > > > > > KOP keeps some state for the topic and needs to handle such
> > cases as:
> > > > > > >
> > > > > > > - Topic Unloaded: release resources dedicated to the topic
> > > > > > > - Topic Loaded: trigger loading of components tied to the
> > partition
> > > > > > > (GroupCoordinator, TransactionManager)
> > > > > > > - Topic Deleted: remove any persistent state associated to the
> > topic that
> > > > > > > is stored in additional side system topics
> > > > > > > - Topic Created: the same as “deleted” (ensure that there is no
> > state on
> > > > > > > system topics related to the new topic)
> > > > > > >
> > > > > > >
> > > > > > > ### Goal
> > > > > > >
> > > > > > > This PIP defines a set of events needed for the protocol
> > handlers (and
> > > > > > for
> > > > > > > internal broker components) to get notifications about
> > topic-specific
> > > > > > > events as seen by BrokerService. PIP outlines changes needed for
> > protocol
> > > > > > > handlers to keep/cache state consistent with BrokerService’s.
> > > > > > >
> > > > > > > The changes should not affect Pulsar running without protocol
> > handlers or
> > > > > > > with protocol handlers that do not rely on the new events.
> > > > > > >
> > > > > > >
> > > > > > > ### API Changes
> > > > > > >
> > > > > > > ```java
> > > > > > > /**
> > > > > > >  * Listener for the Topic events.
> > > > > > >  */
> > > > > > > @InterfaceAudience.LimitedPrivate
> > > > > > > @InterfaceStability.Evolving
> > > > > > > public interface TopicEventsListener {
> > > > > > >
> > > > > > >     /**
> > > > > > >      * Types of events currently supported.
> > > > > > >      *  create/load/unload/delete
> > > > > > >      */
> > > > > > >     enum TopicEvent {
> > > > > > >         // create events included into load events
> > > > > > >         CREATE,
> > > > > > >         LOAD,
> > > > > > >         UNLOAD,
> > > > > > >         DELETE,
> > > > > > >     }
> > > > > > >
> > > > > > >     /**
> > > > > > >      * Stages of events currently supported.
> > > > > > >      *  before starting the event/successful completion/failed
> > completion
> > > > > > >      */
> > > > > > >     enum EventStage {
> > > > > > >         BEFORE,
> > > > > > >         SUCCESS,
> > > > > > >         FAILURE
> > > > > > >     }
> > > > > > >
> > > > > > >     /**
> > > > > > >      * Outcome of the listener.
> > > > > > >      * Ignored for events at final stages (SUCCESS/FAILURE),
> > > > > > >      *
> > > > > > >      */
> > > > > > >     enum EventProcessingOutcome {
> > > > > > >         OK,
> > > > > > >         FAILURE,
> > > > > > >         NOT_ALLOWED
> > > > > > >     }
> > > > > > >
> > > > > > >     /**
> > > > > > >      * POJO for event processing result (outcome, message)
> > > > > > >      */
> > > > > > >     @ToString(includeFieldNames=true)
> > > > > > >     @Data(staticConstructor="of")
> > > > > > >     class EventProcessingResult {
> > > > > > >         private final EventProcessingOutcome outcome;
> > > > > > >         private final String message;
> > > > > > >     }
> > > > > > >
> > > > > > >     /**
> > > > > > >      * Handle topic event.
> > > > > > >      * Choice of the thread / maintenance of the thread pool is
> > up to the
> > > > > > > event handlers.
> > > > > > >      * @param topicName - name of the topic
> > > > > > >      * @param event - TopicEvent
> > > > > > >      * @param stage - EventStage
> > > > > > >      * @param t - exception in case of FAILURE, if present/known
> > > > > > >      * @return - EventProcessingResult.
> > > > > > >      *      EventProcessingResult.EventProcessingOutcome != OK
> > indicates
> > > > > > > request to cancel
> > > > > > >      *           event at BEFORE stage.
> > > > > > >      */
> > > > > > >     EventProcessingResult handleEvent(String topicName,
> > TopicEvent event,
> > > > > > > EventStage stage, Throwable t);
> > > > > > > }
> > > > > > > ```
> > > > > > >
> > > > > > > BrokerService:
> > > > > > > ```java
> > > > > > >     public void addTopicEventListener(TopicEventsListener...
> > listeners)
> > > > > > >
> > > > > > >     public void removeTopicEventListener(TopicEventsListener...
> > > > > > listeners)
> > > > > > > ```
> > > > > > >
> > > > > > > ### Implementation
> > > > > > >
> > > > > > > See PR for the proposed implementation.
> > > > > > > https://github.com/apache/pulsar/pull/19153
> > > > > > >
> > > > > > >
> > > > > > > ### Alternatives
> > > > > > >
> > > > > > > Add new methods to the BrokerInterceptor API
> > > > > > >
> > > > > > > --
> > > > > > > Andrey
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Andrey Yegorov
> >
>
>
> --
> Andrey Yegorov

Re: [DISCUSS] PIP-241: TopicEventListener / topic events for the BrokerService

Posted by Andrey Yegorov <an...@datastax.com>.
Hi Haiting,

Currently there are no plans to use TopicEvent/EventStage outside of the
TopicEventsListener.
We can refactor that if the situation changes and if these will be reusable
(which I doubt at least for the TopicEvent).
I'd keep it contained in one file/namespace.

On Thu, Jan 19, 2023 at 2:37 AM Haiting Jiang <ji...@gmail.com>
wrote:

> Hi Andrey,
>
> This PIP makes sense to me.
> A small question: Is it better to define `TopicEvent` and `EventStage`
> outside of  `TopicEventsListener`?
>
> Thanks,
> Haiting
>
>
> On Tue, Jan 17, 2023 at 3:51 PM Enrico Olivelli <eo...@gmail.com>
> wrote:
> >
> > I support this PIP.
> > This will allow Protocol Handlers to have better knowledge of what's
> > happening on the broker.
> >
> > I hope that someday we will be able to refactor Broker internals using
> > this feature and decouple the components.
> >
> > Enrico
> >
> > Il giorno lun 16 gen 2023 alle ore 10:27 Yunze Xu
> > <yz...@streamnative.io.invalid> ha scritto:
> > >
> > > +1 to me now.
> > >
> > > Thanks,
> > > Yunze
> > >
> > > On Sat, Jan 14, 2023 at 1:29 AM Andrey Yegorov
> > > <an...@datastax.com> wrote:
> > > >
> > > > My bad, I pasted the interface code from a branch with experiment to
> cancel
> > > > events. This is no longer needed.
> > > > EventProcessingResult result is irrelevant, I updated the PIP.
> > > >
> > > > It is not in the PR.
> > > >
> > > > On Fri, Jan 13, 2023 at 4:08 AM Yunze Xu
> <yz...@streamnative.io.invalid>
> > > > wrote:
> > > >
> > > > > I have some questions about the EventProcessingResult:
> > > > > 1. What's the difference between FAILURE and NOT_ALLOWED?
> > > > > 2. If we need to return the `message`, which indicates the error
> IIUC,
> > > > > would it be better to replace the returned value with a checked
> > > > > exception?
> > > > >
> > > > > Thanks,
> > > > > Yunze
> > > > >
> > > > > On Fri, Jan 13, 2023 at 12:36 PM Andrey Yegorov <
> ayegorov@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I am starting discussion for PIP-241: TopicEventListener / topic
> events
> > > > > for
> > > > > > the BrokerService.
> > > > > >
> > > > > > PIP issue: https://github.com/apache/pulsar/issues/19224
> > > > > >
> > > > > > ### Motivation
> > > > > >
> > > > > > Some Protocol Handlers may need to know about the topic-specific
> events
> > > > > to
> > > > > > update internal caches and/or state.
> > > > > >
> > > > > > These mechanisms will be useful also for core Pulsar components
> (like the
> > > > > > Transactions subsystem) and probably we would be able to
> simplify the
> > > > > > interaction between the internal components in the broker by
> using an
> > > > > > unified mechanism to handle the lifecycle of topics.
> > > > > >
> > > > > > Specific use cases:
> > > > > >
> > > > > > KOP keeps some state for the topic and needs to handle such
> cases as:
> > > > > >
> > > > > > - Topic Unloaded: release resources dedicated to the topic
> > > > > > - Topic Loaded: trigger loading of components tied to the
> partition
> > > > > > (GroupCoordinator, TransactionManager)
> > > > > > - Topic Deleted: remove any persistent state associated to the
> topic that
> > > > > > is stored in additional side system topics
> > > > > > - Topic Created: the same as “deleted” (ensure that there is no
> state on
> > > > > > system topics related to the new topic)
> > > > > >
> > > > > >
> > > > > > ### Goal
> > > > > >
> > > > > > This PIP defines a set of events needed for the protocol
> handlers (and
> > > > > for
> > > > > > internal broker components) to get notifications about
> topic-specific
> > > > > > events as seen by BrokerService. PIP outlines changes needed for
> protocol
> > > > > > handlers to keep/cache state consistent with BrokerService’s.
> > > > > >
> > > > > > The changes should not affect Pulsar running without protocol
> handlers or
> > > > > > with protocol handlers that do not rely on the new events.
> > > > > >
> > > > > >
> > > > > > ### API Changes
> > > > > >
> > > > > > ```java
> > > > > > /**
> > > > > >  * Listener for the Topic events.
> > > > > >  */
> > > > > > @InterfaceAudience.LimitedPrivate
> > > > > > @InterfaceStability.Evolving
> > > > > > public interface TopicEventsListener {
> > > > > >
> > > > > >     /**
> > > > > >      * Types of events currently supported.
> > > > > >      *  create/load/unload/delete
> > > > > >      */
> > > > > >     enum TopicEvent {
> > > > > >         // create events included into load events
> > > > > >         CREATE,
> > > > > >         LOAD,
> > > > > >         UNLOAD,
> > > > > >         DELETE,
> > > > > >     }
> > > > > >
> > > > > >     /**
> > > > > >      * Stages of events currently supported.
> > > > > >      *  before starting the event/successful completion/failed
> completion
> > > > > >      */
> > > > > >     enum EventStage {
> > > > > >         BEFORE,
> > > > > >         SUCCESS,
> > > > > >         FAILURE
> > > > > >     }
> > > > > >
> > > > > >     /**
> > > > > >      * Outcome of the listener.
> > > > > >      * Ignored for events at final stages (SUCCESS/FAILURE),
> > > > > >      *
> > > > > >      */
> > > > > >     enum EventProcessingOutcome {
> > > > > >         OK,
> > > > > >         FAILURE,
> > > > > >         NOT_ALLOWED
> > > > > >     }
> > > > > >
> > > > > >     /**
> > > > > >      * POJO for event processing result (outcome, message)
> > > > > >      */
> > > > > >     @ToString(includeFieldNames=true)
> > > > > >     @Data(staticConstructor="of")
> > > > > >     class EventProcessingResult {
> > > > > >         private final EventProcessingOutcome outcome;
> > > > > >         private final String message;
> > > > > >     }
> > > > > >
> > > > > >     /**
> > > > > >      * Handle topic event.
> > > > > >      * Choice of the thread / maintenance of the thread pool is
> up to the
> > > > > > event handlers.
> > > > > >      * @param topicName - name of the topic
> > > > > >      * @param event - TopicEvent
> > > > > >      * @param stage - EventStage
> > > > > >      * @param t - exception in case of FAILURE, if present/known
> > > > > >      * @return - EventProcessingResult.
> > > > > >      *      EventProcessingResult.EventProcessingOutcome != OK
> indicates
> > > > > > request to cancel
> > > > > >      *           event at BEFORE stage.
> > > > > >      */
> > > > > >     EventProcessingResult handleEvent(String topicName,
> TopicEvent event,
> > > > > > EventStage stage, Throwable t);
> > > > > > }
> > > > > > ```
> > > > > >
> > > > > > BrokerService:
> > > > > > ```java
> > > > > >     public void addTopicEventListener(TopicEventsListener...
> listeners)
> > > > > >
> > > > > >     public void removeTopicEventListener(TopicEventsListener...
> > > > > listeners)
> > > > > > ```
> > > > > >
> > > > > > ### Implementation
> > > > > >
> > > > > > See PR for the proposed implementation.
> > > > > > https://github.com/apache/pulsar/pull/19153
> > > > > >
> > > > > >
> > > > > > ### Alternatives
> > > > > >
> > > > > > Add new methods to the BrokerInterceptor API
> > > > > >
> > > > > > --
> > > > > > Andrey
> > > > >
> > > >
> > > >
> > > > --
> > > > Andrey Yegorov
>


-- 
Andrey Yegorov

Re: [DISCUSS] PIP-241: TopicEventListener / topic events for the BrokerService

Posted by Haiting Jiang <ji...@gmail.com>.
Hi Andrey,

This PIP makes sense to me.
A small question: Is it better to define `TopicEvent` and `EventStage`
outside of  `TopicEventsListener`?

Thanks,
Haiting


On Tue, Jan 17, 2023 at 3:51 PM Enrico Olivelli <eo...@gmail.com> wrote:
>
> I support this PIP.
> This will allow Protocol Handlers to have better knowledge of what's
> happening on the broker.
>
> I hope that someday we will be able to refactor Broker internals using
> this feature and decouple the components.
>
> Enrico
>
> Il giorno lun 16 gen 2023 alle ore 10:27 Yunze Xu
> <yz...@streamnative.io.invalid> ha scritto:
> >
> > +1 to me now.
> >
> > Thanks,
> > Yunze
> >
> > On Sat, Jan 14, 2023 at 1:29 AM Andrey Yegorov
> > <an...@datastax.com> wrote:
> > >
> > > My bad, I pasted the interface code from a branch with experiment to cancel
> > > events. This is no longer needed.
> > > EventProcessingResult result is irrelevant, I updated the PIP.
> > >
> > > It is not in the PR.
> > >
> > > On Fri, Jan 13, 2023 at 4:08 AM Yunze Xu <yz...@streamnative.io.invalid>
> > > wrote:
> > >
> > > > I have some questions about the EventProcessingResult:
> > > > 1. What's the difference between FAILURE and NOT_ALLOWED?
> > > > 2. If we need to return the `message`, which indicates the error IIUC,
> > > > would it be better to replace the returned value with a checked
> > > > exception?
> > > >
> > > > Thanks,
> > > > Yunze
> > > >
> > > > On Fri, Jan 13, 2023 at 12:36 PM Andrey Yegorov <ay...@apache.org>
> > > > wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > I am starting discussion for PIP-241: TopicEventListener / topic events
> > > > for
> > > > > the BrokerService.
> > > > >
> > > > > PIP issue: https://github.com/apache/pulsar/issues/19224
> > > > >
> > > > > ### Motivation
> > > > >
> > > > > Some Protocol Handlers may need to know about the topic-specific events
> > > > to
> > > > > update internal caches and/or state.
> > > > >
> > > > > These mechanisms will be useful also for core Pulsar components (like the
> > > > > Transactions subsystem) and probably we would be able to simplify the
> > > > > interaction between the internal components in the broker by using an
> > > > > unified mechanism to handle the lifecycle of topics.
> > > > >
> > > > > Specific use cases:
> > > > >
> > > > > KOP keeps some state for the topic and needs to handle such cases as:
> > > > >
> > > > > - Topic Unloaded: release resources dedicated to the topic
> > > > > - Topic Loaded: trigger loading of components tied to the partition
> > > > > (GroupCoordinator, TransactionManager)
> > > > > - Topic Deleted: remove any persistent state associated to the topic that
> > > > > is stored in additional side system topics
> > > > > - Topic Created: the same as “deleted” (ensure that there is no state on
> > > > > system topics related to the new topic)
> > > > >
> > > > >
> > > > > ### Goal
> > > > >
> > > > > This PIP defines a set of events needed for the protocol handlers (and
> > > > for
> > > > > internal broker components) to get notifications about topic-specific
> > > > > events as seen by BrokerService. PIP outlines changes needed for protocol
> > > > > handlers to keep/cache state consistent with BrokerService’s.
> > > > >
> > > > > The changes should not affect Pulsar running without protocol handlers or
> > > > > with protocol handlers that do not rely on the new events.
> > > > >
> > > > >
> > > > > ### API Changes
> > > > >
> > > > > ```java
> > > > > /**
> > > > >  * Listener for the Topic events.
> > > > >  */
> > > > > @InterfaceAudience.LimitedPrivate
> > > > > @InterfaceStability.Evolving
> > > > > public interface TopicEventsListener {
> > > > >
> > > > >     /**
> > > > >      * Types of events currently supported.
> > > > >      *  create/load/unload/delete
> > > > >      */
> > > > >     enum TopicEvent {
> > > > >         // create events included into load events
> > > > >         CREATE,
> > > > >         LOAD,
> > > > >         UNLOAD,
> > > > >         DELETE,
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Stages of events currently supported.
> > > > >      *  before starting the event/successful completion/failed completion
> > > > >      */
> > > > >     enum EventStage {
> > > > >         BEFORE,
> > > > >         SUCCESS,
> > > > >         FAILURE
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Outcome of the listener.
> > > > >      * Ignored for events at final stages (SUCCESS/FAILURE),
> > > > >      *
> > > > >      */
> > > > >     enum EventProcessingOutcome {
> > > > >         OK,
> > > > >         FAILURE,
> > > > >         NOT_ALLOWED
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * POJO for event processing result (outcome, message)
> > > > >      */
> > > > >     @ToString(includeFieldNames=true)
> > > > >     @Data(staticConstructor="of")
> > > > >     class EventProcessingResult {
> > > > >         private final EventProcessingOutcome outcome;
> > > > >         private final String message;
> > > > >     }
> > > > >
> > > > >     /**
> > > > >      * Handle topic event.
> > > > >      * Choice of the thread / maintenance of the thread pool is up to the
> > > > > event handlers.
> > > > >      * @param topicName - name of the topic
> > > > >      * @param event - TopicEvent
> > > > >      * @param stage - EventStage
> > > > >      * @param t - exception in case of FAILURE, if present/known
> > > > >      * @return - EventProcessingResult.
> > > > >      *      EventProcessingResult.EventProcessingOutcome != OK indicates
> > > > > request to cancel
> > > > >      *           event at BEFORE stage.
> > > > >      */
> > > > >     EventProcessingResult handleEvent(String topicName, TopicEvent event,
> > > > > EventStage stage, Throwable t);
> > > > > }
> > > > > ```
> > > > >
> > > > > BrokerService:
> > > > > ```java
> > > > >     public void addTopicEventListener(TopicEventsListener... listeners)
> > > > >
> > > > >     public void removeTopicEventListener(TopicEventsListener...
> > > > listeners)
> > > > > ```
> > > > >
> > > > > ### Implementation
> > > > >
> > > > > See PR for the proposed implementation.
> > > > > https://github.com/apache/pulsar/pull/19153
> > > > >
> > > > >
> > > > > ### Alternatives
> > > > >
> > > > > Add new methods to the BrokerInterceptor API
> > > > >
> > > > > --
> > > > > Andrey
> > > >
> > >
> > >
> > > --
> > > Andrey Yegorov

Re: [DISCUSS] PIP-241: TopicEventListener / topic events for the BrokerService

Posted by Enrico Olivelli <eo...@gmail.com>.
I support this PIP.
This will allow Protocol Handlers to have better knowledge of what's
happening on the broker.

I hope that someday we will be able to refactor Broker internals using
this feature and decouple the components.

Enrico

Il giorno lun 16 gen 2023 alle ore 10:27 Yunze Xu
<yz...@streamnative.io.invalid> ha scritto:
>
> +1 to me now.
>
> Thanks,
> Yunze
>
> On Sat, Jan 14, 2023 at 1:29 AM Andrey Yegorov
> <an...@datastax.com> wrote:
> >
> > My bad, I pasted the interface code from a branch with experiment to cancel
> > events. This is no longer needed.
> > EventProcessingResult result is irrelevant, I updated the PIP.
> >
> > It is not in the PR.
> >
> > On Fri, Jan 13, 2023 at 4:08 AM Yunze Xu <yz...@streamnative.io.invalid>
> > wrote:
> >
> > > I have some questions about the EventProcessingResult:
> > > 1. What's the difference between FAILURE and NOT_ALLOWED?
> > > 2. If we need to return the `message`, which indicates the error IIUC,
> > > would it be better to replace the returned value with a checked
> > > exception?
> > >
> > > Thanks,
> > > Yunze
> > >
> > > On Fri, Jan 13, 2023 at 12:36 PM Andrey Yegorov <ay...@apache.org>
> > > wrote:
> > > >
> > > > Hi,
> > > >
> > > > I am starting discussion for PIP-241: TopicEventListener / topic events
> > > for
> > > > the BrokerService.
> > > >
> > > > PIP issue: https://github.com/apache/pulsar/issues/19224
> > > >
> > > > ### Motivation
> > > >
> > > > Some Protocol Handlers may need to know about the topic-specific events
> > > to
> > > > update internal caches and/or state.
> > > >
> > > > These mechanisms will be useful also for core Pulsar components (like the
> > > > Transactions subsystem) and probably we would be able to simplify the
> > > > interaction between the internal components in the broker by using an
> > > > unified mechanism to handle the lifecycle of topics.
> > > >
> > > > Specific use cases:
> > > >
> > > > KOP keeps some state for the topic and needs to handle such cases as:
> > > >
> > > > - Topic Unloaded: release resources dedicated to the topic
> > > > - Topic Loaded: trigger loading of components tied to the partition
> > > > (GroupCoordinator, TransactionManager)
> > > > - Topic Deleted: remove any persistent state associated to the topic that
> > > > is stored in additional side system topics
> > > > - Topic Created: the same as “deleted” (ensure that there is no state on
> > > > system topics related to the new topic)
> > > >
> > > >
> > > > ### Goal
> > > >
> > > > This PIP defines a set of events needed for the protocol handlers (and
> > > for
> > > > internal broker components) to get notifications about topic-specific
> > > > events as seen by BrokerService. PIP outlines changes needed for protocol
> > > > handlers to keep/cache state consistent with BrokerService’s.
> > > >
> > > > The changes should not affect Pulsar running without protocol handlers or
> > > > with protocol handlers that do not rely on the new events.
> > > >
> > > >
> > > > ### API Changes
> > > >
> > > > ```java
> > > > /**
> > > >  * Listener for the Topic events.
> > > >  */
> > > > @InterfaceAudience.LimitedPrivate
> > > > @InterfaceStability.Evolving
> > > > public interface TopicEventsListener {
> > > >
> > > >     /**
> > > >      * Types of events currently supported.
> > > >      *  create/load/unload/delete
> > > >      */
> > > >     enum TopicEvent {
> > > >         // create events included into load events
> > > >         CREATE,
> > > >         LOAD,
> > > >         UNLOAD,
> > > >         DELETE,
> > > >     }
> > > >
> > > >     /**
> > > >      * Stages of events currently supported.
> > > >      *  before starting the event/successful completion/failed completion
> > > >      */
> > > >     enum EventStage {
> > > >         BEFORE,
> > > >         SUCCESS,
> > > >         FAILURE
> > > >     }
> > > >
> > > >     /**
> > > >      * Outcome of the listener.
> > > >      * Ignored for events at final stages (SUCCESS/FAILURE),
> > > >      *
> > > >      */
> > > >     enum EventProcessingOutcome {
> > > >         OK,
> > > >         FAILURE,
> > > >         NOT_ALLOWED
> > > >     }
> > > >
> > > >     /**
> > > >      * POJO for event processing result (outcome, message)
> > > >      */
> > > >     @ToString(includeFieldNames=true)
> > > >     @Data(staticConstructor="of")
> > > >     class EventProcessingResult {
> > > >         private final EventProcessingOutcome outcome;
> > > >         private final String message;
> > > >     }
> > > >
> > > >     /**
> > > >      * Handle topic event.
> > > >      * Choice of the thread / maintenance of the thread pool is up to the
> > > > event handlers.
> > > >      * @param topicName - name of the topic
> > > >      * @param event - TopicEvent
> > > >      * @param stage - EventStage
> > > >      * @param t - exception in case of FAILURE, if present/known
> > > >      * @return - EventProcessingResult.
> > > >      *      EventProcessingResult.EventProcessingOutcome != OK indicates
> > > > request to cancel
> > > >      *           event at BEFORE stage.
> > > >      */
> > > >     EventProcessingResult handleEvent(String topicName, TopicEvent event,
> > > > EventStage stage, Throwable t);
> > > > }
> > > > ```
> > > >
> > > > BrokerService:
> > > > ```java
> > > >     public void addTopicEventListener(TopicEventsListener... listeners)
> > > >
> > > >     public void removeTopicEventListener(TopicEventsListener...
> > > listeners)
> > > > ```
> > > >
> > > > ### Implementation
> > > >
> > > > See PR for the proposed implementation.
> > > > https://github.com/apache/pulsar/pull/19153
> > > >
> > > >
> > > > ### Alternatives
> > > >
> > > > Add new methods to the BrokerInterceptor API
> > > >
> > > > --
> > > > Andrey
> > >
> >
> >
> > --
> > Andrey Yegorov

Re: [DISCUSS] PIP-241: TopicEventListener / topic events for the BrokerService

Posted by Yunze Xu <yz...@streamnative.io.INVALID>.
+1 to me now.

Thanks,
Yunze

On Sat, Jan 14, 2023 at 1:29 AM Andrey Yegorov
<an...@datastax.com> wrote:
>
> My bad, I pasted the interface code from a branch with experiment to cancel
> events. This is no longer needed.
> EventProcessingResult result is irrelevant, I updated the PIP.
>
> It is not in the PR.
>
> On Fri, Jan 13, 2023 at 4:08 AM Yunze Xu <yz...@streamnative.io.invalid>
> wrote:
>
> > I have some questions about the EventProcessingResult:
> > 1. What's the difference between FAILURE and NOT_ALLOWED?
> > 2. If we need to return the `message`, which indicates the error IIUC,
> > would it be better to replace the returned value with a checked
> > exception?
> >
> > Thanks,
> > Yunze
> >
> > On Fri, Jan 13, 2023 at 12:36 PM Andrey Yegorov <ay...@apache.org>
> > wrote:
> > >
> > > Hi,
> > >
> > > I am starting discussion for PIP-241: TopicEventListener / topic events
> > for
> > > the BrokerService.
> > >
> > > PIP issue: https://github.com/apache/pulsar/issues/19224
> > >
> > > ### Motivation
> > >
> > > Some Protocol Handlers may need to know about the topic-specific events
> > to
> > > update internal caches and/or state.
> > >
> > > These mechanisms will be useful also for core Pulsar components (like the
> > > Transactions subsystem) and probably we would be able to simplify the
> > > interaction between the internal components in the broker by using an
> > > unified mechanism to handle the lifecycle of topics.
> > >
> > > Specific use cases:
> > >
> > > KOP keeps some state for the topic and needs to handle such cases as:
> > >
> > > - Topic Unloaded: release resources dedicated to the topic
> > > - Topic Loaded: trigger loading of components tied to the partition
> > > (GroupCoordinator, TransactionManager)
> > > - Topic Deleted: remove any persistent state associated to the topic that
> > > is stored in additional side system topics
> > > - Topic Created: the same as “deleted” (ensure that there is no state on
> > > system topics related to the new topic)
> > >
> > >
> > > ### Goal
> > >
> > > This PIP defines a set of events needed for the protocol handlers (and
> > for
> > > internal broker components) to get notifications about topic-specific
> > > events as seen by BrokerService. PIP outlines changes needed for protocol
> > > handlers to keep/cache state consistent with BrokerService’s.
> > >
> > > The changes should not affect Pulsar running without protocol handlers or
> > > with protocol handlers that do not rely on the new events.
> > >
> > >
> > > ### API Changes
> > >
> > > ```java
> > > /**
> > >  * Listener for the Topic events.
> > >  */
> > > @InterfaceAudience.LimitedPrivate
> > > @InterfaceStability.Evolving
> > > public interface TopicEventsListener {
> > >
> > >     /**
> > >      * Types of events currently supported.
> > >      *  create/load/unload/delete
> > >      */
> > >     enum TopicEvent {
> > >         // create events included into load events
> > >         CREATE,
> > >         LOAD,
> > >         UNLOAD,
> > >         DELETE,
> > >     }
> > >
> > >     /**
> > >      * Stages of events currently supported.
> > >      *  before starting the event/successful completion/failed completion
> > >      */
> > >     enum EventStage {
> > >         BEFORE,
> > >         SUCCESS,
> > >         FAILURE
> > >     }
> > >
> > >     /**
> > >      * Outcome of the listener.
> > >      * Ignored for events at final stages (SUCCESS/FAILURE),
> > >      *
> > >      */
> > >     enum EventProcessingOutcome {
> > >         OK,
> > >         FAILURE,
> > >         NOT_ALLOWED
> > >     }
> > >
> > >     /**
> > >      * POJO for event processing result (outcome, message)
> > >      */
> > >     @ToString(includeFieldNames=true)
> > >     @Data(staticConstructor="of")
> > >     class EventProcessingResult {
> > >         private final EventProcessingOutcome outcome;
> > >         private final String message;
> > >     }
> > >
> > >     /**
> > >      * Handle topic event.
> > >      * Choice of the thread / maintenance of the thread pool is up to the
> > > event handlers.
> > >      * @param topicName - name of the topic
> > >      * @param event - TopicEvent
> > >      * @param stage - EventStage
> > >      * @param t - exception in case of FAILURE, if present/known
> > >      * @return - EventProcessingResult.
> > >      *      EventProcessingResult.EventProcessingOutcome != OK indicates
> > > request to cancel
> > >      *           event at BEFORE stage.
> > >      */
> > >     EventProcessingResult handleEvent(String topicName, TopicEvent event,
> > > EventStage stage, Throwable t);
> > > }
> > > ```
> > >
> > > BrokerService:
> > > ```java
> > >     public void addTopicEventListener(TopicEventsListener... listeners)
> > >
> > >     public void removeTopicEventListener(TopicEventsListener...
> > listeners)
> > > ```
> > >
> > > ### Implementation
> > >
> > > See PR for the proposed implementation.
> > > https://github.com/apache/pulsar/pull/19153
> > >
> > >
> > > ### Alternatives
> > >
> > > Add new methods to the BrokerInterceptor API
> > >
> > > --
> > > Andrey
> >
>
>
> --
> Andrey Yegorov

Re: [DISCUSS] PIP-241: TopicEventListener / topic events for the BrokerService

Posted by Andrey Yegorov <an...@datastax.com>.
My bad, I pasted the interface code from a branch with experiment to cancel
events. This is no longer needed.
EventProcessingResult result is irrelevant, I updated the PIP.

It is not in the PR.

On Fri, Jan 13, 2023 at 4:08 AM Yunze Xu <yz...@streamnative.io.invalid>
wrote:

> I have some questions about the EventProcessingResult:
> 1. What's the difference between FAILURE and NOT_ALLOWED?
> 2. If we need to return the `message`, which indicates the error IIUC,
> would it be better to replace the returned value with a checked
> exception?
>
> Thanks,
> Yunze
>
> On Fri, Jan 13, 2023 at 12:36 PM Andrey Yegorov <ay...@apache.org>
> wrote:
> >
> > Hi,
> >
> > I am starting discussion for PIP-241: TopicEventListener / topic events
> for
> > the BrokerService.
> >
> > PIP issue: https://github.com/apache/pulsar/issues/19224
> >
> > ### Motivation
> >
> > Some Protocol Handlers may need to know about the topic-specific events
> to
> > update internal caches and/or state.
> >
> > These mechanisms will be useful also for core Pulsar components (like the
> > Transactions subsystem) and probably we would be able to simplify the
> > interaction between the internal components in the broker by using an
> > unified mechanism to handle the lifecycle of topics.
> >
> > Specific use cases:
> >
> > KOP keeps some state for the topic and needs to handle such cases as:
> >
> > - Topic Unloaded: release resources dedicated to the topic
> > - Topic Loaded: trigger loading of components tied to the partition
> > (GroupCoordinator, TransactionManager)
> > - Topic Deleted: remove any persistent state associated to the topic that
> > is stored in additional side system topics
> > - Topic Created: the same as “deleted” (ensure that there is no state on
> > system topics related to the new topic)
> >
> >
> > ### Goal
> >
> > This PIP defines a set of events needed for the protocol handlers (and
> for
> > internal broker components) to get notifications about topic-specific
> > events as seen by BrokerService. PIP outlines changes needed for protocol
> > handlers to keep/cache state consistent with BrokerService’s.
> >
> > The changes should not affect Pulsar running without protocol handlers or
> > with protocol handlers that do not rely on the new events.
> >
> >
> > ### API Changes
> >
> > ```java
> > /**
> >  * Listener for the Topic events.
> >  */
> > @InterfaceAudience.LimitedPrivate
> > @InterfaceStability.Evolving
> > public interface TopicEventsListener {
> >
> >     /**
> >      * Types of events currently supported.
> >      *  create/load/unload/delete
> >      */
> >     enum TopicEvent {
> >         // create events included into load events
> >         CREATE,
> >         LOAD,
> >         UNLOAD,
> >         DELETE,
> >     }
> >
> >     /**
> >      * Stages of events currently supported.
> >      *  before starting the event/successful completion/failed completion
> >      */
> >     enum EventStage {
> >         BEFORE,
> >         SUCCESS,
> >         FAILURE
> >     }
> >
> >     /**
> >      * Outcome of the listener.
> >      * Ignored for events at final stages (SUCCESS/FAILURE),
> >      *
> >      */
> >     enum EventProcessingOutcome {
> >         OK,
> >         FAILURE,
> >         NOT_ALLOWED
> >     }
> >
> >     /**
> >      * POJO for event processing result (outcome, message)
> >      */
> >     @ToString(includeFieldNames=true)
> >     @Data(staticConstructor="of")
> >     class EventProcessingResult {
> >         private final EventProcessingOutcome outcome;
> >         private final String message;
> >     }
> >
> >     /**
> >      * Handle topic event.
> >      * Choice of the thread / maintenance of the thread pool is up to the
> > event handlers.
> >      * @param topicName - name of the topic
> >      * @param event - TopicEvent
> >      * @param stage - EventStage
> >      * @param t - exception in case of FAILURE, if present/known
> >      * @return - EventProcessingResult.
> >      *      EventProcessingResult.EventProcessingOutcome != OK indicates
> > request to cancel
> >      *           event at BEFORE stage.
> >      */
> >     EventProcessingResult handleEvent(String topicName, TopicEvent event,
> > EventStage stage, Throwable t);
> > }
> > ```
> >
> > BrokerService:
> > ```java
> >     public void addTopicEventListener(TopicEventsListener... listeners)
> >
> >     public void removeTopicEventListener(TopicEventsListener...
> listeners)
> > ```
> >
> > ### Implementation
> >
> > See PR for the proposed implementation.
> > https://github.com/apache/pulsar/pull/19153
> >
> >
> > ### Alternatives
> >
> > Add new methods to the BrokerInterceptor API
> >
> > --
> > Andrey
>


-- 
Andrey Yegorov

Re: [DISCUSS] PIP-241: TopicEventListener / topic events for the BrokerService

Posted by Yunze Xu <yz...@streamnative.io.INVALID>.
I have some questions about the EventProcessingResult:
1. What's the difference between FAILURE and NOT_ALLOWED?
2. If we need to return the `message`, which indicates the error IIUC,
would it be better to replace the returned value with a checked
exception?

Thanks,
Yunze

On Fri, Jan 13, 2023 at 12:36 PM Andrey Yegorov <ay...@apache.org> wrote:
>
> Hi,
>
> I am starting discussion for PIP-241: TopicEventListener / topic events for
> the BrokerService.
>
> PIP issue: https://github.com/apache/pulsar/issues/19224
>
> ### Motivation
>
> Some Protocol Handlers may need to know about the topic-specific events to
> update internal caches and/or state.
>
> These mechanisms will be useful also for core Pulsar components (like the
> Transactions subsystem) and probably we would be able to simplify the
> interaction between the internal components in the broker by using an
> unified mechanism to handle the lifecycle of topics.
>
> Specific use cases:
>
> KOP keeps some state for the topic and needs to handle such cases as:
>
> - Topic Unloaded: release resources dedicated to the topic
> - Topic Loaded: trigger loading of components tied to the partition
> (GroupCoordinator, TransactionManager)
> - Topic Deleted: remove any persistent state associated to the topic that
> is stored in additional side system topics
> - Topic Created: the same as “deleted” (ensure that there is no state on
> system topics related to the new topic)
>
>
> ### Goal
>
> This PIP defines a set of events needed for the protocol handlers (and for
> internal broker components) to get notifications about topic-specific
> events as seen by BrokerService. PIP outlines changes needed for protocol
> handlers to keep/cache state consistent with BrokerService’s.
>
> The changes should not affect Pulsar running without protocol handlers or
> with protocol handlers that do not rely on the new events.
>
>
> ### API Changes
>
> ```java
> /**
>  * Listener for the Topic events.
>  */
> @InterfaceAudience.LimitedPrivate
> @InterfaceStability.Evolving
> public interface TopicEventsListener {
>
>     /**
>      * Types of events currently supported.
>      *  create/load/unload/delete
>      */
>     enum TopicEvent {
>         // create events included into load events
>         CREATE,
>         LOAD,
>         UNLOAD,
>         DELETE,
>     }
>
>     /**
>      * Stages of events currently supported.
>      *  before starting the event/successful completion/failed completion
>      */
>     enum EventStage {
>         BEFORE,
>         SUCCESS,
>         FAILURE
>     }
>
>     /**
>      * Outcome of the listener.
>      * Ignored for events at final stages (SUCCESS/FAILURE),
>      *
>      */
>     enum EventProcessingOutcome {
>         OK,
>         FAILURE,
>         NOT_ALLOWED
>     }
>
>     /**
>      * POJO for event processing result (outcome, message)
>      */
>     @ToString(includeFieldNames=true)
>     @Data(staticConstructor="of")
>     class EventProcessingResult {
>         private final EventProcessingOutcome outcome;
>         private final String message;
>     }
>
>     /**
>      * Handle topic event.
>      * Choice of the thread / maintenance of the thread pool is up to the
> event handlers.
>      * @param topicName - name of the topic
>      * @param event - TopicEvent
>      * @param stage - EventStage
>      * @param t - exception in case of FAILURE, if present/known
>      * @return - EventProcessingResult.
>      *      EventProcessingResult.EventProcessingOutcome != OK indicates
> request to cancel
>      *           event at BEFORE stage.
>      */
>     EventProcessingResult handleEvent(String topicName, TopicEvent event,
> EventStage stage, Throwable t);
> }
> ```
>
> BrokerService:
> ```java
>     public void addTopicEventListener(TopicEventsListener... listeners)
>
>     public void removeTopicEventListener(TopicEventsListener... listeners)
> ```
>
> ### Implementation
>
> See PR for the proposed implementation.
> https://github.com/apache/pulsar/pull/19153
>
>
> ### Alternatives
>
> Add new methods to the BrokerInterceptor API
>
> --
> Andrey