You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/19 01:42:08 UTC
[pulsar] branch master updated: [improve][broker] Make cursor properties support modify single value concurrently. (#17164)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2b85c436f08 [improve][broker] Make cursor properties support modify single value concurrently. (#17164)
2b85c436f08 is described below
commit 2b85c436f08af969eb625406912621e98321f890
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Mon Sep 19 09:42:00 2022 +0800
[improve][broker] Make cursor properties support modify single value concurrently. (#17164)
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 37 +++++++---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 84 +++++++++++++++-------
.../apache/bookkeeper/mledger/util/Futures.java | 25 +++++++
.../mledger/impl/ManagedCursorContainerTest.java | 17 ++++-
.../mledger/impl/ManagedCursorPropertiesTest.java | 47 +++++++++++-
5 files changed, 172 insertions(+), 38 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 3f6852e4085..17dbac09a22 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -86,14 +86,35 @@ public interface ManagedCursor {
*/
Map<String, String> getCursorProperties();
- /**
- * Updates the properties.
- * @param cursorProperties
- * @return a handle to the result of the operation
- */
- default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
- return CompletableFuture.completedFuture(null);
- }
+ /**
+ * Add a property associated with the cursor.
+ *
+ * Note: {@link ManagedLedgerException.BadVersionException} will be set in this {@link CompletableFuture},
+ * if there are concurrent modification and store data has changed.
+ *
+ * @return a handle to the result of the operation
+ */
+ CompletableFuture<Void> putCursorProperty(String key, String value);
+
+ /**
+ * Set all properties associated with the cursor.
+ *
+ * Note: {@link ManagedLedgerException.BadVersionException} will be set in this {@link CompletableFuture},
+ * if there are concurrent modification and store data has changed.
+ *
+ * @return a handle to the result of the operation
+ */
+ CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties);
+
+ /**
+ * Remove a property associated with the cursor.
+ *
+ * Note: {@link ManagedLedgerException.BadVersionException} will be set in this {@link CompletableFuture},
+ * if there are concurrent modification and store data has changed.
+ *
+ * @return a handle to the result of the operation
+ */
+ CompletableFuture<Void> removeCursorProperty(String key);
/**
* Add a property associated with the last stored position.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 2bfa4ebb7fe..6d595e76dc1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
@@ -119,6 +120,7 @@ public class ManagedCursorImpl implements ManagedCursor {
protected final ManagedLedgerConfig config;
protected final ManagedLedgerImpl ledger;
private final String name;
+
private volatile Map<String, String> cursorProperties;
private final BookKeeper.DigestType digestType;
@@ -173,7 +175,10 @@ public class ManagedCursorImpl implements ManagedCursor {
private boolean isCursorLedgerReadOnly = true;
// Stat of the cursor z-node
+ // NOTE: Don't update cursorLedgerStat alone,
+ // please use updateCursorLedgerStat method to update cursorLedgerStat and managedCursorInfo at the same time.
private volatile Stat cursorLedgerStat;
+ private volatile ManagedCursorInfo managedCursorInfo;
private static final LongPairConsumer<PositionImpl> positionRangeConverter = PositionImpl::new;
private static final LongPairConsumer<PositionImplRecyclable> recyclePositionRangeConverter = (key, value) -> {
@@ -314,6 +319,11 @@ public class ManagedCursorImpl implements ManagedCursor {
this.mbean = new ManagedCursorMXBeanImpl(this);
}
+ private void updateCursorLedgerStat(ManagedCursorInfo cursorInfo, Stat stat) {
+ this.managedCursorInfo = cursorInfo;
+ this.cursorLedgerStat = stat;
+ }
+
@Override
public Map<String, Long> getProperties() {
return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
@@ -324,25 +334,27 @@ public class ManagedCursorImpl implements ManagedCursor {
return cursorProperties;
}
- @Override
- public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+ private CompletableFuture<Void> computeCursorProperties(
+ final Function<Map<String, String>, Map<String, String>> updateFunction) {
CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
- ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
- @Override
- public void operationComplete(ManagedCursorInfo info, Stat stat) {
- ManagedCursorInfo copy = ManagedCursorInfo
- .newBuilder(info)
- .clearCursorProperties()
- .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
- .build();
- ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
- name, copy, stat, new MetaStoreCallback<>() {
+
+ final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
+
+ Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
+ ManagedCursorInfo copy = ManagedCursorInfo
+ .newBuilder(ManagedCursorImpl.this.managedCursorInfo)
+ .clearCursorProperties()
+ .addAllCursorProperties(buildStringPropertiesMap(newProperties))
+ .build();
+
+ ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+ name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
name, cursorProperties);
- ManagedCursorImpl.this.cursorProperties = cursorProperties;
- cursorLedgerStat = stat;
+ ManagedCursorImpl.this.cursorProperties = Collections.unmodifiableMap(newProperties);
+ updateCursorLedgerStat(copy, stat);
updateCursorPropertiesResult.complete(result);
}
@@ -353,18 +365,33 @@ public class ManagedCursorImpl implements ManagedCursor {
updateCursorPropertiesResult.completeExceptionally(e);
}
});
- }
- @Override
- public void operationFailed(MetaStoreException e) {
- log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
- name, cursorProperties, e);
- updateCursorPropertiesResult.completeExceptionally(e);
- }
- });
return updateCursorPropertiesResult;
}
+ @Override
+ public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+ return computeCursorProperties(lastRead -> cursorProperties);
+ }
+
+ @Override
+ public CompletableFuture<Void> putCursorProperty(String key, String value) {
+ return computeCursorProperties(lastRead -> {
+ Map<String, String> newProperties = lastRead == null ? new HashMap<>() : new HashMap<>(lastRead);
+ newProperties.put(key, value);
+ return newProperties;
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> removeCursorProperty(String key) {
+ return computeCursorProperties(lastRead -> {
+ Map<String, String> newProperties = lastRead == null ? new HashMap<>() : new HashMap<>(lastRead);
+ newProperties.remove(key);
+ return newProperties;
+ });
+ }
+
@Override
public boolean putProperty(String key, Long value) {
if (lastMarkDeleteEntry != null) {
@@ -410,8 +437,9 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
- cursorLedgerStat = stat;
+ updateCursorLedgerStat(info, stat);
lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;
+
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Recover cursor last active to [{}]", ledger.getName(), name, lastActive);
}
@@ -2507,6 +2535,8 @@ public class ManagedCursorImpl implements ManagedCursor {
return;
}
+ final Stat lastCursorLedgerStat = cursorLedgerStat;
+
// When closing we store the last mark-delete position in the z-node itself, so we won't need the cursor ledger,
// hence we write it as -1. The cursor ledger is deleted once the z-node write is confirmed.
ManagedCursorInfo.Builder info = ManagedCursorInfo.newBuilder() //
@@ -2528,11 +2558,12 @@ public class ManagedCursorImpl implements ManagedCursor {
log.debug("[{}][{}] Closing cursor at md-position: {}", ledger.getName(), name, position);
}
- ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), name, info.build(), cursorLedgerStat,
+ ManagedCursorInfo cursorInfo = info.build();
+ ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), name, cursorInfo, lastCursorLedgerStat,
new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
- cursorLedgerStat = stat;
+ updateCursorLedgerStat(cursorInfo, stat);
callback.operationComplete(result, stat);
}
@@ -2548,7 +2579,7 @@ public class ManagedCursorImpl implements ManagedCursor {
new MetaStoreCallback<ManagedCursorInfo>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
- cursorLedgerStat = stat;
+ updateCursorLedgerStat(info, stat);
}
@Override
@@ -2934,7 +2965,6 @@ public class ManagedCursorImpl implements ManagedCursor {
final LedgerHandle oldLedger = cursorLedger;
cursorLedger = lh;
isCursorLedgerReadOnly = false;
- cursorLedgerStat = stat;
// At this point the position had already been safely markdeleted
callback.operationComplete();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
index 6e09d096299..0b09118af7c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.util;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -68,4 +69,28 @@ public class Futures {
return compositeFuture;
}
+
+ public static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
+ Class<? extends Exception> needRetryExceptionClass) {
+ CompletableFuture<T> resultFuture = new CompletableFuture<>();
+ op.get().whenComplete((res, ex) -> {
+ if (ex == null) {
+ resultFuture.complete(res);
+ } else {
+ if (needRetryExceptionClass.isAssignableFrom(ex.getClass())) {
+ executeWithRetry(op, needRetryExceptionClass).whenComplete((res2, ex2) -> {
+ if (ex2 == null) {
+ resultFuture.complete(res2);
+ } else {
+ resultFuture.completeExceptionally(ex2);
+ }
+ });
+ return;
+ }
+ resultFuture.completeExceptionally(ex);
+ }
+ });
+
+ return resultFuture;
+ }
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 6446eca5ae0..312e09f846e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -29,12 +29,12 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -74,6 +74,21 @@ public class ManagedCursorContainerTest {
return Collections.emptyMap();
}
+ @Override
+ public CompletableFuture<Void> putCursorProperty(String key, String value) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeCursorProperty(String key) {
+ return CompletableFuture.completedFuture(null);
+ }
+
@Override
public boolean putProperty(String key, Long value) {
return false;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
index 74db9d791f3..c9082344d23 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java
@@ -18,16 +18,21 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
import static org.testng.Assert.assertEquals;
-
+import static org.testng.Assert.assertNull;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -209,6 +214,12 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
assertEquals(c1.getProperties(), properties);
assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);
+ c1.putCursorProperty("custom3", "Five").get();
+ cursorPropertiesUpdated.put("custom3", "Five");
+ c1.removeCursorProperty("custom1").get();
+ cursorPropertiesUpdated.remove("custom1");
+ assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);
+
// Create a new factory to force a managed ledger close and recovery
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
// Reopen the managed ledger
@@ -218,6 +229,38 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
assertEquals(c1.getProperties(), properties);
assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);
+ ledger.close();
+
factory2.shutdown();
}
+
+ @Test
+ public void testUpdateCursorPropertiesConcurrent() throws Exception {
+ ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
+ ManagedCursor c1 = ledger.openCursor("c1");
+
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ Map<String, String> map = new HashMap<>();
+ map.put("a", "1");
+ map.put("b", "2");
+ map.put("c", "3");
+
+ futures.add(executeWithRetry(() -> c1.setCursorProperties(map),
+ ManagedLedgerException.BadVersionException.class));
+
+ futures.add(executeWithRetry(() -> c1.putCursorProperty("a", "2"),
+ ManagedLedgerException.BadVersionException.class));
+
+ futures.add(executeWithRetry(() -> c1.removeCursorProperty("c"),
+ ManagedLedgerException.BadVersionException.class));
+
+ for (CompletableFuture<Void> future : futures) {
+ future.get();
+ }
+
+ assertEquals(c1.getCursorProperties().get("a"), "2");
+ assertEquals(c1.getCursorProperties().get("b"), "2");
+ assertNull(c1.getCursorProperties().get("c"));
+ }
}