You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/19 01:42:08 UTC

[pulsar] branch master updated: [improve][broker] Make cursor properties support modify single value concurrently. (#17164)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b85c436f08 [improve][broker] Make cursor properties support modify single value concurrently. (#17164)
2b85c436f08 is described below

commit 2b85c436f08af969eb625406912621e98321f890
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Mon Sep 19 09:42:00 2022 +0800

    [improve][broker] Make cursor properties support modify single value concurrently. (#17164)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   | 37 +++++++---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 84 +++++++++++++++-------
 .../apache/bookkeeper/mledger/util/Futures.java    | 25 +++++++
 .../mledger/impl/ManagedCursorContainerTest.java   | 17 ++++-
 .../mledger/impl/ManagedCursorPropertiesTest.java  | 47 +++++++++++-
 5 files changed, 172 insertions(+), 38 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 3f6852e4085..17dbac09a22 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -86,14 +86,35 @@ public interface ManagedCursor {
      */
     Map<String, String> getCursorProperties();
 
-     /**
-      * Updates the properties.
-      * @param cursorProperties
-      * @return a handle to the result of the operation
-      */
-     default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
-         return CompletableFuture.completedFuture(null);
-     }
+    /**
+     * Add a property associated with the cursor.
+     *
+     * Note: {@link ManagedLedgerException.BadVersionException} will be set in this {@link CompletableFuture},
+     * if there are concurrent modification and store data has changed.
+     *
+     * @return a handle to the result of the operation
+     */
+    CompletableFuture<Void> putCursorProperty(String key, String value);
+
+    /**
+     * Set all properties associated with the cursor.
+     *
+     * Note: {@link ManagedLedgerException.BadVersionException} will be set in this {@link CompletableFuture},
+     * if there are concurrent modification and store data has changed.
+     *
+     * @return a handle to the result of the operation
+     */
+    CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties);
+
+    /**
+     * Remove a property associated with the cursor.
+     *
+     * Note: {@link ManagedLedgerException.BadVersionException} will be set in this {@link CompletableFuture},
+     * if there are concurrent modification and store data has changed.
+     *
+     * @return a handle to the result of the operation
+     */
+    CompletableFuture<Void> removeCursorProperty(String key);
 
     /**
      * Add a property associated with the last stored position.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 2bfa4ebb7fe..6d595e76dc1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException;
@@ -119,6 +120,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     protected final ManagedLedgerConfig config;
     protected final ManagedLedgerImpl ledger;
     private final String name;
+
     private volatile Map<String, String> cursorProperties;
     private final BookKeeper.DigestType digestType;
 
@@ -173,7 +175,10 @@ public class ManagedCursorImpl implements ManagedCursor {
     private boolean isCursorLedgerReadOnly = true;
 
     // Stat of the cursor z-node
+    // NOTE: Don't update cursorLedgerStat alone,
+    // please use updateCursorLedgerStat method to update cursorLedgerStat and managedCursorInfo at the same time.
     private volatile Stat cursorLedgerStat;
+    private volatile ManagedCursorInfo managedCursorInfo;
 
     private static final LongPairConsumer<PositionImpl> positionRangeConverter = PositionImpl::new;
     private static final LongPairConsumer<PositionImplRecyclable> recyclePositionRangeConverter = (key, value) -> {
@@ -314,6 +319,11 @@ public class ManagedCursorImpl implements ManagedCursor {
         this.mbean = new ManagedCursorMXBeanImpl(this);
     }
 
+    private void updateCursorLedgerStat(ManagedCursorInfo cursorInfo, Stat stat) {
+        this.managedCursorInfo = cursorInfo;
+        this.cursorLedgerStat = stat;
+    }
+
     @Override
     public Map<String, Long> getProperties() {
         return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
@@ -324,25 +334,27 @@ public class ManagedCursorImpl implements ManagedCursor {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
+
+        Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
+        ManagedCursorInfo copy = ManagedCursorInfo
+                .newBuilder(ManagedCursorImpl.this.managedCursorInfo)
+                .clearCursorProperties()
+                .addAllCursorProperties(buildStringPropertiesMap(newProperties))
+                .build();
+
+        ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+                name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
                         log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties);
-                        ManagedCursorImpl.this.cursorProperties = cursorProperties;
-                        cursorLedgerStat = stat;
+                        ManagedCursorImpl.this.cursorProperties = Collections.unmodifiableMap(newProperties);
+                        updateCursorLedgerStat(copy, stat);
                         updateCursorPropertiesResult.complete(result);
                     }
 
@@ -353,18 +365,33 @@ public class ManagedCursorImpl implements ManagedCursor {
                         updateCursorPropertiesResult.completeExceptionally(e);
                     }
                 });
-            }
 
-            @Override
-            public void operationFailed(MetaStoreException e) {
-                log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
-                        name, cursorProperties, e);
-                updateCursorPropertiesResult.completeExceptionally(e);
-            }
-        });
         return updateCursorPropertiesResult;
     }
 
+    @Override
+    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+        return computeCursorProperties(lastRead -> cursorProperties);
+    }
+
+    @Override
+    public CompletableFuture<Void> putCursorProperty(String key, String value) {
+        return computeCursorProperties(lastRead -> {
+            Map<String, String> newProperties = lastRead == null ? new HashMap<>() : new HashMap<>(lastRead);
+            newProperties.put(key, value);
+            return newProperties;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> removeCursorProperty(String key) {
+        return computeCursorProperties(lastRead -> {
+            Map<String, String> newProperties = lastRead == null ? new HashMap<>() : new HashMap<>(lastRead);
+            newProperties.remove(key);
+            return newProperties;
+        });
+    }
+
     @Override
     public boolean putProperty(String key, Long value) {
         if (lastMarkDeleteEntry != null) {
@@ -410,8 +437,9 @@ public class ManagedCursorImpl implements ManagedCursor {
             @Override
             public void operationComplete(ManagedCursorInfo info, Stat stat) {
 
-                cursorLedgerStat = stat;
+                updateCursorLedgerStat(info, stat);
                 lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;
+
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Recover cursor last active to [{}]", ledger.getName(), name, lastActive);
                 }
@@ -2507,6 +2535,8 @@ public class ManagedCursorImpl implements ManagedCursor {
             return;
         }
 
+        final Stat lastCursorLedgerStat = cursorLedgerStat;
+
         // When closing we store the last mark-delete position in the z-node itself, so we won't need the cursor ledger,
         // hence we write it as -1. The cursor ledger is deleted once the z-node write is confirmed.
         ManagedCursorInfo.Builder info = ManagedCursorInfo.newBuilder() //
@@ -2528,11 +2558,12 @@ public class ManagedCursorImpl implements ManagedCursor {
             log.debug("[{}][{}]  Closing cursor at md-position: {}", ledger.getName(), name, position);
         }
 
-        ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), name, info.build(), cursorLedgerStat,
+        ManagedCursorInfo cursorInfo = info.build();
+        ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), name, cursorInfo, lastCursorLedgerStat,
                 new MetaStoreCallback<Void>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
-                        cursorLedgerStat = stat;
+                        updateCursorLedgerStat(cursorInfo, stat);
                         callback.operationComplete(result, stat);
                     }
 
@@ -2548,7 +2579,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                                         new MetaStoreCallback<ManagedCursorInfo>() {
                                             @Override
                                             public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                                                cursorLedgerStat = stat;
+                                                updateCursorLedgerStat(info, stat);
                                             }
 
                                             @Override
@@ -2934,7 +2965,6 @@ public class ManagedCursorImpl implements ManagedCursor {
                 final LedgerHandle oldLedger = cursorLedger;
                 cursorLedger = lh;
                 isCursorLedgerReadOnly = false;
-                cursorLedgerStat = stat;
 
                 // At this point the position had already been safely markdeleted
                 callback.operationComplete();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
index 6e09d096299..0b09118af7c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.util;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 
@@ -68,4 +69,28 @@ public class Futures {
 
         return compositeFuture;
     }
+
+    public static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
+                                                            Class<? extends Exception> needRetryExceptionClass) {
+        CompletableFuture<T> resultFuture = new CompletableFuture<>();
+        op.get().whenComplete((res, ex) -> {
+            if (ex == null) {
+                resultFuture.complete(res);
+            } else {
+                if (needRetryExceptionClass.isAssignableFrom(ex.getClass())) {
+                    executeWithRetry(op, needRetryExceptionClass).whenComplete((res2, ex2) -> {
+                        if (ex2 == null) {
+                            resultFuture.complete(res2);
+                        } else {
+                            resultFuture.completeExceptionally(ex2);
+                        }
+                    });
+                    return;
+                }
+                resultFuture.completeExceptionally(ex);
+            }
+        });
+
+        return resultFuture;
+    }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 6446eca5ae0..312e09f846e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -29,12 +29,12 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -74,6 +74,21 @@ public class ManagedCursorContainerTest {
             return Collections.emptyMap();
         }
 
+        @Override
+        public CompletableFuture<Void> putCursorProperty(String key, String value) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> removeCursorProperty(String key) {
+            return CompletableFuture.completedFuture(null);
+        }
+
         @Override
         public boolean putProperty(String key, Long value) {
             return false;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
index 74db9d791f3..c9082344d23 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
@@ -18,16 +18,21 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
 import static org.testng.Assert.assertEquals;
-
+import static org.testng.Assert.assertNull;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -209,6 +214,12 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
         assertEquals(c1.getProperties(), properties);
         assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);
 
+        c1.putCursorProperty("custom3", "Five").get();
+        cursorPropertiesUpdated.put("custom3", "Five");
+        c1.removeCursorProperty("custom1").get();
+        cursorPropertiesUpdated.remove("custom1");
+        assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);
+
         // Create a new factory to force a managed ledger close and recovery
         ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
         // Reopen the managed ledger
@@ -218,6 +229,38 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
         assertEquals(c1.getProperties(), properties);
         assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);
 
+        ledger.close();
+
         factory2.shutdown();
     }
+
+    @Test
+    public void testUpdateCursorPropertiesConcurrent() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
+        ManagedCursor c1 = ledger.openCursor("c1");
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        Map<String, String> map = new HashMap<>();
+        map.put("a", "1");
+        map.put("b", "2");
+        map.put("c", "3");
+
+        futures.add(executeWithRetry(() -> c1.setCursorProperties(map),
+                ManagedLedgerException.BadVersionException.class));
+
+        futures.add(executeWithRetry(() -> c1.putCursorProperty("a", "2"),
+                ManagedLedgerException.BadVersionException.class));
+
+        futures.add(executeWithRetry(() -> c1.removeCursorProperty("c"),
+                ManagedLedgerException.BadVersionException.class));
+
+        for (CompletableFuture<Void> future : futures) {
+            future.get();
+        }
+
+        assertEquals(c1.getCursorProperties().get("a"), "2");
+        assertEquals(c1.getCursorProperties().get("b"), "2");
+        assertNull(c1.getCursorProperties().get("c"));
+    }
 }