Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/18 13:19:53 UTC

[GitHub] [pulsar] coderzc opened a new pull request, #17164: [improve][broker] Make cursor properties support modify single value concurrently.

coderzc opened a new pull request, #17164:
URL: https://github.com/apache/pulsar/pull/17164

   Master Issue: #16763
   
   ### Motivation
   
   Currently, cursor properties can only be updated as a whole, and that update is not thread-safe. Sometimes we need to modify a single value concurrently, as in #16763.
   
   ### Modifications
   
   Add the `putCursorProperty` / `putAllCursorProperties` / `removeCursorProperty` methods and make the `setCursorProperties` method private.
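
Each of the three operations named above can be viewed as a small function from the current property map to a new one, which is what lets a single key change without replacing the whole map. A minimal sketch of that idea follows; `CursorPropertyOps` and its method shapes are hypothetical and are not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical helper, not code from the PR: each operation is a pure
// function that copies the current map and changes only the affected keys.
final class CursorPropertyOps {
    static UnaryOperator<Map<String, String>> put(String key, String value) {
        return current -> {
            Map<String, String> next = new HashMap<>(current);
            next.put(key, value);
            return next;
        };
    }

    static UnaryOperator<Map<String, String>> putAll(Map<String, String> extra) {
        return current -> {
            Map<String, String> next = new HashMap<>(current);
            next.putAll(extra);
            return next;
        };
    }

    static UnaryOperator<Map<String, String>> remove(String key) {
        return current -> {
            Map<String, String> next = new HashMap<>(current);
            next.remove(key);
            return next;
        };
    }
}
```

Because every function starts from whatever map it is handed, it can be re-applied to a newer map if a write is rejected, without discarding keys written by someone else.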
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Jason918 commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r967963849


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -365,6 +367,48 @@ public void operationFailed(MetaStoreException e) {
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        Map<String, String> newProperties =
+                cursorProperties == null ? new HashMap<>() : new HashMap<>(cursorProperties);
+        long stamp = cursorPropertiesUpdateLock.writeLock();

Review Comment:
   After a closer look: this lock will block the admin handler thread, and the unlock depends on the metadata callback thread, so there is a potential deadlock risk (depending on whether `setCursorProperties` is called on the metadata callback thread).
   
   I am not quite certain about the purpose of this lock. Can you describe the non-thread-safe case here?
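
The risk described here can be illustrated in isolation. The sketch below assumes the lock in the hunk is a `java.util.concurrent.locks.StampedLock` (the `long stamp = ...writeLock()` call suggests this, but the thread does not confirm it). `StampedLock` is not reentrant, so a second acquisition fails while the first stamp is outstanding, and the lock stays held until whichever thread completes the asynchronous operation releases it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.StampedLock;

// Illustrative only; class and method names are hypothetical.
final class StampedLockSketch {
    // Returns true if a second write-lock attempt succeeds while the first
    // stamp is still outstanding. tryWriteLock() returns 0 when it fails; a
    // blocking writeLock() in the same position would wait forever.
    static boolean secondAcquireSucceeds() {
        StampedLock lock = new StampedLock();
        long stamp = lock.writeLock();
        try {
            return lock.tryWriteLock() != 0;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // The lock is released only when the asynchronous operation completes,
    // on whichever thread completes it. Returns true if the lock was held
    // before completion and free afterwards.
    static boolean releasedFromCallback() {
        StampedLock lock = new StampedLock();
        long stamp = lock.writeLock();
        CompletableFuture<Void> persisted = new CompletableFuture<>();
        persisted.whenComplete((result, error) -> lock.unlockWrite(stamp));
        boolean heldBefore = lock.isWriteLocked();
        persisted.complete(null); // stands in for the metadata callback
        return heldBefore && !lock.isWriteLocked();
    }
}
```

If the thread that must run the completion callback is itself waiting in `writeLock()`, the callback never runs and the lock is never released, which is the deadlock shape the comment warns about.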





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972700657


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -350,21 +356,48 @@ public void operationComplete(Void result, Stat stat) {
                     public void operationFailed(MetaStoreException e) {
                         log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties, e);
-                        updateCursorPropertiesResult.completeExceptionally(e);
+                        // if resource is updated by other operate then we will get bad-version exception
+                        // so, retry the operation.
+                        if (e instanceof ManagedLedgerException.BadVersionException) {
+                            asyncReadAndUpdateCursorProperties(updateFunction).whenComplete((__, ex) -> {

Review Comment:
   > computeCursorProperties(Function<Map<String, String>, Map<String, String>> updateFunction) instead of setCursorProperties(Map<String, String> cursorProperties)
   
   We can add it after we confirm that we really need it.
   
   > It looks like giving the callers handle the update conflict is better. Can we add a parameter to choose whether need to automatically retry?
   
   IMO, we can just fail the request with `BadVersionException` and let the caller make the decision. In most cases, I think they can just treat it as a `RuntimeException`.
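
A fail-fast contract of this kind can be sketched with a toy versioned store. Everything below is a stand-in: `VersionedStoreSketch` and its nested `BadVersionException` are hypothetical and only mimic the idea of a version check on write. They are not the Pulsar `MetaStore` API or `ManagedLedgerException.BadVersionException`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Toy stand-in for a metadata store with compare-and-set on a version.
final class VersionedStoreSketch {
    static final class BadVersionException extends RuntimeException {
        BadVersionException(long expected, long actual) {
            super("expected version " + expected + " but store is at " + actual);
        }
    }

    private Map<String, String> properties = Map.of();
    private long version = 0;

    synchronized long version() {
        return version;
    }

    synchronized Map<String, String> properties() {
        return properties;
    }

    // No internal retry: a stale expectedVersion fails the returned future
    // and leaves the stored value untouched.
    synchronized CompletableFuture<Long> update(Map<String, String> newProperties, long expectedVersion) {
        if (expectedVersion != version) {
            return CompletableFuture.failedFuture(new BadVersionException(expectedVersion, version));
        }
        properties = Map.copyOf(newProperties);
        version++;
        return CompletableFuture.completedFuture(version);
    }
}
```

A caller can then decide what a conflict means for it, for example by attaching `handle((v, e) -> ...)` to the returned future and either re-reading and retrying or surfacing the error.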





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972618141


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -350,21 +356,48 @@ public void operationComplete(Void result, Stat stat) {
                     public void operationFailed(MetaStoreException e) {
                         log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties, e);
-                        updateCursorPropertiesResult.completeExceptionally(e);
+                        // if resource is updated by other operate then we will get bad-version exception
+                        // so, retry the operation.
+                        if (e instanceof ManagedLedgerException.BadVersionException) {
+                            asyncReadAndUpdateCursorProperties(updateFunction).whenComplete((__, ex) -> {

Review Comment:
   > If we only retry put/remove here, I think it should be ok. It looks like we can introduce a new API in the feature to support compute operation for cursor properties. If we have a set operation, it will overwrite the whole cursor properties, the incoming properties maybe just a new version based on the old one. The retry will mess up the properties.
   
   I agree with this. I think we can use `computeCursorProperties(Function<Map<String, String>, Map<String, String>> updateFunction)` instead of `setCursorProperties(Map<String, String> cursorProperties)`.
   
   > As @Jason918 said, it's a concurrency issue. If we have an internal retry, the callers need to guarantee the concurrent behavior. If we don't have an internal retry, the callers need to handle the update conflict. In my opinion, the former is more challenging than the latter
   
   It looks like letting the callers handle the update conflict is better.
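
The difference between retrying a whole-map set and retrying a compute function can be shown with an in-memory compare-and-set loop. This is only an analogy, assuming an `AtomicReference` in place of the metadata store; `RetrySemanticsSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

final class RetrySemanticsSketch {
    // Re-reads the latest value and re-applies the function until the
    // compare-and-set succeeds.
    static Map<String, String> compute(AtomicReference<Map<String, String>> ref,
                                       UnaryOperator<Map<String, String>> fn) {
        while (true) {
            Map<String, String> current = ref.get();
            Map<String, String> next = Map.copyOf(fn.apply(current));
            if (ref.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    // Per-key update: derives the result from whatever map it is given.
    static UnaryOperator<Map<String, String>> put(String key, String value) {
        return current -> {
            Map<String, String> next = new HashMap<>(current);
            next.put(key, value);
            return next;
        };
    }

    // Whole-map set: ignores the current map, so a retry simply writes the
    // same precomputed map again and drops anything written in between.
    static UnaryOperator<Map<String, String>> set(Map<String, String> replacement) {
        return ignored -> replacement;
    }
}
```

With a starting map `{a=1}` and a concurrent writer that adds `b=2`, `set({a=1, c=3})` ends at `{a=1, c=3}` while `put("c", "3")` ends at `{a=1, b=2, c=3}`.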





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973106613


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +328,27 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;

Review Comment:
   Question: are we using this value with the ZK versioning mechanism for concurrency safety? It is a reference to the object (even though the local variable is `final`), so I'm not sure we can guarantee that.
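
Whether the captured value is safe depends on whether the object behind the reference is ever mutated in place. The sketch below assumes an immutable stand-in `Stat` (the thread does not state whether the real `Stat` is immutable): a `final` local then keeps the version it saw even after the field is pointed at a newer object.

```java
// Illustrative only; Stat here is a hypothetical immutable stand-in.
final class StatSnapshotSketch {
    static final class Stat {
        final long version;

        Stat(long version) {
            this.version = version;
        }
    }

    private volatile Stat cursorLedgerStat = new Stat(1);

    // Replaces the field with a new object instead of mutating the old one.
    void replaceStat(long newVersion) {
        cursorLedgerStat = new Stat(newVersion);
    }

    // Returns {version seen through the snapshot, version seen through the field}.
    long[] snapshotThenReplace() {
        final Stat snapshot = this.cursorLedgerStat;
        replaceStat(snapshot.version + 1); // stands in for a concurrent update
        return new long[] {snapshot.version, cursorLedgerStat.version};
    }
}
```

If `Stat` had mutable fields that were updated in place, the snapshot would observe the change too, and the concern raised in the question would apply.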





[GitHub] [pulsar] Jason918 commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r970415765


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +326,25 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> asyncReadAndUpdateCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
         ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
             @Override
             public void operationComplete(ManagedCursorInfo info, Stat stat) {
+                final Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);

Review Comment:
   We should use the properties from `info`.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973101218


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java:
##########
@@ -68,4 +69,28 @@ public static CompletableFuture<Void> waitForAll(List<CompletableFuture<Void>> f
 
         return compositeFuture;
     }
+
+    public static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
+                                                            Class<? extends Exception> needRetryExceptionClass) {
+        CompletableFuture<T> resultFuture = new CompletableFuture<>();
+        op.get().whenComplete((res, ex) -> {
+            if (ex == null) {
+                resultFuture.complete(res);
+            } else {
+                if (needRetryExceptionClass.isAssignableFrom(ex.getClass())) {
+                    executeWithRetry(op, needRetryExceptionClass).whenComplete((res2, ex2) -> {
+                        if (ex2 == null) {
+                            resultFuture.complete(res2);
+                        } else {
+                            resultFuture.completeExceptionally(ex2);
+                        }
+                    });
+                    return;
+                }
+                resultFuture.completeExceptionally(ex);
+            }
+        });
+
+        return resultFuture;
+    }

Review Comment:
   `RetryUtil#retryAsynchronously` can't retry based on a specific exception. I will improve it in another PR.
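
For comparison, an exception-filtered retry can be sketched as below. This is neither the `executeWithRetry` from the hunk nor `RetryUtil#retryAsynchronously`; `RetrySketch.retryOn` is a hypothetical name, and the `maxRetries` bound and the `CompletionException` unwrapping are additions of this sketch, since futures derived through `thenApply` and similar stages report failures wrapped.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

final class RetrySketch {
    static <T> CompletableFuture<T> retryOn(Supplier<CompletableFuture<T>> op,
                                            Class<? extends Throwable> retryable,
                                            int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(op, retryable, maxRetries, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> op,
                                    Class<? extends Throwable> retryable,
                                    int retriesLeft,
                                    CompletableFuture<T> result) {
        op.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            // Unwrap so the filter sees the underlying failure.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            if (retriesLeft > 0 && retryable.isInstance(cause)) {
                attempt(op, retryable, retriesLeft - 1, result);
            } else {
                result.completeExceptionally(cause);
            }
        });
    }
}
```

Bounding the number of retries keeps a persistently conflicting operation from looping forever.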





[GitHub] [pulsar] lhotari commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r952242956


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -86,14 +86,20 @@ enum IndividualDeletedEntries {
      */
     Map<String, String> getCursorProperties();
 
-     /**
-      * Updates the properties.
-      * @param cursorProperties
-      * @return a handle to the result of the operation
-      */
-     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
-         return CompletableFuture.completedFuture(null);
-     }
+    /**
+     * Add a property associated with the cursor.
+     */
+    void putCursorProperty(String key, String value);
+
+    /**
+     * Set all properties associated with the cursor, internal properties are still preserved.
+     */
+    void setAllCursorProperties(Map<String, String> properties);
+
+    /**
+     * Remove a property associated with the cursor.
+     */
+    void removeCursorProperty(String key);
 

Review Comment:
   > What's wrong with using the synchronous method?
   
   The main reason to avoid blocking methods in Pulsar is to prevent blocking Netty IO threads. Netty uses a small number of IO threads, and blocking on these threads could prevent the work assigned to Netty event loops from proceeding as expected (other connections might hang, time out, or get delayed).
   There are also other reasons to avoid synchronous methods, such as "[PIP-149 Making the REST Admin API fully async](https://github.com/apache/pulsar/issues/14365)". (cc @Technoboy- and @mattisonchao)
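
The contrast between the two styles can be sketched with plain `CompletableFuture` composition. The names below are hypothetical; `storeAck` stands in for the future returned by an asynchronous metadata write.

```java
import java.util.concurrent.CompletableFuture;

final class NonBlockingHandlerSketch {
    // Blocking style: parks the calling thread until the store acknowledges.
    // On an event-loop thread this stalls every connection that thread serves.
    static String handleBlocking(CompletableFuture<Void> storeAck) {
        storeAck.join();
        return "updated";
    }

    // Non-blocking style: returns immediately; the reply is produced as a
    // continuation on whichever thread completes storeAck.
    static CompletableFuture<String> handleAsync(CompletableFuture<Void> storeAck) {
        return storeAck.thenApply(ignored -> "updated");
    }
}
```

With the asynchronous form, the caller gets a future back before the write is acknowledged and the calling thread is free to serve other work in the meantime.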
   
   
   





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r963025838


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -365,6 +372,50 @@ public void operationFailed(MetaStoreException e) {
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        MutableObject<CompletableFuture<Void>> updateCursorPropertiesResultRef = new MutableObject<>();
+        CURSOR_PROPERTIES_UPDATER.updateAndGet(this, map -> {
+            Map<String, String> newProperties =
+                    cursorProperties == null ? new TreeMap<>() : new TreeMap<>(cursorProperties);
+            if (map != null) {
+                map.forEach((k, v) -> {
+                    if (((String) k).startsWith(CURSOR_INTERNAL_PROPERTY_PREFIX)) {
+                        newProperties.put((String) k, (String) v);
+                    }
+                });
+
+                updateCursorPropertiesResultRef.setValue(asyncUpdateCursorProperties(newProperties));

Review Comment:
   That makes sense. I now use a non-reentrant lock instead of `AtomicReferenceFieldUpdater`, to ensure that the new properties take effect only after they are persisted to storage.
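
A general hazard with putting side effects inside an atomic update function, which may or may not be the point raised in review, is that the function can run more than once when the compare-and-set loses a race. The sketch below uses `AtomicReference` rather than `AtomicReferenceFieldUpdater` (both document that the function may be re-applied) and simulates the competing write from inside the function so the outcome is deterministic.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only; the class name is hypothetical.
final class UpdateFunctionRerunSketch {
    // Returns how many times the update function ran for ONE updateAndGet call.
    static int invocationsUnderInterference() {
        AtomicReference<String> ref = new AtomicReference<>("v0");
        AtomicInteger invocations = new AtomicInteger();
        ref.updateAndGet(current -> {
            if (invocations.incrementAndGet() == 1) {
                // Simulates another writer changing the value between the
                // read and the compare-and-set of the first attempt.
                ref.set("other");
            }
            return current + "+mine";
        });
        return invocations.get();
    }
}
```

Here the function runs twice, so any store write started inside it would also be started twice.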





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r967584845


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -365,6 +368,48 @@ public void operationFailed(MetaStoreException e) {
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        Map<String, String> newProperties =
+                cursorProperties == null ? new TreeMap<>() : new TreeMap<>(cursorProperties);

Review Comment:
   Thanks, it seems that `HashMap` is enough. I have updated it.





[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1245521675

   /pulsarbot run-failure-checks




[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r970364061


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +326,25 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> asyncReadAndUpdateCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
         ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
             @Override
             public void operationComplete(ManagedCursorInfo info, Stat stat) {
+                final Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);

Review Comment:
   Should we update ZooKeeper with the local stat instead of the new stat?





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r952435063


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -86,14 +86,20 @@ enum IndividualDeletedEntries {
      */
     Map<String, String> getCursorProperties();
 
-     /**
-      * Updates the properties.
-      * @param cursorProperties
-      * @return a handle to the result of the operation
-      */
-     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
-         return CompletableFuture.completedFuture(null);
-     }
+    /**
+     * Add a property associated with the cursor.
+     */
+    void putCursorProperty(String key, String value);
+
+    /**
+     * Set all properties associated with the cursor, internal properties are still preserved.
+     */
+    void setAllCursorProperties(Map<String, String> properties);
+
+    /**
+     * Remove a property associated with the cursor.
+     */
+    void removeCursorProperty(String key);
 

Review Comment:
   > > What's wrong with using the synchronous method?
   > 
   > The main reason to avoid blocking methods in Pulsar is to prevent blocking Netty IO threads. Netty uses a small number of IO threads to execute and using blocking on these threads could cause the execution of work assigned to Netty event loops to not proceed as expected (other unrelated connections might hang / timeout / get delayed etc.). There are also other reasons to avoid synchronous methods such as "[PIP-149 Making the REST Admin API fully async](https://github.com/apache/pulsar/issues/14365)". (cc @Technoboy- and @mattisonchao)
   
   I see. I think we can keep the asynchronous method `CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties)`, but it needs to update the in-memory variable first, before updating the store.
   We can also add `CompletableFuture<Void> putCursorProperty(String key, String value)` and `CompletableFuture<Void> setCursorProperties(Map<String, String> properties)`.





[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1240382188

   
   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r967976234


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -365,6 +367,48 @@ public void operationFailed(MetaStoreException e) {
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        Map<String, String> newProperties =
+                cursorProperties == null ? new HashMap<>() : new HashMap<>(cursorProperties);
+        long stamp = cursorPropertiesUpdateLock.writeLock();

Review Comment:
   @Jason918 
   > I not quite certain about the purpose of this lock. Can you describe the non-thread-safe case here?
   
   Calls to `updateSubscriptionProperties` through the Admin API can race with other code that uses the cursor properties, for example the code that stores the list of delayed message index bucket snapshots in [PIP-195](https://github.com/apache/pulsar/issues/16763), so we need the lock to ensure data consistency.
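
One ingredient of keeping the two writers apart, visible in an earlier revision quoted in this thread, is for a whole-map replacement to carry over the internally managed keys. A minimal sketch of that merge step follows; the prefix value `"#internal."` is hypothetical (the thread only names the constant `CURSOR_INTERNAL_PROPERTY_PREFIX`), and the merge alone does not remove the need for a lock or a version check.

```java
import java.util.HashMap;
import java.util.Map;

final class InternalPropertySketch {
    // Hypothetical value; the real constant's value is not shown in the thread.
    static final String INTERNAL_PREFIX = "#internal.";

    // Replaces the user-visible properties while carrying over every key that
    // starts with the internal prefix from the current map.
    static Map<String, String> replaceUserProperties(Map<String, String> current,
                                                     Map<String, String> requested) {
        Map<String, String> next = requested == null ? new HashMap<>() : new HashMap<>(requested);
        current.forEach((key, value) -> {
            if (key.startsWith(INTERNAL_PREFIX)) {
                next.put(key, value);
            }
        });
        return next;
    }
}
```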





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r970299664


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +326,25 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> asyncReadAndUpdateCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
         ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {

Review Comment:
   Can we remove the get operation?
   Instead, we could update the local properties cache only if we get a BadVersion exception.
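
The suggestion amounts to an optimistic write: send the update with the cached version, and re-read only when the store rejects it. A minimal sketch, assuming `AtomicStampedReference` as an in-memory stand-in for a store that checks a version on write (names are hypothetical, and the sketch is single-threaded on the caller side):

```java
import java.util.concurrent.atomic.AtomicStampedReference;

final class OptimisticWriteSketch {
    // Stand-in for the metadata store: a value plus a version ("stamp").
    final AtomicStampedReference<String> store = new AtomicStampedReference<>("v0", 0);

    // Local cache of the last value and version this writer knows about.
    String cachedValue = "v0";
    int cachedVersion = 0;

    // Writes without a prior read. On a version conflict, refreshes the
    // cache from the store and reports failure so the caller can decide.
    boolean tryWrite(String newValue) {
        if (store.compareAndSet(cachedValue, newValue, cachedVersion, cachedVersion + 1)) {
            cachedValue = newValue;
            cachedVersion++;
            return true;
        }
        int[] stampHolder = new int[1];
        cachedValue = store.get(stampHolder);
        cachedVersion = stampHolder[0];
        return false;
    }
}
```

In the uncontended case this saves one round trip per update; the read happens only after a conflict.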
   
   



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +326,25 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> asyncReadAndUpdateCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
         ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
             @Override
             public void operationComplete(ManagedCursorInfo info, Stat stat) {
+                final Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);

Review Comment:
   This might introduce a problem: `ManagedCursorImpl.this.cursorProperties` may be stale, but we will update ZooKeeper with the new stat.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973082569


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java:
##########
@@ -68,4 +69,28 @@ public static CompletableFuture<Void> waitForAll(List<CompletableFuture<Void>> f
 
         return compositeFuture;
     }
+
+    public static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
+                                                            Class<? extends Exception> needRetryExceptionClass) {
+        CompletableFuture<T> resultFuture = new CompletableFuture<>();
+        op.get().whenComplete((res, ex) -> {
+            if (ex == null) {
+                resultFuture.complete(res);
+            } else {
+                if (needRetryExceptionClass.isAssignableFrom(ex.getClass())) {
+                    executeWithRetry(op, needRetryExceptionClass).whenComplete((res2, ex2) -> {
+                        if (ex2 == null) {
+                            resultFuture.complete(res2);
+                        } else {
+                            resultFuture.completeExceptionally(ex2);
+                        }
+                    });
+                    return;
+                }
+                resultFuture.completeExceptionally(ex);
+            }
+        });
+
+        return resultFuture;
+    }

Review Comment:
   Maybe we don't need to introduce a new method. The `RetryUtil#retryAsynchronously` method may work here.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973516304


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   > Could we add a comment here or on the `asyncUpdateCursorInfo` method to alert other contributors so they won't forget to update it?
   
   I added an `updateCursorLedgerStat` method that updates `cursorLedgerStat` and `managedCursorInfo` at the same time. I also added notes reminding other contributors not to update `cursorLedgerStat` alone.
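
As a rough sketch of the "update both together" idea, one alternative to a helper that writes two volatile fields is to keep both values in a single immutable snapshot. This is not what the PR does; `CursorMetadataHolder`, `Stat`, and `Snapshot` are hypothetical names, and the cursor info is reduced to a plain map for brevity.

```java
import java.util.Map;

final class CursorMetadataHolder {

    /** Hypothetical stand-in for the metadata-store version of the cursor node. */
    static final class Stat {
        final long version;

        Stat(long version) {
            this.version = version;
        }
    }

    /** Immutable pair, so a reader never sees a stat from one write and info from another. */
    static final class Snapshot {
        final Stat stat;
        final Map<String, String> info;

        Snapshot(Stat stat, Map<String, String> info) {
            this.stat = stat;
            this.info = Map.copyOf(info);
        }
    }

    private volatile Snapshot snapshot = new Snapshot(new Stat(0), Map.of());

    /** The only place that changes either value; both are swapped in one volatile write. */
    void update(Stat stat, Map<String, String> info) {
        this.snapshot = new Snapshot(stat, info);
    }

    Snapshot current() {
        return snapshot;
    }
}
```

The trade-off is one extra allocation per update in exchange for readers always observing a matching stat and info.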
   





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972556310


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -350,21 +356,48 @@ public void operationComplete(Void result, Stat stat) {
                     public void operationFailed(MetaStoreException e) {
                         log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties, e);
-                        updateCursorPropertiesResult.completeExceptionally(e);
+                        // if resource is updated by other operate then we will get bad-version exception
+                        // so, retry the operation.
+                        if (e instanceof ManagedLedgerException.BadVersionException) {
+                            asyncReadAndUpdateCursorProperties(updateFunction).whenComplete((__, ex) -> {

Review Comment:
   I think ZooKeeper guarantees the order of task execution and runs all callbacks serially on the same thread, so this conforms to linearizable semantics.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973093200


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +328,27 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
+
+        Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
+        ManagedCursorInfo copy = ManagedCursorInfo
+                .newBuilder(ManagedCursorImpl.this.managedCursorInfo)
+                .clearCursorProperties()
+                .addAllCursorProperties(buildStringPropertiesMap(newProperties))
+                .build();
+
+        ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+                name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
                         log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties);
-                        ManagedCursorImpl.this.cursorProperties = cursorProperties;
+                        ManagedCursorImpl.this.managedCursorInfo = copy;
+                        ManagedCursorImpl.this.cursorProperties = Collections.unmodifiableMap(newProperties);

Review Comment:
   Yes, `newProperties` is already a copy, but the properties are not copied in `setCursorProperties`. I will deal with that in another PR ("support internal properties").
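
To make the copy concern concrete: `Collections.unmodifiableMap` returns a read-only view, not a copy, so later changes to the caller's map still show through unless the map is copied first. The class and method names below are hypothetical and exist only for this sketch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class DefensiveCopyDemo {

    /** Wraps the caller's map directly: later caller mutations remain visible through the view. */
    static Map<String, String> wrapOnly(Map<String, String> input) {
        return Collections.unmodifiableMap(input);
    }

    /** Copies first, then wraps: the result is detached from the caller's map. */
    static Map<String, String> copyThenWrap(Map<String, String> input) {
        return Collections.unmodifiableMap(new HashMap<>(input));
    }
}
```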





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972618141


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -350,21 +356,48 @@ public void operationComplete(Void result, Stat stat) {
                     public void operationFailed(MetaStoreException e) {
                         log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties, e);
-                        updateCursorPropertiesResult.completeExceptionally(e);
+                        // if resource is updated by other operate then we will get bad-version exception
+                        // so, retry the operation.
+                        if (e instanceof ManagedLedgerException.BadVersionException) {
+                            asyncReadAndUpdateCursorProperties(updateFunction).whenComplete((__, ex) -> {

Review Comment:
   > If we only retry put/remove here, I think it should be OK. It looks like we can introduce a new API in the future to support a compute operation for cursor properties. A set operation overwrites the whole set of cursor properties, and the incoming properties may be just a new version based on the old one, so a retry would mess up the properties.
   
   I agree with this. I think we can use `computeCursorProperties(Function<Map<String, String>, Map<String, String>> updateFunction)` instead of `setCursorProperties(Map<String, String> cursorProperties)`.
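
The reasoning here can be sketched with a small in-memory model. It assumes a hypothetical versioned store where a write succeeds only if the version read earlier is still current (standing in for the metadata store's bad-version check). Because the update is a function of the current properties, a retry re-reads the latest state and re-applies the function, so a concurrent put or remove is not lost. All names are invented for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

final class VersionedPropertiesStore {

    /** Immutable (version, properties) pair held by the hypothetical store. */
    static final class Versioned {
        final long version;
        final Map<String, String> properties;

        Versioned(long version, Map<String, String> properties) {
            this.version = version;
            this.properties = Map.copyOf(properties);
        }
    }

    private final AtomicReference<Versioned> state =
            new AtomicReference<>(new Versioned(0, Map.of()));

    Versioned read() {
        return state.get();
    }

    /** Succeeds only if nobody has written since `expected` was read. */
    private boolean writeIfUnchanged(Versioned expected, Map<String, String> next) {
        return state.compareAndSet(expected, new Versioned(expected.version + 1, next));
    }

    /** Re-reads the current state and re-applies fn after every conflict. */
    void compute(Function<Map<String, String>, Map<String, String>> fn) {
        while (true) {
            Versioned current = read();
            Map<String, String> next = fn.apply(new HashMap<>(current.properties));
            if (writeIfUnchanged(current, next)) {
                return;
            }
        }
    }

    void put(String key, String value) {
        compute(m -> { m.put(key, value); return m; });
    }

    void remove(String key) {
        compute(m -> { m.remove(key); return m; });
    }

    /** Runs two writer threads that each put `perThread` distinct keys; returns the final state. */
    static Versioned demo(int perThread) {
        VersionedPropertiesStore store = new VersionedPropertiesStore();
        Thread a = new Thread(() -> { for (int i = 0; i < perThread; i++) store.put("a" + i, "1"); });
        Thread b = new Thread(() -> { for (int i = 0; i < perThread; i++) store.put("b" + i, "1"); });
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store.read();
    }
}
```

A whole-map set, by contrast, carries a fixed value computed from an older view, so blindly retrying it would overwrite whatever another writer added in between.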





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972908036


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +328,26 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
+
+        Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
+        ManagedCursorInfo copy = ManagedCursorInfo
+                .newBuilder(ManagedCursorImpl.this.managedCursorInfo)
+                .clearCursorProperties()
+                .addAllCursorProperties(buildStringPropertiesMap(newProperties))
+                .build();
+
+        ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+                name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
                         log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties);
-                        ManagedCursorImpl.this.cursorProperties = cursorProperties;
+                        ManagedCursorImpl.this.cursorProperties = newProperties;

Review Comment:
   The `persistPositionMetaStore` method uses a newly created `ManagedCursorInfo` to update ZooKeeper.
   
   https://github.com/apache/pulsar/blob/5a87c47705a156534c7be42dce2cc71707e80998/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2510-L2529





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973108078


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   Yes, but it saves a get operation.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973516304


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   > Could we add a comment here or on the `asyncUpdateCursorInfo` method to alert other contributors so they won't forget to update it?
   
   I added an `updateCursorLedgerStat` method that updates `cursorLedgerStat` and `managedCursorInfo` at the same time, and added notes reminding other contributors not to update `cursorLedgerStat` alone.
   





[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1242591695

   /pulsarbot run-failure-checks




[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1236872277

   > In the description it says: "use a special prefix to prevent internal property loss." Why is a special prefix needed? What are the consequences of this change? Is it backwards compatible? @coderzc Please explain this detail.
   
   In the original implementation, all key-value pairs are removed when the subscription properties are updated, which causes internal properties to be lost. The special prefix prevents internal properties from being removed by `updateSubscriptionProperties`.
   
   https://github.com/apache/pulsar/blob/c1c3c86f83e6bece06c29bbdd6e5b1b1cb841ef5/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L1221-L1232
   
   https://github.com/apache/pulsar/blob/2db3ed5a0e1df52da83244baada63c5a0da5924f/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L338
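
A minimal sketch of the prefix idea described above, under the assumption that "internal" simply means "key starts with the prefix". The `#pulsar.` value comes from the diff under review; the class name, the method name, and the sample keys are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

final class InternalPropertyMerge {

    // Prefix value as it appears in the diff under review.
    static final String INTERNAL_PREFIX = "#pulsar.";

    /**
     * Replaces all user-visible properties with `incoming`, while carrying over
     * any entry of `current` whose key starts with the internal prefix.
     */
    static Map<String, String> replaceUserProperties(Map<String, String> current,
                                                     Map<String, String> incoming) {
        Map<String, String> merged = incoming == null ? new TreeMap<>() : new TreeMap<>(incoming);
        if (current != null) {
            current.forEach((k, v) -> {
                if (k.startsWith(INTERNAL_PREFIX)) {
                    merged.put(k, v);
                }
            });
        }
        return merged;
    }
}
```

Note that in this sketch an incoming key that itself starts with the prefix is overwritten by the stored internal value, which is one possible policy rather than a confirmed behavior of the PR.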




[GitHub] [pulsar] lhotari commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r952242956


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -86,14 +86,20 @@ enum IndividualDeletedEntries {
      */
     Map<String, String> getCursorProperties();
 
-     /**
-      * Updates the properties.
-      * @param cursorProperties
-      * @return a handle to the result of the operation
-      */
-     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
-         return CompletableFuture.completedFuture(null);
-     }
+    /**
+     * Add a property associated with the cursor.
+     */
+    void putCursorProperty(String key, String value);
+
+    /**
+     * Set all properties associated with the cursor, internal properties are still preserved.
+     */
+    void setAllCursorProperties(Map<String, String> properties);
+
+    /**
+     * Remove a property associated with the cursor.
+     */
+    void removeCursorProperty(String key);
 

Review Comment:
   > What's wrong with using the synchronous method?
   
   The main reason to avoid blocking methods in Pulsar is to prevent blocking Netty IO threads. Netty runs on a small number of IO threads, and blocking one of them can stall the work assigned to that event loop (other unrelated connections might hang, time out, or be delayed).
   There are also other reasons to avoid synchronous methods such as "[PIP-149 Making the REST Admin API fully async](https://github.com/apache/pulsar/issues/14365)". (cc @Technoboy- and @mattisonchao)
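
The difference between the two styles can be sketched as follows. This is an illustration with a plain single-thread executor standing in for the store; it does not use Netty or any Pulsar API, and every name in it is an assumption made for the example.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class NonBlockingPropertyUpdate {

    private final Map<String, String> store = new ConcurrentHashMap<>();
    private final ExecutorService storeThread = Executors.newSingleThreadExecutor();

    /** Hypothetical async store write; it runs on the store's own thread. */
    CompletableFuture<Void> persistAsync(String key, String value) {
        return CompletableFuture.runAsync(() -> store.put(key, value), storeThread);
    }

    /** Blocking style: the calling thread is parked until the write finishes. */
    void putBlocking(String key, String value) {
        persistAsync(key, value).join();
    }

    /** Non-blocking style: the caller gets a future and chains follow-up work onto it. */
    CompletableFuture<String> putAsync(String key, String value) {
        return persistAsync(key, value).thenApply(ignored -> key + "=" + store.get(key));
    }

    void close() {
        storeThread.shutdown();
    }

    static String demo() {
        NonBlockingPropertyUpdate update = new NonBlockingPropertyUpdate();
        try {
            // join() is used only here, on a thread that is allowed to wait.
            return update.putAsync("owner", "broker-1").join();
        } finally {
            update.close();
        }
    }
}
```

If `putBlocking` were called from an event-loop thread, every connection served by that thread would wait for the store; `putAsync` returns immediately and lets the event loop continue.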
   
   
   





[GitHub] [pulsar] lhotari commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r952245005


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -86,14 +86,20 @@ enum IndividualDeletedEntries {
      */
     Map<String, String> getCursorProperties();
 
-     /**
-      * Updates the properties.
-      * @param cursorProperties
-      * @return a handle to the result of the operation
-      */
-     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
-         return CompletableFuture.completedFuture(null);
-     }
+    /**
+     * Add a property associated with the cursor.
+     */
+    void putCursorProperty(String key, String value);
+
+    /**
+     * Set all properties associated with the cursor, internal properties are still preserved.
+     */
+    void setAllCursorProperties(Map<String, String> properties);
+
+    /**
+     * Remove a property associated with the cursor.
+     */
+    void removeCursorProperty(String key);
 

Review Comment:
   > Continuing to use an asynchronous method will not ensure data consistency under concurrent modification.
   
   It should be possible to resolve this issue without changing the API.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r969143558


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -365,6 +367,48 @@ public void operationFailed(MetaStoreException e) {
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        Map<String, String> newProperties =
+                cursorProperties == null ? new HashMap<>() : new HashMap<>(cursorProperties);
+        long stamp = cursorPropertiesUpdateLock.writeLock();

Review Comment:
   @Jason918 Thanks for your suggestion.
   
   I use `ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {}))` to serialize the operations instead of using a lock, and I check for `BadVersionException` and retry the operation when it occurs.
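
The serialization idea can be sketched with a plain single-thread executor as a stand-in for an ordered executor keyed by ledger name. This is an assumption-laden illustration, not the PR's code: the class, the counter map, and the `demo` method are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SerializedUpdates {

    // Plain HashMap: it is only ever touched from the single executor thread.
    private final Map<String, Integer> counters = new HashMap<>();
    private final ExecutorService ordered = Executors.newSingleThreadExecutor();

    /** Read-modify-write submitted as one task, so tasks never interleave. */
    void increment(String key) {
        ordered.execute(() -> counters.merge(key, 1, Integer::sum));
    }

    /** Starts `callers` threads that each submit `perCaller` increments; returns the final count. */
    static int demo(int callers, int perCaller) {
        SerializedUpdates updates = new SerializedUpdates();
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perCaller; j++) {
                    updates.increment("hits");
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
            // The read is also queued, so it runs after every increment submitted above.
            Callable<Integer> read = () -> updates.counters.getOrDefault("hits", 0);
            return updates.ordered.submit(read).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            updates.ordered.shutdown();
        }
    }
}
```

Serializing on one thread removes the need for a lock inside a single process, but a version check with retry is still useful when another writer can update the same record through a different path.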





[GitHub] [pulsar] lhotari commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r966655922


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -119,6 +121,11 @@ public class ManagedCursorImpl implements ManagedCursor {
     protected final ManagedLedgerConfig config;
     protected final ManagedLedgerImpl ledger;
     private final String name;
+
+    public static final String CURSOR_INTERNAL_PROPERTY_PREFIX = "#pulsar.";

Review Comment:
   As mentioned in a comment, remove internal properties support from changes made in this PR.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973537812


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   Got it, thanks!





[GitHub] [pulsar] eolivelli commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r962829422


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -365,6 +372,50 @@ public void operationFailed(MetaStoreException e) {
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        MutableObject<CompletableFuture<Void>> updateCursorPropertiesResultRef = new MutableObject<>();
+        CURSOR_PROPERTIES_UPDATER.updateAndGet(this, map -> {
+            Map<String, String> newProperties =
+                    cursorProperties == null ? new TreeMap<>() : new TreeMap<>(cursorProperties);
+            if (map != null) {
+                map.forEach((k, v) -> {
+                    if (((String) k).startsWith(CURSOR_INTERNAL_PROPERTY_PREFIX)) {
+                        newProperties.put((String) k, (String) v);
+                    }
+                });
+
+                updateCursorPropertiesResultRef.setValue(asyncUpdateCursorProperties(newProperties));

Review Comment:
   The new properties must take effect only after they are persisted to storage.
   Otherwise:
   - in case of failure, the in-memory value is not consistent with storage
   - other readers won't see the value consistently (whoever reads from this node may see something that can never be read back from storage)
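
A minimal sketch of the "persist first, publish second" ordering, assuming a hypothetical store whose write can be made to fail on demand. The in-memory field is assigned only in the success continuation, so a failed write leaves readers with the last persisted value. Names are invented for this example.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class PersistThenPublish {

    private volatile Map<String, String> visible = Map.of();
    private volatile boolean storeHealthy = true; // toggled to simulate a store failure

    /** Hypothetical store write that fails while storeHealthy is false. */
    private CompletableFuture<Void> persist(Map<String, String> props) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        if (storeHealthy) {
            f.complete(null);
        } else {
            f.completeExceptionally(new IllegalStateException("simulated store failure"));
        }
        return f;
    }

    /** Publishes the new value to readers only after the store write has succeeded. */
    CompletableFuture<Void> update(Map<String, String> props) {
        Map<String, String> copy = Map.copyOf(props);
        return persist(copy).thenRun(() -> this.visible = copy);
    }

    Map<String, String> get() {
        return visible;
    }

    void setStoreHealthy(boolean healthy) {
        this.storeHealthy = healthy;
    }
}
```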





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973011027


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -86,14 +86,26 @@ enum IndividualDeletedEntries {
      */
     Map<String, String> getCursorProperties();
 
-     /**
-      * Updates the properties.
-      * @param cursorProperties
-      * @return a handle to the result of the operation
-      */
-     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
-         return CompletableFuture.completedFuture(null);
-     }
+    /**
+     * Add a property associated with the cursor.
+     *
+     * @return a handle to the result of the operation
+     */
+    CompletableFuture<Void> putCursorProperty(String key, String value);

Review Comment:
   OK, I added some notes.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973106613


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +328,27 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;

Review Comment:
   Question: Are we using this value with the ZK versioning mechanism for concurrency safety?





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r963025838


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -365,6 +372,50 @@ public void operationFailed(MetaStoreException e) {
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        MutableObject<CompletableFuture<Void>> updateCursorPropertiesResultRef = new MutableObject<>();
+        CURSOR_PROPERTIES_UPDATER.updateAndGet(this, map -> {
+            Map<String, String> newProperties =
+                    cursorProperties == null ? new TreeMap<>() : new TreeMap<>(cursorProperties);
+            if (map != null) {
+                map.forEach((k, v) -> {
+                    if (((String) k).startsWith(CURSOR_INTERNAL_PROPERTY_PREFIX)) {
+                        newProperties.put((String) k, (String) v);
+                    }
+                });
+
+                updateCursorPropertiesResultRef.setValue(asyncUpdateCursorProperties(newProperties));

Review Comment:
   That makes sense. I now use a non-reentrant lock instead of `AtomicReferenceFieldUpdater` to ensure the new properties take effect only after they are persisted to storage, and I keep the method asynchronous.





[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1228271520

   /pulsarbot run-failure-checks




[GitHub] [pulsar] lhotari commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1241515020

   > In the original implementation, all key-values are removed when the subscription properties are updated, which causes internal properties to be lost, so a special prefix is used to prevent internal properties from being removed by `updateSubscriptionProperties`.
   
   @coderzc I still don't fully understand that. What do you mean by "prevent internal properties from being removed by `updateSubscriptionProperties`"? What are "internal properties"?




[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1239625323

   @lhotari PTAL




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972910488


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -350,21 +356,48 @@ public void operationComplete(Void result, Stat stat) {
                     public void operationFailed(MetaStoreException e) {
                         log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties, e);
-                        updateCursorPropertiesResult.completeExceptionally(e);
+                        // if resource is updated by other operate then we will get bad-version exception
+                        // so, retry the operation.
+                        if (e instanceof ManagedLedgerException.BadVersionException) {
+                            asyncReadAndUpdateCursorProperties(updateFunction).whenComplete((__, ex) -> {

Review Comment:
   +1 for returning the conflict exception directly (simpler is more robust).
   
   I'm not saying retry is a bad idea. I just think it isn't necessary given the engineering effort. If we can handle all the potential problems and reach a consensus with users, I definitely support it.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973149095


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   Could we add a comment here, or on the `asyncUpdateCursorInfo` method, to alert other contributors so they don't forget to update it?
   Also, we need to ensure the update order:
   - `managedCursorInfo` before `cursorLedgerStat`.
   
   If they are updated out of order, an update may use the new `cursorLedgerStat` value with the old `managedCursorInfo`.
   
   Since concurrency problems are hard to review and debug, more information would be very helpful. Thanks!
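
To illustrate the ordering concern, here is a minimal sketch rather than the PR's code. `CursorStateSketch`, `Snapshot`, `onStoreUpdated` and `readForUpdate` are hypothetical names; a `String` stands in for `managedCursorInfo` and a `long` for the version carried by `cursorLedgerStat`. It also assumes that readers take the version before the data, which the comment above does not state.

```java
final class CursorStateSketch {
    /** Immutable pair handed to one update attempt. */
    static final class Snapshot {
        final String info;
        final long statVersion;

        Snapshot(String info, long statVersion) {
            this.info = info;
            this.statVersion = statVersion;
        }
    }

    // Hypothetical stand-ins for managedCursorInfo and the version in cursorLedgerStat.
    private volatile String info = "info-v0";
    private volatile long statVersion = 0L;

    /** Called after the store has accepted an update. */
    void onStoreUpdated(String newInfo, long newVersion) {
        // Publish the data first and the version second. A concurrent reader may then
        // see (new info, old version), which a versioned store rejects, but it cannot
        // see (old info, new version), which the store would accept.
        this.info = newInfo;
        this.statVersion = newVersion;
    }

    /** Reads in the opposite order: version first, then data. */
    Snapshot readForUpdate() {
        long version = this.statVersion;
        String current = this.info;
        return new Snapshot(current, version);
    }
}
```

With this pairing of write and read order, the worst case for a racing update is a rejected write rather than stale data being written under a fresh version.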





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r970419491


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +326,25 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> asyncReadAndUpdateCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
         ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
             @Override
             public void operationComplete(ManagedCursorInfo info, Stat stat) {
+                final Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);

Review Comment:
   @Jason918 Maybe we can remove the `asyncGetCursorInfo` call and cache `ManagedCursorInfo` instead. What do you think?





[GitHub] [pulsar] Jason918 commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973530502


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   > Also, I think one more get operation might be fine since this feature isn't used very often.
   > But when we add this variable it will affect other logic and introduce the risk of data inconsistency.
   
   @mattisonchao 
   This PR had this "get" at first. It was removed at Penghui's suggestion.
   After checking the usage of `cursorLedgerStat`, I think it's OK to drop the "get" op.





[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1235405965

   @lhotari Please review again.




[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r952435063


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -86,14 +86,20 @@ enum IndividualDeletedEntries {
      */
     Map<String, String> getCursorProperties();
 
-     /**
-      * Updates the properties.
-      * @param cursorProperties
-      * @return a handle to the result of the operation
-      */
-     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
-         return CompletableFuture.completedFuture(null);
-     }
+    /**
+     * Add a property associated with the cursor.
+     */
+    void putCursorProperty(String key, String value);
+
+    /**
+     * Set all properties associated with the cursor, internal properties are still preserved.
+     */
+    void setAllCursorProperties(Map<String, String> properties);
+
+    /**
+     * Remove a property associated with the cursor.
+     */
+    void removeCursorProperty(String key);
 

Review Comment:
   > > What's wrong with using the synchronous method?
   > 
   > The main reason to avoid blocking methods in Pulsar is to prevent blocking Netty IO threads. Netty uses a small number of IO threads to execute and using blocking on these threads could cause the execution of work assigned to Netty event loops to not proceed as expected (other unrelated connections might hang / timeout / get delayed etc.). There are also other reasons to avoid synchronous methods such as "[PIP-149 Making the REST Admin API fully async](https://github.com/apache/pulsar/issues/14365)". (cc @Technoboy- and @mattisonchao)
   
   I see. I think we can keep the asynchronous `CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties)` method, but it needs to update the in-memory variable first, before updating the store.
   
   We can also add `CompletableFuture<Void> putCursorProperty(String key, String value)` and `CompletableFuture<Void> removeCursorProperty(String key)` to add or remove a single key-value pair.
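
A rough sketch of what the asynchronous shape could look like from a caller's point of view. The method signatures come from the comment above; `AsyncCursorProperties`, `InMemoryCursorProperties`, `CallerSketch` and `rename` are hypothetical names, and the in-memory class only stands in for an implementation that would complete its futures from the metadata store callback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical interface holding the asynchronous methods proposed above. */
interface AsyncCursorProperties {
    CompletableFuture<Void> putCursorProperty(String key, String value);

    CompletableFuture<Void> removeCursorProperty(String key);

    Map<String, String> getCursorProperties();
}

/** In-memory stand-in: updates the local map and completes immediately. */
final class InMemoryCursorProperties implements AsyncCursorProperties {
    private final Map<String, String> properties = new ConcurrentHashMap<>();

    @Override
    public CompletableFuture<Void> putCursorProperty(String key, String value) {
        properties.put(key, value);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> removeCursorProperty(String key) {
        properties.remove(key);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public Map<String, String> getCursorProperties() {
        return new HashMap<>(properties);
    }
}

final class CallerSketch {
    /** Chains two operations with thenCompose instead of blocking on get() or join(). */
    static CompletableFuture<Void> rename(AsyncCursorProperties cursor,
                                          String from, String to, String value) {
        return cursor.putCursorProperty(to, value)
                .thenCompose(ignored -> cursor.removeCursorProperty(from));
    }
}
```

The caller never waits on the calling thread, which is the property the quoted explanation about Netty IO threads asks for.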





[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1237658762

   @lhotari @eolivelli I have already improved the code, please review it again, thanks.




[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1235425709

   /pulsarbot run-failure-checks




[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1241523618

   > > In the original implementation, all key-values are removed when the subscription properties are updated, which causes internal properties to be lost, so a special prefix is used to prevent internal properties from being removed by `updateSubscriptionProperties`.
   > 
   > @coderzc I still don't fully understand that. What do you mean by "prevent internal properties from being removed by `updateSubscriptionProperties`"? What are "internal properties"?
   
   @lhotari Internal properties can't be set through `updateSubscriptionProperties`. They are invisible to users and are only used to implement Pulsar features, for example storing the list of delayed-message index bucket snapshots in [PIP-195](https://github.com/apache/pulsar/issues/16763).
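
As an illustration of the prefix idea, a full replacement of user properties could leave prefixed keys alone roughly as follows. This is only a sketch: the class name, the method name, and in particular the `#internal.` prefix are hypothetical, since the actual prefix is not given in this thread.

```java
import java.util.HashMap;
import java.util.Map;

final class CursorPropertiesSketch {
    // Hypothetical marker for broker-internal keys; not the real prefix.
    static final String INTERNAL_PREFIX = "#internal.";

    /** Replaces all user-visible properties while keeping internal ones untouched. */
    static Map<String, String> replaceUserProperties(Map<String, String> current,
                                                     Map<String, String> userUpdate) {
        Map<String, String> result = new HashMap<>();
        // Carry over internal entries so that a full update cannot drop them.
        current.forEach((key, value) -> {
            if (key.startsWith(INTERNAL_PREFIX)) {
                result.put(key, value);
            }
        });
        // Apply the user's map, ignoring any attempt to write an internal key.
        if (userUpdate != null) {
            userUpdate.forEach((key, value) -> {
                if (!key.startsWith(INTERNAL_PREFIX)) {
                    result.put(key, value);
                }
            });
        }
        return result;
    }
}
```

User keys that are absent from the update are dropped, as in a plain overwrite, while internal keys survive regardless of what the user sends.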




[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r968068015


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -365,6 +367,48 @@ public void operationFailed(MetaStoreException e) {
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        Map<String, String> newProperties =
+                cursorProperties == null ? new HashMap<>() : new HashMap<>(cursorProperties);
+        long stamp = cursorPropertiesUpdateLock.writeLock();

Review Comment:
   > This lock will block admin handler thread, and the unlock depends on metadata callback thread, it has potential deadlock risk (which depends on if setCursorProperties is called on metadata callback thread).
   
   Would it be better to use `ledger.getExecutor().executeOrdered(ledger.getName(), () -> {})` so that these operations run on the same thread, instead of using the lock?
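
The idea of serializing updates on one thread instead of taking a lock can be sketched with a plain JDK single-thread executor. This does not use the ordered executor named above; `SerializedProperties` and its methods are hypothetical, and the single daemon thread merely plays the role of the per-ledger thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SerializedProperties {
    // One daemon thread stands in for the thread that an ordered executor picks per key.
    private final ExecutorService ordered = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "ordered-sketch");
        thread.setDaemon(true);
        return thread;
    });

    // A plain HashMap is safe here only because every access runs on the ordered thread.
    private final Map<String, String> properties = new HashMap<>();

    CompletableFuture<Void> put(String key, String value) {
        return CompletableFuture.runAsync(() -> properties.put(key, value), ordered);
    }

    CompletableFuture<Void> remove(String key) {
        return CompletableFuture.runAsync(() -> properties.remove(key), ordered);
    }

    CompletableFuture<Map<String, String>> snapshot() {
        return CompletableFuture.supplyAsync(
                () -> new HashMap<String, String>(properties), ordered);
    }
}
```

Callers on any thread only enqueue work and get a future back, so no thread waits for an unlock that depends on a callback running elsewhere.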





[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1240172890

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1245456931

   /pulsarbot run-failure-checks




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973106613


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +328,27 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;

Review Comment:
   Question: Are we using this value with the ZK versioning mechanism for concurrency safety? 





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973101218


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java:
##########
@@ -68,4 +69,28 @@ public static CompletableFuture<Void> waitForAll(List<CompletableFuture<Void>> f
 
         return compositeFuture;
     }
+
+    public static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
+                                                            Class<? extends Exception> needRetryExceptionClass) {
+        CompletableFuture<T> resultFuture = new CompletableFuture<>();
+        op.get().whenComplete((res, ex) -> {
+            if (ex == null) {
+                resultFuture.complete(res);
+            } else {
+                if (needRetryExceptionClass.isAssignableFrom(ex.getClass())) {
+                    executeWithRetry(op, needRetryExceptionClass).whenComplete((res2, ex2) -> {
+                        if (ex2 == null) {
+                            resultFuture.complete(res2);
+                        } else {
+                            resultFuture.completeExceptionally(ex2);
+                        }
+                    });
+                    return;
+                }
+                resultFuture.completeExceptionally(ex);
+            }
+        });
+
+        return resultFuture;
+    }

Review Comment:
   `RetryUtil#retryAsynchronously` can't retry based on a specific exception. I will improve it in another PR.
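
For comparison, here is a sketch of an exception-specific retry helper with an upper bound on attempts. It is a variation on the helper in the diff above, not the merged code: the `maxRetries` parameter, the unwrapping of `CompletionException`, and the `RetrySketch` class name are assumptions added for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

final class RetrySketch {
    /** Re-runs op while it fails with retryOn, for at most maxRetries extra attempts. */
    static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
                                                     Class<? extends Throwable> retryOn,
                                                     int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        op.get().whenComplete((res, ex) -> {
            if (ex == null) {
                result.complete(res);
                return;
            }
            // Futures derived through thenApply/thenCompose wrap the failure; unwrap it.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            if (maxRetries > 0 && retryOn.isInstance(cause)) {
                executeWithRetry(op, retryOn, maxRetries - 1).whenComplete((res2, ex2) -> {
                    if (ex2 == null) {
                        result.complete(res2);
                    } else {
                        result.completeExceptionally(ex2);
                    }
                });
            } else {
                result.completeExceptionally(cause);
            }
        });
        return result;
    }
}
```

A call such as `RetrySketch.executeWithRetry(() -> update(), IllegalStateException.class, 3)` would make up to four attempts in total, where `update()` is any hypothetical method returning a `CompletableFuture`.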





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972556310


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -350,21 +356,48 @@ public void operationComplete(Void result, Stat stat) {
                     public void operationFailed(MetaStoreException e) {
                         log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties, e);
-                        updateCursorPropertiesResult.completeExceptionally(e);
+                        // if resource is updated by other operate then we will get bad-version exception
+                        // so, retry the operation.
+                        if (e instanceof ManagedLedgerException.BadVersionException) {
+                            asyncReadAndUpdateCursorProperties(updateFunction).whenComplete((__, ex) -> {

Review Comment:
   > If we want to update the key A to value 1 (current is 0)
   > Another operation removed the key A
   > After the retry, we will put key A back
   
   I think ZooKeeper guarantees the task execution order, and all callbacks are executed serially on the same thread, so this conforms to linearizable semantics.





[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1246668513

   @Jason918 @codelipenghui I removed the `asyncGetCursorInfo` operation and cached `ManagedCursorInfo`. All update operations now use the local stat. The locally cached data may be older than what is stored in ZooKeeper, but it will never be newer.
   I'm not sure whether I have missed any considerations. Please take a look, thanks~
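
Why a stale local stat is safe can be shown with a toy versioned store, sketched below under stated assumptions. `VersionedStoreSketch` and its nested `BadVersionException` are hypothetical stand-ins for the metadata store and the bad-version failure discussed in this thread; the only rule modeled is that a write carrying an outdated version is rejected.

```java
import java.util.concurrent.CompletableFuture;

final class VersionedStoreSketch {
    /** Hypothetical stand-in for the bad-version failure mentioned in the thread. */
    static final class BadVersionException extends RuntimeException {
        BadVersionException(String message) {
            super(message);
        }
    }

    private String data = "";
    private long version = 0L;

    /** Writes only if the caller's expected version matches; yields the new version. */
    synchronized CompletableFuture<Long> update(String newData, long expectedVersion) {
        if (expectedVersion != version) {
            CompletableFuture<Long> failed = new CompletableFuture<>();
            failed.completeExceptionally(new BadVersionException(
                    "expected " + expectedVersion + " but store is at " + version));
            return failed;
        }
        data = newData;
        version++;
        return CompletableFuture.completedFuture(version);
    }

    synchronized String data() {
        return data;
    }
}
```

A writer holding an old cached version cannot overwrite newer data. It gets a failure instead and can then decide whether to refresh and retry or to report the conflict.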




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973160103


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   Maybe I'm missing the usage for PIP-195. I will check it tomorrow.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r969143558


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -365,6 +367,48 @@ public void operationFailed(MetaStoreException e) {
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        Map<String, String> newProperties =
+                cursorProperties == null ? new HashMap<>() : new HashMap<>(cursorProperties);
+        long stamp = cursorPropertiesUpdateLock.writeLock();

Review Comment:
   @Jason918 Thanks for your suggestion.
   
   I now use `ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {}))` to execute the operations serially, and retry the operation if a `BadVersionException` occurs.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r968068015


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -365,6 +367,48 @@ public void operationFailed(MetaStoreException e) {
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        Map<String, String> newProperties =
+                cursorProperties == null ? new HashMap<>() : new HashMap<>(cursorProperties);
+        long stamp = cursorPropertiesUpdateLock.writeLock();

Review Comment:
   > This lock will block admin handler thread, and the unlock depends on metadata callback thread, it has potential deadlock risk (which depends on if setCursorProperties is called on metadata callback thread).
   
   I think we can add a timeout to avoid the deadlock.





[GitHub] [pulsar] codelipenghui merged pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #17164:
URL: https://github.com/apache/pulsar/pull/17164




[GitHub] [pulsar] mattisonchao commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1249996310

   @lhotari Please do the final review.




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972607432


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -350,21 +356,48 @@ public void operationComplete(Void result, Stat stat) {
                     public void operationFailed(MetaStoreException e) {
                         log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties, e);
-                        updateCursorPropertiesResult.completeExceptionally(e);
+                        // if resource is updated by other operate then we will get bad-version exception
+                        // so, retry the operation.
+                        if (e instanceof ManagedLedgerException.BadVersionException) {
+                            asyncReadAndUpdateCursorProperties(updateFunction).whenComplete((__, ex) -> {

Review Comment:
   I think my earlier example was confusing.
   
   > If we want to update the key A to value 1 (current is 0)
   
   What I meant was: "we want to update key A to value 1 only if its current value is 0". If the key is absent, we don't need to perform this operation.
   
   A more detailed explanation:
   
   - Suppose we have data with versions 1, 2, 3.
   - Two operations will produce data versions 4 and 5, both based on data version 3.
   - Introducing retry logic here means that going from data version 3 to version 5 is acceptable (although the retry logic will actually perform the update based on the latest data version).
   - But the caller doesn't know that. If the update has a precondition based on version 3 and that precondition has changed in data version 4, the caller doesn't want the update to continue. If the caller can pass a function, it should be OK; it is more like a compute operation on a Map. Of course, we could also use a lock for the get-check-set operation on a Map, but this is an async API.
   
   If we only retry put/remove here, I think it should be OK. It looks like we can introduce a new API in the future to support a compute operation for cursor properties. A set operation overwrites the whole cursor properties, and the incoming properties may just be a new version based on the old one, so a retry would mess up the properties.
   
   As @Jason918 said, it's a concurrency issue. With an internal retry, the callers need to guarantee the concurrent behavior. Without an internal retry, the callers need to handle the update conflict. In my opinion, the former is more challenging than the latter.
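
The difference between retrying a whole-map set and retrying a compute function can be sketched with a small in-memory store that mimics a version-checked update. This is an illustration under assumptions: `VersionedProps` and its methods are hypothetical and stand in for the metadata store; they are not Pulsar APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical in-memory stand-in for a store with version-checked updates.
final class VersionedProps {
    private Map<String, String> data = new HashMap<>();
    private long version = 0;

    synchronized long version() {
        return version;
    }

    synchronized Map<String, String> snapshot() {
        return new HashMap<>(data);
    }

    // Succeeds only if the caller saw the latest version ("bad version" otherwise).
    synchronized boolean update(Map<String, String> newData, long expectedVersion) {
        if (expectedVersion != version) {
            return false;
        }
        data = new HashMap<>(newData);
        version++;
        return true;
    }

    // Compute-style retry: on conflict, re-read and re-apply the function,
    // so any precondition inside the function is checked against fresh data.
    void compute(UnaryOperator<Map<String, String>> fn) {
        while (true) {
            long seenVersion;
            Map<String, String> current;
            synchronized (this) {
                seenVersion = version;
                current = snapshot();
            }
            if (update(fn.apply(current), seenVersion)) {
                return;
            }
        }
    }
}
```

With this shape, a caller that wants "set A to 1 only if A is 0" puts the check inside the function. A blind retry of a precomputed map has no such check and would reintroduce a key that another writer removed.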





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972872271


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +328,26 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
+
+        Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
+        ManagedCursorInfo copy = ManagedCursorInfo
+                .newBuilder(ManagedCursorImpl.this.managedCursorInfo)
+                .clearCursorProperties()
+                .addAllCursorProperties(buildStringPropertiesMap(newProperties))
+                .build();
+
+        ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+                name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
                         log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties);
-                        ManagedCursorImpl.this.cursorProperties = cursorProperties;
+                        ManagedCursorImpl.this.cursorProperties = newProperties;

Review Comment:
   Do we need to add
   
   ```java
   ManagedCursorImpl.this.managedCursorInfo = cursorInfo;
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +328,26 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
+
+        Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
+        ManagedCursorInfo copy = ManagedCursorInfo
+                .newBuilder(ManagedCursorImpl.this.managedCursorInfo)
+                .clearCursorProperties()
+                .addAllCursorProperties(buildStringPropertiesMap(newProperties))
+                .build();
+
+        ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+                name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
                         log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties);
-                        ManagedCursorImpl.this.cursorProperties = cursorProperties;
+                        ManagedCursorImpl.this.cursorProperties = newProperties;

Review Comment:
   Please also add a test for this case.
   If we miss this, the `persistPositionMetaStore` method would use the old ManagedCursorInfo to update ZooKeeper, but with the correct `cursorLedgerStat`.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java:
##########
@@ -68,4 +69,28 @@ public static CompletableFuture<Void> waitForAll(List<CompletableFuture<Void>> f
 
         return compositeFuture;
     }
+
+    public static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
+                                                            Class<? extends Exception> needRetryExceptionClass) {
+        CompletableFuture<T> resultFuture = new CompletableFuture<>();
+        op.get().whenComplete((res, ex) -> {
+            if (ex == null) {
+                resultFuture.complete(res);
+            } else {
+                if (needRetryExceptionClass.isAssignableFrom(ex.getClass())) {
+                    op.get().whenComplete((res2, ex2) -> {
+                        if (ex2 == null) {
+                            resultFuture.complete(res2);
+                        } else {
+                            resultFuture.completeExceptionally(ex2);
+                        }
+                    });

Review Comment:
   Will it only retry once?





[GitHub] [pulsar] codelipenghui commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1249371895

   @lhotari Please help review this PR again.




[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973093200


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +328,27 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
+
+        Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
+        ManagedCursorInfo copy = ManagedCursorInfo
+                .newBuilder(ManagedCursorImpl.this.managedCursorInfo)
+                .clearCursorProperties()
+                .addAllCursorProperties(buildStringPropertiesMap(newProperties))
+                .build();
+
+        ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+                name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
                         log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties);
-                        ManagedCursorImpl.this.cursorProperties = cursorProperties;
+                        ManagedCursorImpl.this.managedCursorInfo = copy;
+                        ManagedCursorImpl.this.cursorProperties = Collections.unmodifiableMap(newProperties);

Review Comment:
   Yes, `newProperties` is already a copy.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973149095


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   Could we add a comment here or on the `asyncUpdateCursorInfo` method to alert other contributors so they won't forget to update it?
   Also, we need to ensure the update order:
   - managedCursorInfo first
   - cursorLedgerStat last
   
   If they are updated out of order, an update may use the new cursorLedgerStat value together with the old managedCursorInfo.
   
   Concurrency problems are hard to find and debug. Thanks!
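
One way to sidestep the ordering concern, sketched here as an alternative and not as what the PR does, is to keep both values in one immutable holder behind a single volatile reference, so a reader can never pair a new stat with an old info. `CursorStateSketch`, `Snapshot`, and the `String`/`long` stand-ins for `ManagedCursorInfo` and `Stat` are hypothetical.

```java
// Hypothetical sketch: one volatile reference instead of two volatile fields.
final class CursorStateSketch {

    // Immutable pair; String and long stand in for ManagedCursorInfo and Stat.
    static final class Snapshot {
        final String info;
        final long version;

        Snapshot(String info, long version) {
            this.info = info;
            this.version = version;
        }
    }

    private volatile Snapshot snapshot = new Snapshot("", 0L);

    // Readers take both values from the same snapshot object.
    Snapshot read() {
        return snapshot;
    }

    // A single volatile write publishes both values together.
    void publish(String info, long version) {
        snapshot = new Snapshot(info, version);
    }
}
```

With two separate volatile fields, the write order suggested above (info first, stat last) still matters, and readers have to read the stat before the info for the guarantee to hold.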





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973160103


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   Maybe I'm missing a usage for PIP-195; I will check it tomorrow. But this PR still LGTM.








[GitHub] [pulsar] lhotari commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973057855


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java:
##########
@@ -68,4 +69,28 @@ public static CompletableFuture<Void> waitForAll(List<CompletableFuture<Void>> f
 
         return compositeFuture;
     }
+
+    public static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
+                                                            Class<? extends Exception> needRetryExceptionClass) {
+        CompletableFuture<T> resultFuture = new CompletableFuture<>();
+        op.get().whenComplete((res, ex) -> {
+            if (ex == null) {
+                resultFuture.complete(res);
+            } else {
+                if (needRetryExceptionClass.isAssignableFrom(ex.getClass())) {
+                    executeWithRetry(op, needRetryExceptionClass).whenComplete((res2, ex2) -> {
+                        if (ex2 == null) {
+                            resultFuture.complete(res2);
+                        } else {
+                            resultFuture.completeExceptionally(ex2);
+                        }
+                    });
+                    return;
+                }
+                resultFuture.completeExceptionally(ex);
+            }
+        });
+
+        return resultFuture;
+    }

Review Comment:
   Retrying without any backoff is usually not recommended. Could we use an exponential backoff here?
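
A minimal sketch of what a bounded retry with exponential backoff could look like, assuming plain JDK facilities (`CompletableFuture.delayedExecutor`, Java 9+). `RetrySketch`, `executeWithBackoff`, and the `maxRetries`/`initialDelayMs` parameters are hypothetical and are not the signature used in the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper for illustration only.
final class RetrySketch {

    static <T> CompletableFuture<T> executeWithBackoff(Supplier<CompletableFuture<T>> op,
                                                       Class<? extends Throwable> retryOn,
                                                       int maxRetries,
                                                       long initialDelayMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(op, retryOn, maxRetries, initialDelayMs, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> op,
                                    Class<? extends Throwable> retryOn,
                                    int retriesLeft,
                                    long delayMs,
                                    CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = op.get();
        } catch (RuntimeException e) {
            // The supplier itself failed: do not leave the result hanging.
            result.completeExceptionally(e);
            return;
        }
        future.whenComplete((res, ex) -> {
            if (ex == null) {
                result.complete(res);
                return;
            }
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            if (retriesLeft > 0 && retryOn.isInstance(cause)) {
                // Wait, then retry with the delay doubled.
                Executor delayed = CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS);
                delayed.execute(() -> attempt(op, retryOn, retriesLeft - 1, delayMs * 2, result));
            } else {
                result.completeExceptionally(cause);
            }
        });
    }
}
```

The retry count is bounded, so a persistent conflict eventually surfaces to the caller instead of looping forever. A production version would likely also cap the delay and add jitter.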



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +328,27 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
+
+        Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
+        ManagedCursorInfo copy = ManagedCursorInfo
+                .newBuilder(ManagedCursorImpl.this.managedCursorInfo)
+                .clearCursorProperties()
+                .addAllCursorProperties(buildStringPropertiesMap(newProperties))
+                .build();
+
+        ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+                name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
                         log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties);
-                        ManagedCursorImpl.this.cursorProperties = cursorProperties;
+                        ManagedCursorImpl.this.managedCursorInfo = copy;
+                        ManagedCursorImpl.this.cursorProperties = Collections.unmodifiableMap(newProperties);

Review Comment:
   `Collections.unmodifiableMap` alone won't make a map immutable if the map gets modified outside of the wrapper. Is `newProperties` already a copy?
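
The point about the wrapper can be shown with a short sketch. `UnmodifiableViewSketch` and its two methods are hypothetical names used only for this illustration: wrapping alone gives a read-only view of the caller's map, while copying first detaches the result from later changes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only.
final class UnmodifiableViewSketch {

    // Read-only view: later changes to 'source' remain visible through it.
    static Map<String, String> wrapOnly(Map<String, String> source) {
        return Collections.unmodifiableMap(source);
    }

    // Defensive copy, then wrap: later changes to 'source' are not visible.
    static Map<String, String> copyThenWrap(Map<String, String> source) {
        return Collections.unmodifiableMap(new HashMap<>(source));
    }

    public static void main(String[] args) {
        Map<String, String> backing = new HashMap<>();
        backing.put("a", "1");
        Map<String, String> view = wrapOnly(backing);
        Map<String, String> copy = copyThenWrap(backing);
        backing.put("b", "2");
        System.out.println(view.containsKey("b")); // true: the view follows the backing map
        System.out.println(copy.containsKey("b")); // false: the copy is detached
    }
}
```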





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r952435063


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -86,14 +86,20 @@ enum IndividualDeletedEntries {
      */
     Map<String, String> getCursorProperties();
 
-     /**
-      * Updates the properties.
-      * @param cursorProperties
-      * @return a handle to the result of the operation
-      */
-     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
-         return CompletableFuture.completedFuture(null);
-     }
+    /**
+     * Add a property associated with the cursor.
+     */
+    void putCursorProperty(String key, String value);
+
+    /**
+     * Set all properties associated with the cursor, internal properties are still preserved.
+     */
+    void setAllCursorProperties(Map<String, String> properties);
+
+    /**
+     * Remove a property associated with the cursor.
+     */
+    void removeCursorProperty(String key);
 

Review Comment:
   > > What's wrong with using the synchronous method?
   > 
   > The main reason to avoid blocking methods in Pulsar is to prevent blocking Netty IO threads. Netty uses a small number of IO threads to execute and using blocking on these threads could cause the execution of work assigned to Netty event loops to not proceed as expected (other unrelated connections might hang / timeout / get delayed etc.). There are also other reasons to avoid synchronous methods such as "[PIP-149 Making the REST Admin API fully async](https://github.com/apache/pulsar/issues/14365)". (cc @Technoboy- and @mattisonchao)
   
   I see. I think we can keep the asynchronous `CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties)` method, but it needs to update the in-memory variable first, before updating the store.
   
   We can also add `CompletableFuture<Void> putCursorProperty(String key, String value)`/`CompletableFuture<Void> removeCursorProperty(String key)`.





[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1241383823

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] lhotari commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1241536546

   > @lhotari The internal properties can't be set by `updateSubscriptionProperties`; they are invisible to the customer and are only used to implement some Pulsar features, for example, storing the list of delayed message index bucket snapshots in [PIP-195](https://github.com/apache/pulsar/issues/16763)
   
   This is relevant context for the code changes that have been added. Why is the support for internal properties added in this PR? The PR title and description don't mention that. I'm suggesting that internal property support be removed from this PR and added in another PR with proper tests and code comments.
   
   




[GitHub] [pulsar] Jason918 commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972547777


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -350,21 +356,48 @@ public void operationComplete(Void result, Stat stat) {
                     public void operationFailed(MetaStoreException e) {
                         log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties, e);
-                        updateCursorPropertiesResult.completeExceptionally(e);
+                        // if resource is updated by other operate then we will get bad-version exception
+                        // so, retry the operation.
+                        if (e instanceof ManagedLedgerException.BadVersionException) {
+                            asyncReadAndUpdateCursorProperties(updateFunction).whenComplete((__, ex) -> {

Review Comment:
   > If we want to update the key A to value 1 (current is 0)
   > Another operation removed the key A
   > After the retry, we will put key A back
   
   From the caller's perspective, if they call `set` and `remove` concurrently, the resulting order should be undefined.
   Maybe the order of operations should be maintained by the caller.
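
If a caller does need a defined order for asynchronous operations, one common approach is to chain each operation onto the previous one. This is a sketch under assumptions: `SequentialOps` and `enqueue` are hypothetical names, and the suppliers stand in for calls such as a put or remove on the cursor properties.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical caller-side helper for illustration only.
final class SequentialOps {
    // Tail of the chain; starts out already completed.
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    // Starts 'op' only after every previously enqueued operation has finished.
    synchronized CompletableFuture<Void> enqueue(Supplier<CompletableFuture<Void>> op) {
        CompletableFuture<Void> next = tail
                .exceptionally(error -> null)   // a failed predecessor must not block the chain
                .thenCompose(ignored -> op.get());
        tail = next;
        return next;
    }
}
```

Each returned future still reports the success or failure of its own operation; only the start of the next operation is decoupled from earlier failures.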





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973108503


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +328,27 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;

Review Comment:
   Yes.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973104625


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   Introducing this variable worries me a bit, because it means remembering to update it on every update; otherwise, we may lose some data in the managed cursor info.







[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973158399


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   Also, I think one more get operation might be fine, since this feature isn't used very often.
   But adding this variable will affect other logic and introduce a risk of data inconsistency.
   
   I think we need to reach a consensus here.
   /cc @codelipenghui  @lhotari  @eolivelli @Jason918  WDYT?





[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1250586880

   @lhotari @eolivelli @Jason918 @codelipenghui @mattisonchao PTAL.




[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973516304


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   > could we add the comments on here or asyncUpdateCursorInfo method to alert other contributors won't forget to update it?
   
   I added an `updateCursorLedgerStat` method that updates cursorLedgerStat and managedCursorInfo at the same time, and added some notes to alert other contributors not to update cursorLedgerStat individually.
   





[GitHub] [pulsar] Jason918 commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r967575217


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -365,6 +368,48 @@ public void operationFailed(MetaStoreException e) {
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        Map<String, String> newProperties =
+                cursorProperties == null ? new TreeMap<>() : new TreeMap<>(cursorProperties);

Review Comment:
   Any reason for using `TreeMap` instead of `HashMap`?





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r970364061


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +326,25 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> asyncReadAndUpdateCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
         ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
             @Override
             public void operationComplete(ManagedCursorInfo info, Stat stat) {
+                final Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);

Review Comment:
   Update the zookeeper with local stat?





[GitHub] [pulsar] Jason918 commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r970418380


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +326,25 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> asyncReadAndUpdateCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
         ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {

Review Comment:
   > Instead, we can only update the local properties cache if we get a BadVersion exception
   
   Will this introduce inconsistency between the broker's in-memory state and the ZooKeeper storage?





[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1241539497

   > This is relevant context for the code changes that have been added. Why is the support for internal properties added in this PR? The PR title or description doesn't mention that. I'm suggesting that internal property support is removed from this PR and added in another PR with proper tests and code comments.
   
   OK, I see. I will split the PR.
   
   




[GitHub] [pulsar] lhotari commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r952138443


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -86,14 +86,20 @@ enum IndividualDeletedEntries {
      */
     Map<String, String> getCursorProperties();
 
-     /**
-      * Updates the properties.
-      * @param cursorProperties
-      * @return a handle to the result of the operation
-      */
-     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
-         return CompletableFuture.completedFuture(null);
-     }
+    /**
+     * Add a property associated with the cursor.
+     */
+    void putCursorProperty(String key, String value);
+
+    /**
+     * Set all properties associated with the cursor, internal properties are still preserved.
+     */
+    void setAllCursorProperties(Map<String, String> properties);
+
+    /**
+     * Remove a property associated with the cursor.
+     */
+    void removeCursorProperty(String key);
 

Review Comment:
   Instead of modifying the interface, it would be better to fix the issue in the existing method, `CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties)`.
   
   Adding a new "blocking API" instead of using an asynchronous method will cause issues.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r952211081


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -86,14 +86,20 @@ enum IndividualDeletedEntries {
      */
     Map<String, String> getCursorProperties();
 
-     /**
-      * Updates the properties.
-      * @param cursorProperties
-      * @return a handle to the result of the operation
-      */
-     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
-         return CompletableFuture.completedFuture(null);
-     }
+    /**
+     * Add a property associated with the cursor.
+     */
+    void putCursorProperty(String key, String value);
+
+    /**
+     * Set all properties associated with the cursor, internal properties are still preserved.
+     */
+    void setAllCursorProperties(Map<String, String> properties);
+
+    /**
+     * Remove a property associated with the cursor.
+     */
+    void removeCursorProperty(String key);
 

Review Comment:
   > Instead of modifying the interface, it would be better to fix the issue in the existing method, `CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties)`.
   > 
   > Adding a new "blocking API" instead of using an asynchronous method will cause issues.
   
   If we keep using an asynchronous method, it will not ensure data consistency under concurrent modification.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r952211081


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -86,14 +86,20 @@ enum IndividualDeletedEntries {
      */
     Map<String, String> getCursorProperties();
 
-     /**
-      * Updates the properties.
-      * @param cursorProperties
-      * @return a handle to the result of the operation
-      */
-     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
-         return CompletableFuture.completedFuture(null);
-     }
+    /**
+     * Add a property associated with the cursor.
+     */
+    void putCursorProperty(String key, String value);
+
+    /**
+     * Set all properties associated with the cursor, internal properties are still preserved.
+     */
+    void setAllCursorProperties(Map<String, String> properties);
+
+    /**
+     * Remove a property associated with the cursor.
+     */
+    void removeCursorProperty(String key);
 

Review Comment:
   > Instead of modifying the interface, it would be better to fix the issue in the existing method, `CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties)`.
   
   If we keep using an asynchronous method, it will not ensure data consistency under concurrent modification.
   
   > Adding a new "blocking API" instead of using an asynchronous method will cause issues.
   
   What's wrong with using the synchronous method?





[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1247885352

   /pulsarbot run-failure-checks




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972118230


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -350,21 +356,48 @@ public void operationComplete(Void result, Stat stat) {
                     public void operationFailed(MetaStoreException e) {
                         log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties, e);
-                        updateCursorPropertiesResult.completeExceptionally(e);
+                        // if resource is updated by other operate then we will get bad-version exception
+                        // so, retry the operation.
+                        if (e instanceof ManagedLedgerException.BadVersionException) {
+                            asyncReadAndUpdateCursorProperties(updateFunction).whenComplete((__, ex) -> {

Review Comment:
   For example:
   
   We want to update key A to value 1 (it is currently 0).
   Another operation removes key A.
   After the retry, we put key A back.
   
   It would be better to expose the exception to the callers. They might need to read the properties again and work out what to modify.
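
The scenario above can be reproduced with a small in-memory model. This is a sketch under assumptions: `RetryResurrectsKeyDemo` and its versioned `put`/`remove` are hypothetical stand-ins for a metadata store with version checks, not Pulsar APIs, and the "concurrent" operations are interleaved by hand in a single thread to keep the outcome deterministic.

```java
import java.util.HashMap;
import java.util.Map;

class RetryResurrectsKeyDemo {
    private final Map<String, String> props = new HashMap<>();
    private long version = 0;

    // Versioned put: rejected when the caller's expected version is stale.
    boolean put(String key, String value, long expectedVersion) {
        if (expectedVersion != version) {
            return false;
        }
        props.put(key, value);
        version++;
        return true;
    }

    // Versioned remove with the same stale-version check.
    boolean remove(String key, long expectedVersion) {
        if (expectedVersion != version) {
            return false;
        }
        props.remove(key);
        version++;
        return true;
    }

    static Map<String, String> blindRetry() {
        RetryResurrectsKeyDemo store = new RetryResurrectsKeyDemo();
        store.put("A", "0", 0);                 // initial state: A=0, version 1
        long seenByWriter = store.version;      // writer plans A=1 against version 1
        store.remove("A", store.version);       // another operation removes A, version 2
        boolean ok = store.put("A", "1", seenByWriter); // rejected: stale version
        if (!ok) {
            store.put("A", "1", store.version); // blind retry re-applies the same put
        }
        return store.props;
    }

    public static void main(String[] args) {
        System.out.println(blindRetry()); // prints {A=1}: the removed key is back
    }
}
```

If the rejected `put` were surfaced to the caller instead, the caller could re-read the properties, see that A is gone, and decide whether writing it again is still the right thing to do.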





[GitHub] [pulsar] Jason918 commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r969080552


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -365,6 +367,48 @@ public void operationFailed(MetaStoreException e) {
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        Map<String, String> newProperties =
+                cursorProperties == null ? new HashMap<>() : new HashMap<>(cursorProperties);
+        long stamp = cursorPropertiesUpdateLock.writeLock();

Review Comment:
   @coderzc I think a timeout is not the best option here.
   
   You can take a look at `MetadataCacheImpl#readModifyUpdateOrCreate`. It implements atomic updates on the metadata cache and supports concurrent writes. The main idea is to update the metadata store with a version check and retry the operation if there are concurrent updates.
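
The version-check-and-retry idea can be sketched independently of Pulsar. Everything named here (`VersionedRetryDemo`, `VersionedStore`, `Versioned`, `readModifyUpdate`) is hypothetical and only models the pattern; it is not the implementation of `MetadataCacheImpl#readModifyUpdateOrCreate`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

class VersionedRetryDemo {
    // Immutable value plus the version it was read at.
    static final class Versioned {
        final Map<String, String> value;
        final long version;

        Versioned(Map<String, String> value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    // Hypothetical in-memory stand-in for a store with versioned writes.
    static final class VersionedStore {
        private Versioned current = new Versioned(Map.of(), 0L);

        synchronized Versioned read() {
            return current;
        }

        // Compare-and-set: succeeds only if the caller saw the latest version.
        synchronized boolean write(Map<String, String> newValue, long expectedVersion) {
            if (expectedVersion != current.version) {
                return false; // analogous to a bad-version error
            }
            current = new Versioned(Map.copyOf(newValue), expectedVersion + 1);
            return true;
        }
    }

    // Re-reads and re-applies fn until the versioned write is accepted.
    static void readModifyUpdate(VersionedStore store, UnaryOperator<Map<String, String>> fn) {
        while (true) {
            Versioned seen = store.read();
            Map<String, String> updated = fn.apply(new HashMap<>(seen.value));
            if (store.write(updated, seen.version)) {
                return;
            }
            // Lost the race: loop, read the latest value, and apply fn to it again.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        VersionedStore store = new VersionedStore();
        Thread t1 = new Thread(() -> readModifyUpdate(store, m -> { m.put("a", "1"); return m; }));
        Thread t2 = new Thread(() -> readModifyUpdate(store, m -> { m.put("b", "2"); return m; }));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(store.read().value.keySet().containsAll(java.util.Set.of("a", "b")));
    }
}
```

Because the function is re-applied to the freshly read value on each attempt, two writers touching different keys do not overwrite each other. Whether re-applying is safe depends on the function, which is the concern raised elsewhere in this thread for whole-map overwrites.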





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r969143558


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -365,6 +367,48 @@ public void operationFailed(MetaStoreException e) {
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        Map<String, String> newProperties =
+                cursorProperties == null ? new HashMap<>() : new HashMap<>(cursorProperties);
+        long stamp = cursorPropertiesUpdateLock.writeLock();

Review Comment:
   @Jason918 Thanks for your suggestion.
   
   I will execute the operations serially instead of using a lock, check for `BadVersionException`, and retry the operation when it occurs.
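
Serial execution of asynchronous operations without a lock held across the I/O can be sketched by chaining each new operation onto the previous one's future. `SerialExecutor` and `submit` are hypothetical names for illustration and are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class SerialExecutor {
    // Tail of the chain: completes when the most recently submitted op finishes.
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    // Queues op so that it starts only after every earlier op has finished.
    synchronized CompletableFuture<Void> submit(Supplier<CompletableFuture<Void>> op) {
        CompletableFuture<Void> result = tail
                .handle((v, ex) -> null)            // a failed predecessor must not block the queue
                .thenCompose(ignored -> op.get());
        tail = result;
        return result;
    }

    public static void main(String[] args) {
        SerialExecutor serial = new SerialExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> last = CompletableFuture.completedFuture(null);
        for (int i = 0; i < 3; i++) {
            final int id = i;
            last = serial.submit(() -> CompletableFuture.runAsync(() -> order.add(id)));
        }
        last.join();
        System.out.println(order); // prints [0, 1, 2]
    }
}
```

The `synchronized` block only guards the pointer swap on `tail`, so callers are never blocked while an operation is in flight. This serializes operations within one process only; a version check is still needed if another writer can touch the same metadata.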





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973516304


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   > could we add the comments on here or asyncUpdateCursorInfo method to alert other contributors won't forget to update it?
   also, we need to ensure the updated order.
   
   I added an `updateCursorLedgerStat` method that updates `cursorLedgerStat` and `managedCursorInfo` at the same time. I also added some notes to remind other contributors not to update `cursorLedgerStat` alone.
   





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973516304


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   > could we add the comments on here or asyncUpdateCursorInfo method to alert other contributors won't forget to update it?
   
   I added an `updateCursorLedgerStat` method that updates cursorLedgerStat and managedCursorInfo at the same time. I also added some notes to remind other contributors not to update cursorLedgerStat alone.
   





[GitHub] [pulsar] coderzc commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972618141


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -350,21 +356,48 @@ public void operationComplete(Void result, Stat stat) {
                     public void operationFailed(MetaStoreException e) {
                         log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties, e);
-                        updateCursorPropertiesResult.completeExceptionally(e);
+                        // if resource is updated by other operate then we will get bad-version exception
+                        // so, retry the operation.
+                        if (e instanceof ManagedLedgerException.BadVersionException) {
+                            asyncReadAndUpdateCursorProperties(updateFunction).whenComplete((__, ex) -> {

Review Comment:
   > If we only retry put/remove here, I think it should be ok. It looks like we can introduce a new API in the future to support a compute operation for cursor properties. If we have a set operation, it will overwrite the whole cursor properties, and the incoming properties may be just a new version based on the old one. The retry will mess up the properties.
   
   I agree with this. I think we can use `computeCursorProperties(Function<Map<String, String>, Map<String, String>> updateFunction)` instead of `setCursorProperties(Map<String, String> cursorProperties)`.
   
   > As @Jason918 said, it's a concurrency issue. If we have an internal retry, the callers need to guarantee the concurrent behavior. If we don't have an internal retry, the callers need to handle the update conflict. In my opinion, the former is more challenging than the latter
   
   It looks like letting the callers handle the update conflict is better. Can we add a parameter to choose whether to retry automatically?





[GitHub] [pulsar] Jason918 commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972962686


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -86,14 +86,26 @@ enum IndividualDeletedEntries {
      */
     Map<String, String> getCursorProperties();
 
-     /**
-      * Updates the properties.
-      * @param cursorProperties
-      * @return a handle to the result of the operation
-      */
-     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
-         return CompletableFuture.completedFuture(null);
-     }
+    /**
+     * Add a property associated with the cursor.
+     *
+     * @return a handle to the result of the operation
+     */
+    CompletableFuture<Void> putCursorProperty(String key, String value);

Review Comment:
   We should add some notes here (for example, which exception this CompletableFuture will be completed with when there is a concurrent modification) if we decide to let the caller handle conflicts.
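
From the caller's side, a documented conflict exception could be handled roughly as follows. This is a sketch only: `ConflictException` and `putProperty` are hypothetical stand-ins, and which exception the real `putCursorProperty` completes with is exactly what the note above asks to be documented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ConflictHandlingDemo {
    // Hypothetical exception type standing in for a bad-version failure.
    static final class ConflictException extends RuntimeException {
        ConflictException(String message) {
            super(message);
        }
    }

    // Hypothetical async put that fails with ConflictException when 'conflict' is true.
    static CompletableFuture<Void> putProperty(boolean conflict) {
        if (conflict) {
            return CompletableFuture.failedFuture(new ConflictException("stale version"));
        }
        return CompletableFuture.completedFuture(null);
    }

    // Caller-side handling: unwrap the cause and decide what to do on conflict.
    static String describe(CompletableFuture<Void> future) {
        return future.handle((v, ex) -> {
            if (ex == null) {
                return "updated";
            }
            // Dependent stages wrap failures in CompletionException; unwrap if so.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            return cause instanceof ConflictException ? "conflict: re-read and retry" : "failed";
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(describe(putProperty(false))); // prints "updated"
        System.out.println(describe(putProperty(true)));  // prints "conflict: re-read and retry"
    }
}
```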





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r973149095


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -175,6 +177,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     // Stat of the cursor z-node
     private volatile Stat cursorLedgerStat;
 
+    private volatile ManagedCursorInfo managedCursorInfo;

Review Comment:
   Could we add a comment here or on the `asyncUpdateCursorInfo` method to alert other contributors so they won't forget to update it?
   Also, we need to ensure the update order:
   - `managedCursorInfo` before `cursorLedgerStat`.
   
   If they are updated out of order, an update may use the new `cursorLedgerStat` value with the old `managedCursorInfo`.
   
   Concurrency problems are hard to find and debug. Thanks!





[GitHub] [pulsar] coderzc commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1227503100

   @lhotari PTAL




[GitHub] [pulsar] lhotari commented on pull request #17164: [improve][broker] Make cursor properties support modify single value concurrently.

Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#issuecomment-1236857030

   The description says: "use a special prefix to prevent internal property loss."
   Why is a special prefix needed? What are the consequences of this change? Is it backwards compatible?
   @coderzc Please explain this detail.

