Posted to commits@pulsar.apache.org by "heesung-sn (via GitHub)" <gi...@apache.org> on 2023/02/17 00:08:45 UTC

[GitHub] [pulsar] heesung-sn opened a new pull request, #19546: [improve][broker] PIP-192 Added Disabled and Init states in ServiceUnitState

heesung-sn opened a new pull request, #19546:
URL: https://github.com/apache/pulsar/pull/19546

   
   Master Issue: https://github.com/apache/pulsar/issues/16691
   
   ### Motivation
   
   There is a possible edge case where bundle ownership could be in an invalid state in the new load balancer.
   
   Let's say strategic compaction happened in the middle of the bundle transfer.
   Owned->Transfer_Assigned // compaction happened.
   
   After the compaction, when a tableview joins and builds its cache from the compacted topic, the first transition it sees is `null -> Transfer_Assigned`. Because this transition is invalid, the tableview skips the message, which leaves it with an inconsistent view.
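
The skipped-message problem described here can be illustrated with a minimal sketch. The state names, the `isValid` rule set, and the `replay` helper are hypothetical simplifications for illustration and are not the actual `ServiceUnitState` or tableview code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CompactionReplaySketch {
    // Hypothetical, reduced state set for illustration only.
    enum State { Owned, TransferAssigned, Released }

    // Hypothetical rule set: `null` (no cached entry yet) may only move to Owned.
    static boolean isValid(State from, State to) {
        if (from == null) {
            return to == State.Owned;
        }
        switch (from) {
            case Owned: return to == State.TransferAssigned;
            case TransferAssigned: return to == State.Released;
            case Released: return to == State.Owned;
            default: return false;
        }
    }

    // Replays a log for one bundle into a cache, skipping invalid transitions.
    static Map<String, State> replay(String bundle, List<State> log) {
        Map<String, State> cache = new HashMap<>();
        for (State next : log) {
            if (isValid(cache.get(bundle), next)) {
                cache.put(bundle, next);
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        // A reader that saw the full log ends at TransferAssigned.
        System.out.println(replay("bundle-1", List.of(State.Owned, State.TransferAssigned)));
        // A reader that joins after compaction sees only the latest message and skips it.
        System.out.println(replay("bundle-1", List.of(State.TransferAssigned)));
    }
}
```

In this sketch the second reader ends up with an empty cache while the first one holds `TransferAssigned`, which is the kind of divergence the paragraph describes.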
   
   To avoid this issue, this transition (`null -> Transfer_Assigned`) was changed to a valid transition in the state diagram. However, I realized that this could lead to another wrong state when a transfer and a split occur concurrently.
   
   **Race condition**
   leader : Owned -> Transfer_Assigned -> Released -> Owned
   Broker-2 : Owned -> Splitting -> null(parent-bundle)
   
   **Wrong state**: a parent bundle is owned by another broker after the split.
   Owned -> Splitting -> **null -> Transfer_Assigned** -> Released -> Owned
   
   To break this invalid transition, we need semi-terminal states (`Disabled` and `Free`) and an `Init` state that represents tombstoned bundles.
   
   Owned -> Splitting -> Disabled -> Transfer_Assigned // invalid 
   Owned -> Free -> Transfer_Assigned // invalid 
   
   
   
   
   
   ### Modifications
   This PR:
   - adds the `Disabled` and `Init` states to `ServiceUnitState`
   - schedules `monitorOwnerships()` on the leader broker's ServiceUnitStateChannel, which tombstones bundles
     - from inactive brokers
     - that have stayed in in-flight states for too long (orphan bundles)
     - that have stayed in semi-terminal states (`Disabled` and `Free`) long enough to be tombstoned.
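
The three tombstone conditions listed above could be checked roughly as in the following sketch. Everything in it is an assumption made for illustration: the `Entry` class, its field names, the timeout parameters, and the choice of `Assigned`, `Released`, and `Splitting` as the in-flight states. It is not the `monitorOwnerships()` implementation from this PR.

```java
import java.util.Set;

class OwnershipMonitorSketch {
    // Reduced state set; which states count as "in-flight" is an assumption.
    enum State { Owned, Assigned, Released, Splitting, Disabled, Free }

    // Hypothetical view of one channel entry; the field names are illustrative.
    static final class Entry {
        final State state;
        final String broker;      // may be null when no broker is attached
        final long lastUpdatedMs; // time of the last state change

        Entry(State state, String broker, long lastUpdatedMs) {
            this.state = state;
            this.broker = broker;
            this.lastUpdatedMs = lastUpdatedMs;
        }
    }

    static boolean isInFlight(State s) {
        return s == State.Assigned || s == State.Released || s == State.Splitting;
    }

    static boolean isSemiTerminal(State s) {
        return s == State.Disabled || s == State.Free;
    }

    // Returns true when the entry matches one of the three conditions in the list.
    static boolean shouldTombstone(Entry e, Set<String> activeBrokers, long nowMs,
                                   long inFlightTimeoutMs, long semiTerminalTtlMs) {
        // 1. The entry refers to a broker that is no longer active.
        if (e.broker != null && !activeBrokers.contains(e.broker)) {
            return true;
        }
        long ageMs = nowMs - e.lastUpdatedMs;
        // 2. The entry has been stuck in an in-flight state for too long.
        if (isInFlight(e.state)) {
            return ageMs > inFlightTimeoutMs;
        }
        // 3. The entry has stayed in a semi-terminal state long enough.
        if (isSemiTerminal(e.state)) {
            return ageMs > semiTerminalTtlMs;
        }
        return false;
    }

    public static void main(String[] args) {
        Set<String> active = Set.of("broker-1");
        long now = 100_000L;
        System.out.println(shouldTombstone(new Entry(State.Owned, "broker-2", now), active, now, 30_000L, 60_000L));
        System.out.println(shouldTombstone(new Entry(State.Assigned, "broker-1", 0L), active, now, 30_000L, 60_000L));
        System.out.println(shouldTombstone(new Entry(State.Owned, "broker-1", 0L), active, now, 30_000L, 60_000L));
    }
}
```

Keeping separate thresholds for in-flight and semi-terminal states is a design choice of the sketch; the PR description only says "too long" and "long enough" without giving values.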
   
   
   <img width="1940" alt="Screen Shot 2023-02-16 at 3 39 46 PM" src="https://user-images.githubusercontent.com/103456639/219512586-7c05d71f-7379-4c7b-beb5-748ac4cdded6.png">
   
   
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   
     - Updated unit tests.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   We will update the docs in separate PRs later.
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/heesung-sn/pulsar/pull/32
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1113775844


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -522,12 +590,22 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
     }
 
     private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
-        if (isTargetBroker(data.sourceBroker())) {
-            ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
-            // TODO: when close, pass message to clients to connect to the new broker
-            closeServiceUnit(serviceUnit)
-                    .thenCompose(__ -> pubAsync(serviceUnit, next))
-                    .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+
+        if (isTransferCommand(data)) {
+            if (isTargetBroker(data.sourceBroker())) {
+                ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
+                // TODO: when close, pass message to clients to connect to the new broker
+                closeServiceUnit(serviceUnit)
+                        .thenCompose(__ -> pubAsync(serviceUnit, next))
+                        .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+            }
+        } else {
+            if (isTargetBroker(data.broker())) {
+                ServiceUnitStateData next = new ServiceUnitStateData(Free, data.broker());
+                closeServiceUnit(serviceUnit)

Review Comment:
   We updated the monitor logic (from tombstone to override), and `handleInitEvent` no longer calls `closeServiceUnit`.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1111608808


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,55 +24,37 @@
 /**
  * Defines the possible states for service units.
  *
- * The following diagram defines the valid state changes
- *
- *                  ┌───────────┐
- *       ┌──────────┤ released  │◄────────┐
- *       │own       └───────────┘         │release
- *       │                                │
- *       │                                │
- *       ▼                                │
- *    ┌────────┐  assign(transfer)  ┌─────┴────┐
- *    │        ├───────────────────►│          │
- *    │ owned  │                    │ assigned │
- *    │        │◄───────────────────┤          │
- *    └──┬─────┤      own           └──────────┘
- *       │  ▲  │                         ▲
- *       │  │  │                         │
- *       │  │  └──────────────┐          │
- *       │  │                 │          │
- *       │  │        unload   │          │ assign(assignment)
- * split │  │                 │          │
- *       │  │                 │          │
- *       │  │ create(child)   │          │
- *       │  │                 │          │
- *       ▼  │                 │          │
- *    ┌─────┴─────┐           └─────►┌───┴──────┐
- *    │           │                  │          │
- *    │ splitting ├────────────────► │   free   │
- *    │           │   discard(parent)│          │
- *    └───────────┘                  └──────────┘
+ * Refer to Service Unit State Channel in https://github.com/apache/pulsar/issues/16691 for additional details.
  */
 public enum ServiceUnitState {
 
-    Free, // not owned by any broker (terminal state)
+    Init, // initializing the state. no previous state(terminal state)
+
+    Disabled, // disabled by the owner broker
+
+    Free, // not owned by any broker (semi-terminal state)
 
     Owned, // owned by a broker (terminal state)
 
     Assigned, // the ownership is assigned(but the assigned broker has not been notified the ownership yet)
 
     Released, // the source broker's ownership has been released (e.g. the topic connections are closed)
 
-    Splitting; // the service unit(e.g. bundle) is in the process of splitting.
+    Splitting, // the service unit(e.g. bundle) is in the process of splitting.
+
+    Deleted; // deleted in the system (semi-terminal state)
 
     private static Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
-            // (Free -> Released | Splitting) transitions are required
-            // when the topic is compacted in the middle of transfer or split.
-            Free, Set.of(Owned, Assigned, Released, Splitting),
-            Owned, Set.of(Assigned, Splitting, Free),
-            Assigned, Set.of(Owned, Released, Free),
-            Released, Set.of(Owned, Free),
-            Splitting, Set.of(Free)
+            // (Init -> all states) transitions are required
+            // when the topic is compacted in the middle of assign, transfer or split.
+            Init, Set.of(Disabled, Owned, Assigned, Released, Splitting, Deleted, Init),
+            Disabled, Set.of(Free, Init),

Review Comment:
   We decided to replace `disabled` with `released`. Updated.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1114736634


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -821,27 +934,48 @@ private void scheduleCleanup(String broker, long delayInSecs) {
                 broker, delayInSecs, cleanupJobs.size());
     }
 
+    private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, Set<String> availableBrokers) {
+
+        Optional<String> selectedBroker = brokerSelector.select(availableBrokers, null, getContext());
+        if (selectedBroker.isPresent()) {
+            var override = new ServiceUnitStateData(Owned, selectedBroker.get(), true);
+            log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
+                    serviceUnit, orphanData, override);
+            pubAsync(serviceUnit, override).whenComplete((__, e) -> {
+                if (e != null) {
+                    log.error("Failed to override serviceUnit:{} from orphanData:{} to overrideData:{}",
+                            serviceUnit, orphanData, override, e);
+                }
+            });
+        } else {
+            log.error("Failed to override the ownership serviceUnit:{} orphanData:{}. Empty selected broker.",
+                    serviceUnit, orphanData);
+        }
+    }
+
 
-    private void doCleanup(String broker) {
+    private void doCleanup(String broker) throws ExecutionException, InterruptedException, TimeoutException {
         long startTime = System.nanoTime();
         log.info("Started ownership cleanup for the inactive broker:{}", broker);
-        int serviceUnitTombstoneCnt = 0;
+        int orphanServiceUnitCleanupCnt = 0;
         long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
-        for (Map.Entry<String, ServiceUnitStateData> etr : tableview.entrySet()) {
-            ServiceUnitStateData stateData = etr.getValue();
-            String serviceUnit = etr.getKey();
-            if (StringUtils.equals(broker, stateData.broker())
-                    || StringUtils.equals(broker, stateData.sourceBroker())) {
-                log.info("Cleaning ownership serviceUnit:{}, stateData:{}.", serviceUnit, stateData);
-                tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
-                    if (e != null) {
-                        log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, "
-                                        + "cleanupErrorCnt:{}.",
-                                serviceUnit, stateData,
-                                totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart);
-                    }
-                });
-                serviceUnitTombstoneCnt++;
+        var availableBrokers = new HashSet(brokerRegistry.getAvailableBrokersAsync()

Review Comment:
   Updated.



##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java:
##########
@@ -473,52 +473,172 @@ public void testSeekEarliestAfterCompaction() throws Exception {
                 .readCompacted(true).subscribe()) {
             consumer.seek(MessageId.earliest);
             Message<ServiceUnitStateData> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getValue().broker(), "content2");
+            Assert.assertEquals(m.getKey(), key);
+            Assert.assertEquals(m.getValue(), testValues.get(2));
         }
 
         try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                 .readCompacted(false).subscribe()) {
             consumer.seek(MessageId.earliest);
 
             Message<ServiceUnitStateData> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getValue().broker(), "content0");
+            Assert.assertEquals(m.getKey(), key);
+            Assert.assertEquals(m.getValue(), testValues.get(0));
 
             m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getValue().broker(), "content1");
+            Assert.assertEquals(m.getKey(), key);
+            Assert.assertEquals(m.getValue(), testValues.get(1));
 
             m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getValue().broker(), "content2");
+            Assert.assertEquals(m.getKey(), key);
+            Assert.assertEquals(m.getValue(), testValues.get(2));
         }
     }
 
     @Test
-    public void testBrokerRestartAfterCompaction() throws Exception {
+    public void testSlowTableviewAfterCompaction() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String strategyClassName = "topicCompactionStrategyClassName";
+        strategy.checkBrokers(true);
+
+        pulsarClient.newConsumer(schema)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe().close();
+
+        var fastTV = pulsar.getClient().newTableViewBuilder(schema)
+                .topic(topic)
+                .subscriptionName("fastTV")
+                .loadConf(Map.of(
+                        strategyClassName,
+                        ServiceUnitStateCompactionStrategy.class.getName()))
+                .create();
+
+        var defaultConf = getDefaultConf();
+        var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
+        var pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+        var slowTV = pulsar2.getClient().newTableViewBuilder(schema)
+                .topic(topic)
+                .subscriptionName("slowTV")
+                .loadConf(Map.of(
+                        strategyClassName,
+                        ServiceUnitStateCompactionStrategy.class.getName()))
+                .create();
+
+        var semaphore = new Semaphore(0);
+        AtomicBoolean handledReleased = new AtomicBoolean(false);
+
+        slowTV.listen((k, v) -> {
+            if (v.state() == Assigning) {
+                try {
+                    // Stuck at handling Assigned
+                    handledReleased.set(false);
+                    semaphore.acquire();
+                    //Thread.sleep(5000);

Review Comment:
   Updated.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,60 +24,47 @@
 /**
  * Defines the possible states for service units.
  *
- * The following diagram defines the valid state changes
- *
- *                  ┌───────────┐
- *       ┌──────────┤ released  │◄────────┐
- *       │own       └───────────┘         │release
- *       │                                │
- *       │                                │
- *       ▼                                │
- *    ┌────────┐  assign(transfer)  ┌─────┴────┐
- *    │        ├───────────────────►│          │
- *    │ owned  │                    │ assigned │
- *    │        │◄───────────────────┤          │
- *    └──┬─────┤      own           └──────────┘
- *       │  ▲  │                         ▲
- *       │  │  │                         │
- *       │  │  └──────────────┐          │
- *       │  │                 │          │
- *       │  │        unload   │          │ assign(assignment)
- * split │  │                 │          │
- *       │  │                 │          │
- *       │  │ create(child)   │          │
- *       │  │                 │          │
- *       ▼  │                 │          │
- *    ┌─────┴─────┐           └─────►┌───┴──────┐
- *    │           │                  │          │
- *    │ splitting ├────────────────► │   free   │
- *    │           │   discard(parent)│          │
- *    └───────────┘                  └──────────┘
+ * Refer to Service Unit State Channel in https://github.com/apache/pulsar/issues/16691 for additional details.

Review Comment:
   Updated.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1113791212


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,55 +24,37 @@
 /**
  * Defines the possible states for service units.
  *
- * The following diagram defines the valid state changes
- *
- *                  ┌───────────┐
- *       ┌──────────┤ released  │◄────────┐
- *       │own       └───────────┘         │release
- *       │                                │
- *       │                                │
- *       ▼                                │
- *    ┌────────┐  assign(transfer)  ┌─────┴────┐
- *    │        ├───────────────────►│          │
- *    │ owned  │                    │ assigned │
- *    │        │◄───────────────────┤          │
- *    └──┬─────┤      own           └──────────┘
- *       │  ▲  │                         ▲
- *       │  │  │                         │
- *       │  │  └──────────────┐          │
- *       │  │                 │          │
- *       │  │        unload   │          │ assign(assignment)
- * split │  │                 │          │
- *       │  │                 │          │
- *       │  │ create(child)   │          │
- *       │  │                 │          │
- *       ▼  │                 │          │
- *    ┌─────┴─────┐           └─────►┌───┴──────┐
- *    │           │                  │          │
- *    │ splitting ├────────────────► │   free   │
- *    │           │   discard(parent)│          │
- *    └───────────┘                  └──────────┘
+ * Refer to Service Unit State Channel in https://github.com/apache/pulsar/issues/16691 for additional details.
  */
 public enum ServiceUnitState {
 
-    Free, // not owned by any broker (terminal state)
+    Init, // initializing the state. no previous state(terminal state)
+
+    Disabled, // disabled by the owner broker
+
+    Free, // not owned by any broker (semi-terminal state)
 
     Owned, // owned by a broker (terminal state)
 
     Assigned, // the ownership is assigned(but the assigned broker has not been notified the ownership yet)
 
     Released, // the source broker's ownership has been released (e.g. the topic connections are closed)
 
-    Splitting; // the service unit(e.g. bundle) is in the process of splitting.
+    Splitting, // the service unit(e.g. bundle) is in the process of splitting.
+
+    Deleted; // deleted in the system (semi-terminal state)
 
     private static Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
-            // (Free -> Released | Splitting) transitions are required
-            // when the topic is compacted in the middle of transfer or split.
-            Free, Set.of(Owned, Assigned, Released, Splitting),
-            Owned, Set.of(Assigned, Splitting, Free),
-            Assigned, Set.of(Owned, Released, Free),
-            Released, Set.of(Owned, Free),
-            Splitting, Set.of(Free)
+            // (Init -> all states) transitions are required
+            // when the topic is compacted in the middle of assign, transfer or split.
+            Init, Set.of(Disabled, Owned, Assigned, Released, Splitting, Deleted, Init),
+            Disabled, Set.of(Free, Init),
+            Free, Set.of(Assigned, Init),
+            Owned, Set.of(Assigned, Splitting, Disabled, Init),

Review Comment:
   I updated this path as we have the `force` update flag in the ServiceUnitStateData.
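
The transition table in the revision quoted above can be exercised with a small sketch. It copies only the four rows that are visible in the quoted hunk; the remaining rows are cut off in the quote and are left out here, not guessed. `SketchState` and `isValidTransition` are hypothetical names, and later revisions of the PR changed this table.

```java
import java.util.Map;
import java.util.Set;

// Sketch only: a reduced copy of the transition table, limited to the rows
// that are visible in the quoted diff.
enum SketchState {
    Init, Disabled, Free, Owned, Assigned, Released, Splitting, Deleted;

    // Rows copied from the quoted hunk; states without a row here are treated
    // as having no outgoing transitions, which is a limitation of the sketch.
    private static final Map<SketchState, Set<SketchState>> VALID = Map.of(
            Init, Set.of(Disabled, Owned, Assigned, Released, Splitting, Deleted, Init),
            Disabled, Set.of(Free, Init),
            Free, Set.of(Assigned, Init),
            Owned, Set.of(Assigned, Splitting, Disabled, Init));

    // Hypothetical helper: true when `from -> to` is listed in the table above.
    static boolean isValidTransition(SketchState from, SketchState to) {
        return VALID.getOrDefault(from, Set.of()).contains(to);
    }
}

class TransitionSketch {
    public static void main(String[] args) {
        // Disabled -> Assigned is not listed, so the transfer cannot resume after a split.
        System.out.println(SketchState.isValidTransition(SketchState.Disabled, SketchState.Assigned)); // false
        // Owned -> Splitting is listed.
        System.out.println(SketchState.isValidTransition(SketchState.Owned, SketchState.Splitting)); // true
        // Init can move to Owned, which covers readers that join after compaction.
        System.out.println(SketchState.isValidTransition(SketchState.Init, SketchState.Owned)); // true
    }
}
```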





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1114234240


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -851,28 +985,51 @@ private void doCleanup(String broker) {
             log.error("Failed to flush the in-flight messages.", e);
         }
 
-        if (serviceUnitTombstoneCnt > 0) {
-            this.totalCleanupCnt++;
-            this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt;
-            this.totalBrokerCleanupTombstoneCnt++;
+        if (orphanServiceUnitCleanupCnt > 0) {
+            this.totalOrphanServiceUnitCleanupCnt += orphanServiceUnitCleanupCnt;
+            this.totalInactiveBrokerCleanupCnt++;
         }
 
         double cleanupTime = TimeUnit.NANOSECONDS
                 .toMillis((System.nanoTime() - startTime));
         // TODO: clean load data stores
         log.info("Completed a cleanup for the inactive broker:{} in {} ms. "
-                        + "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, "
+                        + "Cleaned up orphan service units: orphanServiceUnitCleanupCnt:{}, "
                         + "approximate cleanupErrorCnt:{}, metrics:{} ",
                 broker,
                 cleanupTime,
-                serviceUnitTombstoneCnt,
+                orphanServiceUnitCleanupCnt,
                 totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
                 printCleanupMetrics());
         cleanupJobs.remove(broker);
     }
 
-    // TODO: integrate this monitor logic when broker registry is added
-    private void monitorOwnerships(List<String> brokers) {
+    private Optional<ServiceUnitStateData> getOverrideStateData(String serviceUnit, ServiceUnitStateData orphanData,
+                                                                Set<String> availableBrokers,
+                                                                LoadManagerContext context) {
+        if (isTransferCommand(orphanData)) {
+            // rollback to the src
+            return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true));
+        } else if (orphanData.state() == Assigning) { // assign
+            // roll-forward to another broker

Review Comment:
   Do we need to check whether this is a transfer assign or not?





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1114728789


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -851,28 +985,51 @@ private void doCleanup(String broker) {
             log.error("Failed to flush the in-flight messages.", e);
         }
 
-        if (serviceUnitTombstoneCnt > 0) {
-            this.totalCleanupCnt++;
-            this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt;
-            this.totalBrokerCleanupTombstoneCnt++;
+        if (orphanServiceUnitCleanupCnt > 0) {
+            this.totalOrphanServiceUnitCleanupCnt += orphanServiceUnitCleanupCnt;
+            this.totalInactiveBrokerCleanupCnt++;
         }
 
         double cleanupTime = TimeUnit.NANOSECONDS
                 .toMillis((System.nanoTime() - startTime));
         // TODO: clean load data stores
         log.info("Completed a cleanup for the inactive broker:{} in {} ms. "
-                        + "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, "
+                        + "Cleaned up orphan service units: orphanServiceUnitCleanupCnt:{}, "
                         + "approximate cleanupErrorCnt:{}, metrics:{} ",
                 broker,
                 cleanupTime,
-                serviceUnitTombstoneCnt,
+                orphanServiceUnitCleanupCnt,
                 totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
                 printCleanupMetrics());
         cleanupJobs.remove(broker);
     }
 
-    // TODO: integrate this monitor logic when broker registry is added
-    private void monitorOwnerships(List<String> brokers) {
+    private Optional<ServiceUnitStateData> getOverrideStateData(String serviceUnit, ServiceUnitStateData orphanData,
+                                                                Set<String> availableBrokers,
+                                                                LoadManagerContext context) {
+        if (isTransferCommand(orphanData)) {
+            // rollback to the src
+            return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true));
+        } else if (orphanData.state() == Assigning) { // assign
+            // roll-forward to another broker

Review Comment:
   No. The transfer-assign case is handled above by `if (isTransferCommand(orphanData)) {`.
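
The branch order discussed in this thread can be sketched as follows. The `StateData` class, the `isTransferCommand` definition (an assignment that carries a source broker), and the choice of the alphabetically first other broker as the roll-forward target are all assumptions made for illustration. The quoted diff is cut off before the roll-forward logic, and the real code uses a broker selector.

```java
import java.util.Optional;
import java.util.Set;

class OverrideSketch {
    enum State { Owned, Assigning }

    // Hypothetical stand-in for ServiceUnitStateData.
    static final class StateData {
        final State state;
        final String broker;
        final String sourceBroker; // null when the entry is not a transfer
        final boolean force;

        StateData(State state, String broker, String sourceBroker, boolean force) {
            this.state = state;
            this.broker = broker;
            this.sourceBroker = sourceBroker;
            this.force = force;
        }
    }

    // Assumption: a transfer is an assignment that carries a source broker.
    static boolean isTransferCommand(StateData data) {
        return data.state == State.Assigning && data.sourceBroker != null;
    }

    static Optional<StateData> overrideFor(StateData orphan, Set<String> availableBrokers) {
        if (isTransferCommand(orphan)) {
            // Transfer case is checked first: roll back to the source broker.
            return Optional.of(new StateData(State.Owned, orphan.sourceBroker, null, true));
        } else if (orphan.state == State.Assigning) {
            // Plain assignment: roll forward to another available broker.
            // Sorting and taking the first name is a placeholder for a real selector.
            return availableBrokers.stream()
                    .filter(b -> !b.equals(orphan.broker))
                    .sorted()
                    .findFirst()
                    .map(b -> new StateData(State.Owned, b, null, true));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Set<String> available = Set.of("broker-1", "broker-3");
        StateData transfer = new StateData(State.Assigning, "broker-2", "broker-1", false);
        StateData assign = new StateData(State.Assigning, "broker-2", null, false);
        System.out.println(overrideFor(transfer, available).get().broker);
        System.out.println(overrideFor(assign, available).get().broker);
    }
}
```

Because the transfer check comes first, the second branch only ever sees assignments without a source broker, which is the point made in the reply.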





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Disabled, Deleted, and Init states in ServiceUnitState

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1111058818


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -207,14 +244,29 @@ public synchronized void start() throws PulsarServerException {
             throw new IllegalStateException("Invalid channel state:" + channelState.name());
         }
 
+        boolean debug = debug();
         try {
+            this.brokerRegistry = getBrokerRegistry();
+            this.brokerRegistry.addListener((broker, type) -> {
+                if (type == NotificationType.Deleted) {

Review Comment:
   There is a method `handleBrokerRegistrationEvent`. Could we use it here?





[GitHub] [pulsar] Demogorgon314 closed pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 closed pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState
URL: https://github.com/apache/pulsar/pull/19546






[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1113791353


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,55 +24,34 @@
 /**
  * Defines the possible states for service units.
  *
- * The following diagram defines the valid state changes
- *
- *                  ┌───────────┐
- *       ┌──────────┤ released  │◄────────┐
- *       │own       └───────────┘         │release
- *       │                                │
- *       │                                │
- *       ▼                                │
- *    ┌────────┐  assign(transfer)  ┌─────┴────┐
- *    │        ├───────────────────►│          │
- *    │ owned  │                    │ assigned │
- *    │        │◄───────────────────┤          │
- *    └──┬─────┤      own           └──────────┘
- *       │  ▲  │                         ▲
- *       │  │  │                         │
- *       │  │  └──────────────┐          │
- *       │  │                 │          │
- *       │  │        unload   │          │ assign(assignment)
- * split │  │                 │          │
- *       │  │                 │          │
- *       │  │ create(child)   │          │
- *       │  │                 │          │
- *       ▼  │                 │          │
- *    ┌─────┴─────┐           └─────►┌───┴──────┐
- *    │           │                  │          │
- *    │ splitting ├────────────────► │   free   │
- *    │           │   discard(parent)│          │
- *    └───────────┘                  └──────────┘
+ * Refer to Service Unit State Channel in https://github.com/apache/pulsar/issues/16691 for additional details.
  */
 public enum ServiceUnitState {
 
-    Free, // not owned by any broker (terminal state)
+    Init, // initializing the state. no previous state(terminal state)
+
+    Free, // not owned by any broker (semi-terminal state)
 
     Owned, // owned by a broker (terminal state)
 
     Assigned, // the ownership is assigned(but the assigned broker has not been notified the ownership yet)
 
     Released, // the source broker's ownership has been released (e.g. the topic connections are closed)
 
-    Splitting; // the service unit(e.g. bundle) is in the process of splitting.
+    Splitting, // the service unit(e.g. bundle) is in the process of splitting.
+
+    Disabled; // disabled in the system (semi-terminal state)
 
     private static Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(

Review Comment:
   Updated.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1113791415


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -538,7 +616,15 @@ private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) {
         }
     }
 
-    private void handleFreeEvent(String serviceUnit) {
+    private void handleDisableEvent(String serviceUnit, ServiceUnitStateData data) {

Review Comment:
   Removed it.





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1111661832


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -538,7 +616,15 @@ private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) {
         }
     }
 
-    private void handleFreeEvent(String serviceUnit) {
+    private void handleDisableEvent(String serviceUnit, ServiceUnitStateData data) {

Review Comment:
   This method was never used. Please remove it.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -522,12 +590,22 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
     }
 
     private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
-        if (isTargetBroker(data.sourceBroker())) {
-            ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
-            // TODO: when close, pass message to clients to connect to the new broker
-            closeServiceUnit(serviceUnit)
-                    .thenCompose(__ -> pubAsync(serviceUnit, next))
-                    .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+
+        if (isTransferCommand(data)) {
+            if (isTargetBroker(data.sourceBroker())) {
+                ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
+                // TODO: when close, pass message to clients to connect to the new broker
+                closeServiceUnit(serviceUnit)
+                        .thenCompose(__ -> pubAsync(serviceUnit, next))
+                        .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+            }
+        } else {
+            if (isTargetBroker(data.broker())) {
+                ServiceUnitStateData next = new ServiceUnitStateData(Free, data.broker());
+                closeServiceUnit(serviceUnit)

Review Comment:
   Since we already closed the service unit here, do we need to close the service unit again in `handleInitEvent`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -522,12 +590,22 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
     }
 
     private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
-        if (isTargetBroker(data.sourceBroker())) {
-            ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
-            // TODO: when close, pass message to clients to connect to the new broker
-            closeServiceUnit(serviceUnit)
-                    .thenCompose(__ -> pubAsync(serviceUnit, next))
-                    .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+
+        if (isTransferCommand(data)) {
+            if (isTargetBroker(data.sourceBroker())) {
+                ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
+                // TODO: when close, pass message to clients to connect to the new broker
+                closeServiceUnit(serviceUnit)
+                        .thenCompose(__ -> pubAsync(serviceUnit, next))
+                        .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+            }
+        } else {
+            if (isTargetBroker(data.broker())) {

Review Comment:
   I have been wondering whether we should rename `sourceBroker` to `destBroker`, since `broker` is usually treated as the source broker.
   
   Then we can change the logic like this:
   
   ```
           if (isTargetBroker(data.broker())) {
               if (isTransferCommand(data)) {
                   ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.destBroker());
                   // TODO: when close, pass message to clients to connect to the new broker
                   closeServiceUnit(serviceUnit)
                           .thenCompose(__ -> pubAsync(serviceUnit, next))
                           .whenComplete((__, e) -> log(e, serviceUnit, data, next));
               } else {
                   ServiceUnitStateData next = new ServiceUnitStateData(Free, data.broker());
                   closeServiceUnit(serviceUnit)
                           .thenCompose(__ -> pubAsync(serviceUnit, next))
                           .whenComplete((__, e) -> log(e, serviceUnit, data, next));
               }
           }
   ```
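
Both branches in the snippet above end with the same close-then-publish chain, so that chain could be pulled into one helper. The following is a minimal sketch of that idea only; `ReleaseSketch`, `closeThenPublish`, the stubbed `closeServiceUnit` and `pubAsync`, and the use of plain strings for the next state are all assumptions made for illustration, not the channel's real types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the channel; states are plain strings to keep the sketch small.
final class ReleaseSketch {
    final List<String> published = new ArrayList<>();

    // Stub: the real method closes the topics of the service unit.
    CompletableFuture<Void> closeServiceUnit(String serviceUnit) {
        return CompletableFuture.completedFuture(null);
    }

    // Stub: the real method publishes the next state to the channel topic.
    CompletableFuture<String> pubAsync(String serviceUnit, String next) {
        published.add(serviceUnit + "->" + next);
        return CompletableFuture.completedFuture(next);
    }

    // Shared tail of both branches: close first, then publish the next state.
    CompletableFuture<String> closeThenPublish(String serviceUnit, String next) {
        return closeServiceUnit(serviceUnit).thenCompose(__ -> pubAsync(serviceUnit, next));
    }

    // A transfer moves on to Owned; a plain unload moves on to Free.
    CompletableFuture<String> handleRelease(String serviceUnit, boolean transfer) {
        return closeThenPublish(serviceUnit, transfer ? "Owned" : "Free");
    }
}
```

With the chain in one place, the two branches differ only in how they build `next`, which may make the transfer and unload paths easier to compare.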



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,55 +24,34 @@
 /**
  * Defines the possible states for service units.
  *
- * The following diagram defines the valid state changes
- *
- *                  ┌───────────┐
- *       ┌──────────┤ released  │◄────────┐
- *       │own       └───────────┘         │release
- *       │                                │
- *       │                                │
- *       ▼                                │
- *    ┌────────┐  assign(transfer)  ┌─────┴────┐
- *    │        ├───────────────────►│          │
- *    │ owned  │                    │ assigned │
- *    │        │◄───────────────────┤          │
- *    └──┬─────┤      own           └──────────┘
- *       │  ▲  │                         ▲
- *       │  │  │                         │
- *       │  │  └──────────────┐          │
- *       │  │                 │          │
- *       │  │        unload   │          │ assign(assignment)
- * split │  │                 │          │
- *       │  │                 │          │
- *       │  │ create(child)   │          │
- *       │  │                 │          │
- *       ▼  │                 │          │
- *    ┌─────┴─────┐           └─────►┌───┴──────┐
- *    │           │                  │          │
- *    │ splitting ├────────────────► │   free   │
- *    │           │   discard(parent)│          │
- *    └───────────┘                  └──────────┘
+ * Refer to Service Unit State Channel in https://github.com/apache/pulsar/issues/16691 for additional details.
  */
 public enum ServiceUnitState {
 
-    Free, // not owned by any broker (terminal state)
+    Init, // initializing the state. no previous state(terminal state)
+
+    Free, // not owned by any broker (semi-terminal state)
 
     Owned, // owned by a broker (terminal state)
 
     Assigned, // the ownership is assigned(but the assigned broker has not been notified the ownership yet)
 
     Released, // the source broker's ownership has been released (e.g. the topic connections are closed)
 
-    Splitting; // the service unit(e.g. bundle) is in the process of splitting.
+    Splitting, // the service unit(e.g. bundle) is in the process of splitting.
+
+    Disabled; // disabled in the system (semi-terminal state)
 
     private static Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(

Review Comment:
   ```suggestion
       private static final Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -207,14 +243,23 @@ public synchronized void start() throws PulsarServerException {
             throw new IllegalStateException("Invalid channel state:" + channelState.name());
         }
 
+        boolean debug = debug();
         try {
+            this.brokerRegistry = getBrokerRegistry();
+            this.brokerRegistry.addListener((broker, type) -> {
+                handleBrokerRegistrationEvent(broker, type);
+            });

Review Comment:
   ```suggestion
               this.brokerRegistry.addListener(this::handleBrokerRegistrationEvent);
   ```
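
The suggestion above relies on a lambda that only forwards its arguments being interchangeable with a method reference. A small self-contained sketch of that equivalence follows; `ListenerSketch` and its trimmed `NotificationType` are hypothetical stand-ins, and the `BiConsumer` listener shape is an assumption about the registry rather than something shown in this diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the broker registry; only the listener shape matters here.
final class ListenerSketch {
    enum NotificationType { Created, Deleted }

    private final List<BiConsumer<String, NotificationType>> listeners = new ArrayList<>();
    final List<String> handled = new ArrayList<>();

    void addListener(BiConsumer<String, NotificationType> listener) {
        listeners.add(listener);
    }

    void fire(String broker, NotificationType type) {
        listeners.forEach(l -> l.accept(broker, type));
    }

    void handleBrokerRegistrationEvent(String broker, NotificationType type) {
        handled.add(broker + ":" + type);
    }

    static List<String> demo() {
        ListenerSketch sketch = new ListenerSketch();
        // Lambda form, as in the diff under review.
        sketch.addListener((broker, type) -> sketch.handleBrokerRegistrationEvent(broker, type));
        // Method-reference form, as in the suggestion.
        sketch.addListener(sketch::handleBrokerRegistrationEvent);
        sketch.fire("broker-1", NotificationType.Deleted);
        return sketch.handled;
    }
}
```

Both listeners record the same event, so `demo()` returns two identical entries.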





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1112684717


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -348,7 +413,7 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
         }
 
         ServiceUnitStateData data = tableview.get(serviceUnit);
-        ServiceUnitState state = data == null ? Free : data.state();
+        ServiceUnitState state = data == null ? Init : data.state();

Review Comment:
   Maybe we can add a static method to get the service unit state and reuse the method.
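
A minimal sketch of such a helper, assuming a missing table-view entry should always read as `Init`. The name `stateOf` and the trimmed `State` and `StateData` types are hypothetical; they stand in for `ServiceUnitState` and `ServiceUnitStateData` so the example compiles on its own.

```java
final class StateLookupSketch {
    // Trimmed stand-in for ServiceUnitState; only the constants used below are listed.
    enum State { Init, Owned }

    // Simplified stand-in for ServiceUnitStateData; only state() matters here.
    static final class StateData {
        private final State state;

        StateData(State state) {
            this.state = state;
        }

        State state() {
            return state;
        }
    }

    // Hypothetical helper: a missing table-view entry maps to Init.
    static State stateOf(StateData data) {
        return data == null ? State.Init : data.state();
    }
}
```

Call sites could then write `stateOf(tableview.get(serviceUnit))` instead of repeating the null check.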





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1115143826


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -851,28 +985,51 @@ private void doCleanup(String broker) {
             log.error("Failed to flush the in-flight messages.", e);
         }
 
-        if (serviceUnitTombstoneCnt > 0) {
-            this.totalCleanupCnt++;
-            this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt;
-            this.totalBrokerCleanupTombstoneCnt++;
+        if (orphanServiceUnitCleanupCnt > 0) {
+            this.totalOrphanServiceUnitCleanupCnt += orphanServiceUnitCleanupCnt;
+            this.totalInactiveBrokerCleanupCnt++;
         }
 
         double cleanupTime = TimeUnit.NANOSECONDS
                 .toMillis((System.nanoTime() - startTime));
         // TODO: clean load data stores
         log.info("Completed a cleanup for the inactive broker:{} in {} ms. "
-                        + "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, "
+                        + "Cleaned up orphan service units: orphanServiceUnitCleanupCnt:{}, "
                         + "approximate cleanupErrorCnt:{}, metrics:{} ",
                 broker,
                 cleanupTime,
-                serviceUnitTombstoneCnt,
+                orphanServiceUnitCleanupCnt,
                 totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
                 printCleanupMetrics());
         cleanupJobs.remove(broker);
     }
 
-    // TODO: integrate this monitor logic when broker registry is added
-    private void monitorOwnerships(List<String> brokers) {
+    private Optional<ServiceUnitStateData> getOverrideStateData(String serviceUnit, ServiceUnitStateData orphanData,
+                                                                Set<String> availableBrokers,
+                                                                LoadManagerContext context) {
+        if (isTransferCommand(orphanData)) {
+            // rollback to the src
+            return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true));
+        } else if (orphanData.state() == Assigning) { // assign
+            // roll-forward to another broker

Review Comment:
   Oh, sorry, thanks.





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Disabled, Deleted, and Init states in ServiceUnitState

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1111056732


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,55 +24,37 @@
 /**
  * Defines the possible states for service units.
  *
- * The following diagram defines the valid state changes
- *
- *                  ┌───────────┐
- *       ┌──────────┤ released  │◄────────┐
- *       │own       └───────────┘         │release
- *       │                                │
- *       │                                │
- *       ▼                                │
- *    ┌────────┐  assign(transfer)  ┌─────┴────┐
- *    │        ├───────────────────►│          │
- *    │ owned  │                    │ assigned │
- *    │        │◄───────────────────┤          │
- *    └──┬─────┤      own           └──────────┘
- *       │  ▲  │                         ▲
- *       │  │  │                         │
- *       │  │  └──────────────┐          │
- *       │  │                 │          │
- *       │  │        unload   │          │ assign(assignment)
- * split │  │                 │          │
- *       │  │                 │          │
- *       │  │ create(child)   │          │
- *       │  │                 │          │
- *       ▼  │                 │          │
- *    ┌─────┴─────┐           └─────►┌───┴──────┐
- *    │           │                  │          │
- *    │ splitting ├────────────────► │   free   │
- *    │           │   discard(parent)│          │
- *    └───────────┘                  └──────────┘
+ * Refer to Service Unit State Channel in https://github.com/apache/pulsar/issues/16691 for additional details.
  */
 public enum ServiceUnitState {
 
-    Free, // not owned by any broker (terminal state)
+    Init, // initializing the state. no previous state(terminal state)
+
+    Disabled, // disabled by the owner broker
+
+    Free, // not owned by any broker (semi-terminal state)
 
     Owned, // owned by a broker (terminal state)
 
     Assigned, // the ownership is assigned(but the assigned broker has not been notified the ownership yet)
 
     Released, // the source broker's ownership has been released (e.g. the topic connections are closed)
 
-    Splitting; // the service unit(e.g. bundle) is in the process of splitting.
+    Splitting, // the service unit(e.g. bundle) is in the process of splitting.
+
+    Deleted; // deleted in the system (semi-terminal state)
 
     private static Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
-            // (Free -> Released | Splitting) transitions are required
-            // when the topic is compacted in the middle of transfer or split.
-            Free, Set.of(Owned, Assigned, Released, Splitting),
-            Owned, Set.of(Assigned, Splitting, Free),
-            Assigned, Set.of(Owned, Released, Free),
-            Released, Set.of(Owned, Free),
-            Splitting, Set.of(Free)
+            // (Init -> all states) transitions are required
+            // when the topic is compacted in the middle of assign, transfer or split.
+            Init, Set.of(Disabled, Owned, Assigned, Released, Splitting, Deleted, Init),
+            Disabled, Set.of(Free, Init),
+            Free, Set.of(Assigned, Init),
+            Owned, Set.of(Assigned, Splitting, Disabled, Init),

Review Comment:
   Can the `Owned`, `Assigned`, `Released`, `Splitting`, and `Disabled` states transition to the `Init` state?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -268,11 +338,13 @@ public synchronized void close() throws PulsarServerException {
                 log.info("Successfully closed the channel producer.");
             }
 
-            // TODO: clean brokerRegistry
+            if (brokerRegistry != null) {
+                brokerRegistry = null;

Review Comment:
   Do we need to close the broker registry?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,55 +24,37 @@
 /**
  * Defines the possible states for service units.
  *
- * The following diagram defines the valid state changes
- *
- *                  ┌───────────┐
- *       ┌──────────┤ released  │◄────────┐
- *       │own       └───────────┘         │release
- *       │                                │
- *       │                                │
- *       ▼                                │
- *    ┌────────┐  assign(transfer)  ┌─────┴────┐
- *    │        ├───────────────────►│          │
- *    │ owned  │                    │ assigned │
- *    │        │◄───────────────────┤          │
- *    └──┬─────┤      own           └──────────┘
- *       │  ▲  │                         ▲
- *       │  │  │                         │
- *       │  │  └──────────────┐          │
- *       │  │                 │          │
- *       │  │        unload   │          │ assign(assignment)
- * split │  │                 │          │
- *       │  │                 │          │
- *       │  │ create(child)   │          │
- *       │  │                 │          │
- *       ▼  │                 │          │
- *    ┌─────┴─────┐           └─────►┌───┴──────┐
- *    │           │                  │          │
- *    │ splitting ├────────────────► │   free   │
- *    │           │   discard(parent)│          │
- *    └───────────┘                  └──────────┘
+ * Refer to Service Unit State Channel in https://github.com/apache/pulsar/issues/16691 for additional details.
  */
 public enum ServiceUnitState {
 
-    Free, // not owned by any broker (terminal state)
+    Init, // initializing the state. no previous state(terminal state)
+
+    Disabled, // disabled by the owner broker
+
+    Free, // not owned by any broker (semi-terminal state)
 
     Owned, // owned by a broker (terminal state)
 
     Assigned, // the ownership is assigned(but the assigned broker has not been notified the ownership yet)
 
     Released, // the source broker's ownership has been released (e.g. the topic connections are closed)
 
-    Splitting; // the service unit(e.g. bundle) is in the process of splitting.
+    Splitting, // the service unit(e.g. bundle) is in the process of splitting.
+
+    Deleted; // deleted in the system (semi-terminal state)
 
     private static Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
-            // (Free -> Released | Splitting) transitions are required
-            // when the topic is compacted in the middle of transfer or split.
-            Free, Set.of(Owned, Assigned, Released, Splitting),
-            Owned, Set.of(Assigned, Splitting, Free),
-            Assigned, Set.of(Owned, Released, Free),
-            Released, Set.of(Owned, Free),
-            Splitting, Set.of(Free)
+            // (Init -> all states) transitions are required
+            // when the topic is compacted in the middle of assign, transfer or split.
+            Init, Set.of(Disabled, Owned, Assigned, Released, Splitting, Deleted, Init),
+            Disabled, Set.of(Free, Init),

Review Comment:
   Why do we need to add the `Disabled` state? It seems only the `Disabled` state can transition to the `Free` state.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -207,14 +244,29 @@ public synchronized void start() throws PulsarServerException {
             throw new IllegalStateException("Invalid channel state:" + channelState.name());
         }
 
+        boolean debug = debug();
         try {
+            this.brokerRegistry = getBrokerRegistry();
+            this.brokerRegistry.addListener((broker, type) -> {
+                if (type == NotificationType.Deleted) {

Review Comment:
   There is a method `handleBrokerRegistrationEvent`; would it be useful here?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -294,7 +366,7 @@ private boolean validateChannelState(ChannelState targetState, boolean checkLowe
     }
 
     private boolean debug() {
-        return pulsar.getConfiguration().isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
+        return config.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();

Review Comment:
   I'm not sure why we need to check whether the logger is at debug level. If it is, the logger prints so many messages that the load balancer logs become hard to read.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1111606973


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,55 +24,37 @@
 /**
  * Defines the possible states for service units.
  *
- * The following diagram defines the valid state changes
- *
- *                  ┌───────────┐
- *       ┌──────────┤ released  │◄────────┐
- *       │own       └───────────┘         │release
- *       │                                │
- *       │                                │
- *       ▼                                │
- *    ┌────────┐  assign(transfer)  ┌─────┴────┐
- *    │        ├───────────────────►│          │
- *    │ owned  │                    │ assigned │
- *    │        │◄───────────────────┤          │
- *    └──┬─────┤      own           └──────────┘
- *       │  ▲  │                         ▲
- *       │  │  │                         │
- *       │  │  └──────────────┐          │
- *       │  │                 │          │
- *       │  │        unload   │          │ assign(assignment)
- * split │  │                 │          │
- *       │  │                 │          │
- *       │  │ create(child)   │          │
- *       │  │                 │          │
- *       ▼  │                 │          │
- *    ┌─────┴─────┐           └─────►┌───┴──────┐
- *    │           │                  │          │
- *    │ splitting ├────────────────► │   free   │
- *    │           │   discard(parent)│          │
- *    └───────────┘                  └──────────┘
+ * Refer to Service Unit State Channel in https://github.com/apache/pulsar/issues/16691 for additional details.
  */
 public enum ServiceUnitState {
 
-    Free, // not owned by any broker (terminal state)
+    Init, // initializing the state. no previous state(terminal state)
+
+    Disabled, // disabled by the owner broker
+
+    Free, // not owned by any broker (semi-terminal state)
 
     Owned, // owned by a broker (terminal state)
 
     Assigned, // the ownership is assigned(but the assigned broker has not been notified the ownership yet)
 
     Released, // the source broker's ownership has been released (e.g. the topic connections are closed)
 
-    Splitting; // the service unit(e.g. bundle) is in the process of splitting.
+    Splitting, // the service unit(e.g. bundle) is in the process of splitting.
+
+    Deleted; // deleted in the system (semi-terminal state)
 
     private static Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
-            // (Free -> Released | Splitting) transitions are required
-            // when the topic is compacted in the middle of transfer or split.
-            Free, Set.of(Owned, Assigned, Released, Splitting),
-            Owned, Set.of(Assigned, Splitting, Free),
-            Assigned, Set.of(Owned, Released, Free),
-            Released, Set.of(Owned, Free),
-            Splitting, Set.of(Free)
+            // (Init -> all states) transitions are required
+            // when the topic is compacted in the middle of assign, transfer or split.
+            Init, Set.of(Disabled, Owned, Assigned, Released, Splitting, Deleted, Init),
+            Disabled, Set.of(Free, Init),
+            Free, Set.of(Assigned, Init),
+            Owned, Set.of(Assigned, Splitting, Disabled, Init),

Review Comment:
   Yes, to tombstone bundles in stuck states or for admin to override.
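
To make the effect of the extra `Init` targets concrete, here is a rough sketch of a transition check built from only the four rows visible in the quoted hunk. The rows for the remaining states are cut off in the diff and are deliberately left out, so this sketch rejects every transition from them. `SketchState`, `VALID_TRANSITIONS`, and `isValidTransition` are names assumed for illustration.

```java
import java.util.Map;
import java.util.Set;

// Illustrative copy of the enum from the quoted revision of the diff.
enum SketchState {
    Init, Disabled, Free, Owned, Assigned, Released, Splitting, Deleted;

    // Only the four rows visible in the quoted hunk; the other rows are omitted on purpose.
    private static final Map<SketchState, Set<SketchState>> VALID_TRANSITIONS = Map.of(
            Init, Set.of(Disabled, Owned, Assigned, Released, Splitting, Deleted, Init),
            Disabled, Set.of(Free, Init),
            Free, Set.of(Assigned, Init),
            Owned, Set.of(Assigned, Splitting, Disabled, Init));

    // A transition is valid only if the target appears in the source state's row.
    static boolean isValidTransition(SketchState from, SketchState to) {
        return VALID_TRANSITIONS.getOrDefault(from, Set.of()).contains(to);
    }
}
```

Under these rows, `Owned -> Init` is accepted, which is what allows a stuck bundle to be tombstoned or overridden, while `Free -> Owned` is rejected because `Free` may only move to `Assigned` or `Init`.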



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -207,14 +244,29 @@ public synchronized void start() throws PulsarServerException {
             throw new IllegalStateException("Invalid channel state:" + channelState.name());
         }
 
+        boolean debug = debug();
         try {
+            this.brokerRegistry = getBrokerRegistry();
+            this.brokerRegistry.addListener((broker, type) -> {
+                if (type == NotificationType.Deleted) {

Review Comment:
   Thanks, updated.





[GitHub] [pulsar] Technoboy- merged pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- merged PR #19546:
URL: https://github.com/apache/pulsar/pull/19546




[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1112757979


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -522,12 +590,22 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
     }
 
     private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
-        if (isTargetBroker(data.sourceBroker())) {
-            ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
-            // TODO: when close, pass message to clients to connect to the new broker
-            closeServiceUnit(serviceUnit)
-                    .thenCompose(__ -> pubAsync(serviceUnit, next))
-                    .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+
+        if (isTransferCommand(data)) {
+            if (isTargetBroker(data.sourceBroker())) {

Review Comment:
   Should we use `isTargetBroker(data.sourceBroker())` in the `handleAssignEvent` method when the data is a transfer command?





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1111607844


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -294,7 +366,7 @@ private boolean validateChannelState(ChannelState targetState, boolean checkLowe
     }
 
     private boolean debug() {
-        return pulsar.getConfiguration().isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
+        return config.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();

Review Comment:
   If log4j debug logging is enabled, we also want to turn on the load balancer debug logs.





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1112684717


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -348,7 +413,7 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
         }
 
         ServiceUnitStateData data = tableview.get(serviceUnit);
-        ServiceUnitState state = data == null ? Free : data.state();
+        ServiceUnitState state = data == null ? Init : data.state();

Review Comment:
   Maybe we can add a static method to get the service unit state and reuse it.
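
One possible shape for such a helper, as a sketch only: the name `stateOf` and the simplified `StateData` class are hypothetical stand-ins, and the real `ServiceUnitStateData` carries more fields than shown here.

```java
class StateLookupSketch {
    enum State { Init, Free, Owned, Assigned, Released, Splitting, Deleted }

    // Simplified stand-in for ServiceUnitStateData.
    static final class StateData {
        private final State state;
        private final String broker;

        StateData(State state, String broker) {
            this.state = state;
            this.broker = broker;
        }

        State state() {
            return state;
        }

        String broker() {
            return broker;
        }
    }

    // A missing table-view entry is treated as Init, as in the changed line above.
    static State stateOf(StateData data) {
        return data == null ? State.Init : data.state();
    }

    public static void main(String[] args) {
        System.out.println(stateOf(null));
        System.out.println(stateOf(new StateData(State.Owned, "broker-1")));
    }
}
```

Keeping the null check in one place means every caller that reads from the table view agrees on what an absent entry means.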





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1112275433


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -522,12 +590,22 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
     }
 
     private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
-        if (isTargetBroker(data.sourceBroker())) {
-            ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
-            // TODO: when close, pass message to clients to connect to the new broker
-            closeServiceUnit(serviceUnit)
-                    .thenCompose(__ -> pubAsync(serviceUnit, next))
-                    .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+
+        if (isTransferCommand(data)) {
+            if (isTargetBroker(data.sourceBroker())) {
+                ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
+                // TODO: when close, pass message to clients to connect to the new broker
+                closeServiceUnit(serviceUnit)
+                        .thenCompose(__ -> pubAsync(serviceUnit, next))
+                        .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+            }
+        } else {
+            if (isTargetBroker(data.broker())) {

Review Comment:
   `sourceBroker` and `(dest)broker` are transfer-specific properties.
   
   Currently, `broker` means the target (destination) broker. For example, in the Init -> Assigned transition there is no source broker.
   
   I have considered making `ServiceUnitStateData` generic with command-specific properties, such as the two fields below, but let's not over-engineer this now.
   
   ServiceUnitType {Bundle, NameSpace}
   Map<String, Object> properties
   
   
   
   
   





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1114494416


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -821,27 +934,48 @@ private void scheduleCleanup(String broker, long delayInSecs) {
                 broker, delayInSecs, cleanupJobs.size());
     }
 
+    private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, Set<String> availableBrokers) {
+
+        Optional<String> selectedBroker = brokerSelector.select(availableBrokers, null, getContext());
+        if (selectedBroker.isPresent()) {
+            var override = new ServiceUnitStateData(Owned, selectedBroker.get(), true);
+            log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
+                    serviceUnit, orphanData, override);
+            pubAsync(serviceUnit, override).whenComplete((__, e) -> {
+                if (e != null) {
+                    log.error("Failed to override serviceUnit:{} from orphanData:{} to overrideData:{}",
+                            serviceUnit, orphanData, override, e);
+                }
+            });
+        } else {
+            log.error("Failed to override the ownership serviceUnit:{} orphanData:{}. Empty selected broker.",
+                    serviceUnit, orphanData);
+        }
+    }
+
 
-    private void doCleanup(String broker) {
+    private void doCleanup(String broker) throws ExecutionException, InterruptedException, TimeoutException {
         long startTime = System.nanoTime();
         log.info("Started ownership cleanup for the inactive broker:{}", broker);
-        int serviceUnitTombstoneCnt = 0;
+        int orphanServiceUnitCleanupCnt = 0;
         long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
-        for (Map.Entry<String, ServiceUnitStateData> etr : tableview.entrySet()) {
-            ServiceUnitStateData stateData = etr.getValue();
-            String serviceUnit = etr.getKey();
-            if (StringUtils.equals(broker, stateData.broker())
-                    || StringUtils.equals(broker, stateData.sourceBroker())) {
-                log.info("Cleaning ownership serviceUnit:{}, stateData:{}.", serviceUnit, stateData);
-                tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
-                    if (e != null) {
-                        log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, "
-                                        + "cleanupErrorCnt:{}.",
-                                serviceUnit, stateData,
-                                totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart);
-                    }
-                });
-                serviceUnitTombstoneCnt++;
+        var availableBrokers = new HashSet(brokerRegistry.getAvailableBrokersAsync()

Review Comment:
   ```suggestion
           var availableBrokers = new HashSet<>(brokerRegistry.getAvailableBrokersAsync()
   ```
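
A small sketch of why the diamond matters when the variable is declared with `var`. The method names and broker strings are made up for illustration; only `HashSet` and `List` from the JDK are used.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DiamondSketch {
    // Hypothetical consumer that needs a typed set, as a broker selector might.
    static int countBrokers(Set<String> brokers) {
        return brokers.size();
    }

    static int distinctCount(List<String> fetched) {
        // With the diamond, `var` infers HashSet<String> from the constructor argument.
        var typed = new HashSet<>(fetched);
        // With a raw `new HashSet(fetched)`, `var` would infer the raw type HashSet,
        // and this call would compile only with an unchecked-conversion warning.
        return countBrokers(typed);
    }

    public static void main(String[] args) {
        System.out.println(distinctCount(List.of("broker-1", "broker-2", "broker-1")));
    }
}
```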



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,60 +24,47 @@
 /**
  * Defines the possible states for service units.
  *
- * The following diagram defines the valid state changes
- *
- *                  ┌───────────┐
- *       ┌──────────┤ released  │◄────────┐
- *       │own       └───────────┘         │release
- *       │                                │
- *       │                                │
- *       ▼                                │
- *    ┌────────┐  assign(transfer)  ┌─────┴────┐
- *    │        ├───────────────────►│          │
- *    │ owned  │                    │ assigned │
- *    │        │◄───────────────────┤          │
- *    └──┬─────┤      own           └──────────┘
- *       │  ▲  │                         ▲
- *       │  │  │                         │
- *       │  │  └──────────────┐          │
- *       │  │                 │          │
- *       │  │        unload   │          │ assign(assignment)
- * split │  │                 │          │
- *       │  │                 │          │
- *       │  │ create(child)   │          │
- *       │  │                 │          │
- *       ▼  │                 │          │
- *    ┌─────┴─────┐           └─────►┌───┴──────┐
- *    │           │                  │          │
- *    │ splitting ├────────────────► │   free   │
- *    │           │   discard(parent)│          │
- *    └───────────┘                  └──────────┘
+ * Refer to Service Unit State Channel in https://github.com/apache/pulsar/issues/16691 for additional details.

Review Comment:
   ```suggestion
    * Refer to Service Unit State Channel in <a href="https://github.com/apache/pulsar/issues/16691">https://github.com/apache/pulsar/issues/16691</a> for additional details.
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java:
##########
@@ -473,52 +473,172 @@ public void testSeekEarliestAfterCompaction() throws Exception {
                 .readCompacted(true).subscribe()) {
             consumer.seek(MessageId.earliest);
             Message<ServiceUnitStateData> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getValue().broker(), "content2");
+            Assert.assertEquals(m.getKey(), key);
+            Assert.assertEquals(m.getValue(), testValues.get(2));
         }
 
         try (Consumer<ServiceUnitStateData> consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
                 .readCompacted(false).subscribe()) {
             consumer.seek(MessageId.earliest);
 
             Message<ServiceUnitStateData> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getValue().broker(), "content0");
+            Assert.assertEquals(m.getKey(), key);
+            Assert.assertEquals(m.getValue(), testValues.get(0));
 
             m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getValue().broker(), "content1");
+            Assert.assertEquals(m.getKey(), key);
+            Assert.assertEquals(m.getValue(), testValues.get(1));
 
             m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getValue().broker(), "content2");
+            Assert.assertEquals(m.getKey(), key);
+            Assert.assertEquals(m.getValue(), testValues.get(2));
         }
     }
 
     @Test
-    public void testBrokerRestartAfterCompaction() throws Exception {
+    public void testSlowTableviewAfterCompaction() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String strategyClassName = "topicCompactionStrategyClassName";
+        strategy.checkBrokers(true);
+
+        pulsarClient.newConsumer(schema)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscribe().close();
+
+        var fastTV = pulsar.getClient().newTableViewBuilder(schema)
+                .topic(topic)
+                .subscriptionName("fastTV")
+                .loadConf(Map.of(
+                        strategyClassName,
+                        ServiceUnitStateCompactionStrategy.class.getName()))
+                .create();
+
+        var defaultConf = getDefaultConf();
+        var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
+        var pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+        var slowTV = pulsar2.getClient().newTableViewBuilder(schema)
+                .topic(topic)
+                .subscriptionName("slowTV")
+                .loadConf(Map.of(
+                        strategyClassName,
+                        ServiceUnitStateCompactionStrategy.class.getName()))
+                .create();
+
+        var semaphore = new Semaphore(0);
+        AtomicBoolean handledReleased = new AtomicBoolean(false);
+
+        slowTV.listen((k, v) -> {
+            if (v.state() == Assigning) {
+                try {
+                    // Stuck at handling Assigned
+                    handledReleased.set(false);
+                    semaphore.acquire();
+                    //Thread.sleep(5000);

Review Comment:
   ```suggestion
   ```





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1113774325


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -207,14 +243,23 @@ public synchronized void start() throws PulsarServerException {
             throw new IllegalStateException("Invalid channel state:" + channelState.name());
         }
 
+        boolean debug = debug();
         try {
+            this.brokerRegistry = getBrokerRegistry();
+            this.brokerRegistry.addListener((broker, type) -> {
+                handleBrokerRegistrationEvent(broker, type);
+            });

Review Comment:
   updated.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -522,12 +590,22 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
     }
 
     private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
-        if (isTargetBroker(data.sourceBroker())) {
-            ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
-            // TODO: when close, pass message to clients to connect to the new broker
-            closeServiceUnit(serviceUnit)
-                    .thenCompose(__ -> pubAsync(serviceUnit, next))
-                    .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+
+        if (isTransferCommand(data)) {
+            if (isTargetBroker(data.sourceBroker())) {
+                ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
+                // TODO: when close, pass message to clients to connect to the new broker
+                closeServiceUnit(serviceUnit)
+                        .thenCompose(__ -> pubAsync(serviceUnit, next))
+                        .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+            }
+        } else {
+            if (isTargetBroker(data.broker())) {
+                ServiceUnitStateData next = new ServiceUnitStateData(Free, data.broker());
+                closeServiceUnit(serviceUnit)

Review Comment:
   We updated the monitor logic(from tombstone to rollback), and now `handleInitEvent` does not call `closeServiceUnit`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -522,12 +590,22 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
     }
 
     private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
-        if (isTargetBroker(data.sourceBroker())) {
-            ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
-            // TODO: when close, pass message to clients to connect to the new broker
-            closeServiceUnit(serviceUnit)
-                    .thenCompose(__ -> pubAsync(serviceUnit, next))
-                    .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+
+        if (isTransferCommand(data)) {
+            if (isTargetBroker(data.sourceBroker())) {

Review Comment:
   No. The `assign` event is handled by the target broker.
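
The routing in the `handleReleaseEvent` hunk above can be sketched as a pure function that decides which broker acts on a `Released` message and what it publishes next. This is a simplified model, not the PR's code: closing the service unit and publishing are omitted, `StateData` is a stand-in for `ServiceUnitStateData`, and detecting a transfer by a non-null source broker is an assumption.

```java
import java.util.Optional;

class ReleaseRoutingSketch {
    enum State { Free, Owned, Released }

    // Simplified stand-in for ServiceUnitStateData: state, destination broker, source broker.
    static final class StateData {
        final State state;
        final String broker;
        final String sourceBroker;

        StateData(State state, String broker, String sourceBroker) {
            this.state = state;
            this.broker = broker;
            this.sourceBroker = sourceBroker;
        }
    }

    // Assumption: a transfer is recognised by a non-null source broker.
    static boolean isTransferCommand(StateData data) {
        return data.sourceBroker != null;
    }

    // Returns the state the local broker should publish next, or empty if another broker must act.
    static Optional<StateData> onReleased(String localBroker, StateData data) {
        if (isTransferCommand(data)) {
            // Transfer: the source broker releases, then hands ownership to the destination.
            return localBroker.equals(data.sourceBroker)
                    ? Optional.of(new StateData(State.Owned, data.broker, data.sourceBroker))
                    : Optional.empty();
        }
        // Plain unload: the owning broker releases, then frees the service unit.
        return localBroker.equals(data.broker)
                ? Optional.of(new StateData(State.Free, data.broker, null))
                : Optional.empty();
    }

    public static void main(String[] args) {
        StateData transfer = new StateData(State.Released, "broker-b", "broker-a");
        System.out.println(onReleased("broker-a", transfer).isPresent());
        System.out.println(onReleased("broker-b", transfer).isPresent());
    }
}
```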





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1111608385


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -268,11 +338,13 @@ public synchronized void close() throws PulsarServerException {
                 log.info("Successfully closed the channel producer.");
             }
 
-            // TODO: clean brokerRegistry
+            if (brokerRegistry != null) {
+                brokerRegistry = null;

Review Comment:
   The broker registry is closed in the load manager.





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1111672730


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,55 +24,37 @@
 /**
  * Defines the possible states for service units.
  *
- * The following diagram defines the valid state changes
- *
- *                  ┌───────────┐
- *       ┌──────────┤ released  │◄────────┐
- *       │own       └───────────┘         │release
- *       │                                │
- *       │                                │
- *       ▼                                │
- *    ┌────────┐  assign(transfer)  ┌─────┴────┐
- *    │        ├───────────────────►│          │
- *    │ owned  │                    │ assigned │
- *    │        │◄───────────────────┤          │
- *    └──┬─────┤      own           └──────────┘
- *       │  ▲  │                         ▲
- *       │  │  │                         │
- *       │  │  └──────────────┐          │
- *       │  │                 │          │
- *       │  │        unload   │          │ assign(assignment)
- * split │  │                 │          │
- *       │  │                 │          │
- *       │  │ create(child)   │          │
- *       │  │                 │          │
- *       ▼  │                 │          │
- *    ┌─────┴─────┐           └─────►┌───┴──────┐
- *    │           │                  │          │
- *    │ splitting ├────────────────► │   free   │
- *    │           │   discard(parent)│          │
- *    └───────────┘                  └──────────┘
+ * Refer to Service Unit State Channel in https://github.com/apache/pulsar/issues/16691 for additional details.
  */
 public enum ServiceUnitState {
 
-    Free, // not owned by any broker (terminal state)
+    Init, // initializing the state. no previous state(terminal state)
+
+    Disabled, // disabled by the owner broker
+
+    Free, // not owned by any broker (semi-terminal state)
 
     Owned, // owned by a broker (terminal state)
 
     Assigned, // the ownership is assigned(but the assigned broker has not been notified the ownership yet)
 
     Released, // the source broker's ownership has been released (e.g. the topic connections are closed)
 
-    Splitting; // the service unit(e.g. bundle) is in the process of splitting.
+    Splitting, // the service unit(e.g. bundle) is in the process of splitting.
+
+    Deleted; // deleted in the system (semi-terminal state)
 
     private static Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
-            // (Free -> Released | Splitting) transitions are required
-            // when the topic is compacted in the middle of transfer or split.
-            Free, Set.of(Owned, Assigned, Released, Splitting),
-            Owned, Set.of(Assigned, Splitting, Free),
-            Assigned, Set.of(Owned, Released, Free),
-            Released, Set.of(Owned, Free),
-            Splitting, Set.of(Free)
+            // (Init -> all states) transitions are required
+            // when the topic is compacted in the middle of assign, transfer or split.
+            Init, Set.of(Disabled, Owned, Assigned, Released, Splitting, Deleted, Init),
+            Disabled, Set.of(Free, Init),

Review Comment:
   Got it.





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1111672002


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -268,11 +338,13 @@ public synchronized void close() throws PulsarServerException {
                 log.info("Successfully closed the channel producer.");
             }
 
-            // TODO: clean brokerRegistry
+            if (brokerRegistry != null) {
+                brokerRegistry = null;

Review Comment:
   Ok, thanks.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1113791662


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -348,7 +413,7 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
         }
 
         ServiceUnitStateData data = tableview.get(serviceUnit);
-        ServiceUnitState state = data == null ? Free : data.state();
+        ServiceUnitState state = data == null ? Init : data.state();

Review Comment:
   Updated.





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1112774848


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -522,12 +590,22 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
     }
 
     private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
-        if (isTargetBroker(data.sourceBroker())) {
-            ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
-            // TODO: when close, pass message to clients to connect to the new broker
-            closeServiceUnit(serviceUnit)
-                    .thenCompose(__ -> pubAsync(serviceUnit, next))
-                    .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+
+        if (isTransferCommand(data)) {
+            if (isTargetBroker(data.sourceBroker())) {

Review Comment:
   Should we use `data.sourceBroker()` in the `handleAssignEvent` method when it is a transfer command?





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Disabled and Init states in ServiceUnitState

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1109279281


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -301,6 +304,34 @@ private <T> void phaseOneLoop(Reader<T> reader, CompletableFuture<PhaseOneResult
 
     }
 
+    private <T> void waitForReconnection(Reader<T> reader) {
+        long started = System.currentTimeMillis();
+
+        // initial sleep
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+        }
+        while (!reader.isConnected()) {
+            long now = System.currentTimeMillis();
+            if (now - started > MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS) {
+                String errorMsg = String.format(
+                        "Reader has not been reconnected for %d secs. Stopping the compaction.",
+                        MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS / 1000);
+                log.error(errorMsg);
+                throw new RuntimeException(errorMsg);
+            }
+            log.warn(
+                    "Reader has not been reconnected after the cursor reset. elapsed :{} ms. Retrying "
+                            + "soon.", now - started);
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                log.warn("The thread got interrupted while waiting. continuing", e);
+            }
+        }
+    }
+

Review Comment:
   Why do we add `waitForReconnection`?





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19546: [improve][broker] PIP-192 Added Disabled, Deleted, and Init states in ServiceUnitState

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1110428037


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -301,6 +304,34 @@ private <T> void phaseOneLoop(Reader<T> reader, CompletableFuture<PhaseOneResult
 
     }
 
+    private <T> void waitForReconnection(Reader<T> reader) {
+        long started = System.currentTimeMillis();
+
+        // initial sleep
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+        }
+        while (!reader.isConnected()) {
+            long now = System.currentTimeMillis();
+            if (now - started > MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS) {
+                String errorMsg = String.format(
+                        "Reader has not been reconnected for %d secs. Stopping the compaction.",
+                        MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS / 1000);
+                log.error(errorMsg);
+                throw new RuntimeException(errorMsg);
+            }
+            log.warn(
+                    "Reader has not been reconnected after the cursor reset. elapsed :{} ms. Retrying "
+                            + "soon.", now - started);
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                log.warn("The thread got interrupted while waiting. continuing", e);
+            }
+        }
+    }
+

Review Comment:
   When `seek()` resets the cursor, this reader is temporarily disconnected.
   
   If `acknowledgeCumulativeAsync()` is then called at the end of the compaction (see the code below) before the reader has reconnected, the reader may throw an exception because its state is still `Connecting`. This is likely to happen when there is only one message to compact.
   
   ```
                   .thenCompose(v -> {
                       log.info("Acking ledger id {}", phaseOneResult.firstId);
                       return ((CompactionReaderImpl<T>) reader)
                               .acknowledgeCumulativeAsync(
                                       phaseOneResult.lastId, Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY,
                                               ledger.getId()));
                   })
   ```
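
The waiting step described above amounts to polling a condition with a deadline. The following is a generic sketch of that pattern, not the PR's `waitForReconnection`: the names `awaitConnected`, `maxWaitMillis` and `pollMillis` are hypothetical, it returns `false` on timeout instead of throwing, and it restores the interrupt flag rather than continuing to wait. In the compactor, the condition would be the `reader.isConnected()` check from the diff.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

class ReconnectWaitSketch {
    // Polls isConnected until it returns true, the deadline passes, or the thread is interrupted.
    static boolean awaitConnected(BooleanSupplier isConnected, long maxWaitMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        while (!isConnected.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out; the caller decides whether to abort
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long readyAt = System.currentTimeMillis() + 200;
        boolean ok = awaitConnected(() -> System.currentTimeMillis() >= readyAt, 1_000, 50);
        System.out.println(ok);
    }
}
```

Returning a boolean leaves the timeout policy to the caller, which could then log and stop the compaction as the PR's version does.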


