You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the word "here", which is not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/26 02:37:03 UTC
[pulsar] branch branch-2.10 updated: ManagedLedger: move to FENCED state in case of BadVersionException (#17736)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 4c0b672a37a ManagedLedger: move to FENCED state in case of BadVersionException (#17736)
4c0b672a37a is described below
commit 4c0b672a37a2873d75a754e3125f4c5594fdc9c4
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Sep 22 15:50:12 2022 +0200
ManagedLedger: move to FENCED state in case of BadVersionException (#17736)
(cherry picked from commit 63d4cf20e7b9c9bd24d3fcd5ba7397f0d185ce57)
---
.../bookkeeper/mledger/ManagedLedgerException.java | 4 ++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 54 ++++++++++++++---
.../mledger/impl/ManagedLedgerErrorsTest.java | 70 ++++++++++++++++++++++
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 48 +++++++++++++++
.../broker/service/persistent/PersistentTopic.java | 41 +++++++------
.../broker/service/BrokerBkEnsemblesTests.java | 2 -
6 files changed, 189 insertions(+), 30 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 0280ceda725..3dd4b9e3ca1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -80,6 +80,10 @@ public class ManagedLedgerException extends Exception {
super(new Exception("Attempted to use a fenced managed ledger"));
}
+ public ManagedLedgerFencedException(String message) {
+ super(message);
+ }
+
public ManagedLedgerFencedException(Exception e) {
super(e);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 6f933962d46..6d3a628a367 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -415,6 +415,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void operationFailed(MetaStoreException e) {
+ handleBadVersion(e);
if (e instanceof MetadataNotFoundException) {
callback.initializeFailed(new ManagedLedgerNotFoundException(e));
} else {
@@ -465,6 +466,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void operationFailed(MetaStoreException e) {
+ handleBadVersion(e);
callback.initializeFailed(new ManagedLedgerException(e));
}
};
@@ -1005,6 +1007,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void operationFailed(MetaStoreException e) {
+ handleBadVersion(e);
callback.deleteCursorFailed(e, ctx);
}
@@ -1295,6 +1298,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void operationFailed(MetaStoreException e) {
log.error("[{}] Failed to terminate managed ledger: {}", name, e.getMessage());
+ handleBadVersion(e);
callback.terminateFailed(new ManagedLedgerException(e), ctx);
}
});
@@ -1379,6 +1383,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
public synchronized void asyncClose(final CloseCallback callback, final Object ctx) {
State state = STATE_UPDATER.get(this);
if (state == State.Fenced) {
+ cancelScheduledTasks();
factory.close(this);
callback.closeFailed(new ManagedLedgerFencedException(), ctx);
return;
@@ -1502,6 +1507,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage());
+ handleBadVersion(e);
mbean.startDataLedgerDeleteOp();
bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> {
mbean.endDataLedgerDeleteOp();
@@ -1510,14 +1516,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
BKException.getMessage(rc1));
}
}, null);
-
if (e instanceof BadVersionException) {
synchronized (ManagedLedgerImpl.this) {
log.error(
"[{}] Failed to update ledger list. z-node version mismatch. Closing managed ledger",
name);
lastLedgerCreationFailureTimestamp = clock.millis();
- STATE_UPDATER.set(ManagedLedgerImpl.this, State.Fenced);
// Return ManagedLedgerFencedException to addFailed callback
// to indicate that the ledger is now fenced and topic needs to be closed
clearPendingAddEntries(new ManagedLedgerFencedException(e));
@@ -1540,6 +1544,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
updateLedgersListAfterRollover(cb, newLedger);
}
}
+
+ private void handleBadVersion(Throwable e) {
+ if (e instanceof BadVersionException) {
+ setFenced();
+ }
+ }
private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback, LedgerInfo newLedger) {
if (!metadataMutex.tryLock()) {
// Defer update for later
@@ -2429,12 +2439,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(),
TOTAL_SIZE_UPDATER.get(this));
}
- if (STATE_UPDATER.get(this) == State.Closed) {
+ State currentState = STATE_UPDATER.get(this);
+ if (currentState == State.Closed) {
log.debug("[{}] Ignoring trimming request since the managed ledger was already closed", name);
trimmerMutex.unlock();
promise.completeExceptionally(new ManagedLedgerAlreadyClosedException("Can't trim closed ledger"));
return;
}
+ if (currentState == State.Fenced) {
+ log.debug("[{}] Ignoring trimming request since the managed ledger was already fenced", name);
+ trimmerMutex.unlock();
+ promise.completeExceptionally(new ManagedLedgerFencedException("Can't trim fenced ledger"));
+ return;
+ }
long slowestReaderLedgerId = -1;
if (!cursors.hasDurableCursors()) {
@@ -2523,7 +2540,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return;
}
- if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming
+ if (currentState == State.CreatingLedger // Give up now and schedule a new trimming
|| !metadataMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
scheduleDeferredTrimming(isTruncate, promise);
trimmerMutex.unlock();
@@ -2590,6 +2607,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
metadataMutex.unlock();
trimmerMutex.unlock();
+ handleBadVersion(e);
promise.completeExceptionally(e);
}
@@ -2680,7 +2698,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) {
// Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and
// ledgers
- STATE_UPDATER.set(this, State.Fenced);
+ setFenced();
cancelScheduledTasks();
List<ManagedCursor> cursors = Lists.newArrayList(this.cursors);
@@ -2929,7 +2947,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
promise.whenComplete((result, exception) -> {
offloadMutex.unlock();
if (exception != null) {
- callback.offloadFailed(new ManagedLedgerException(exception), ctx);
+ callback.offloadFailed(ManagedLedgerException.getManagedLedgerException(exception), ctx);
} else {
callback.offloadComplete(result, ctx);
}
@@ -2943,11 +2961,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
- if (getState() == State.Closed) {
+ State currentState = getState();
+ if (currentState == State.Closed) {
promise.completeExceptionally(new ManagedLedgerAlreadyClosedException(
String.format("managed ledger [%s] has already closed", name)));
return;
}
+ if (currentState == State.Fenced) {
+ promise.completeExceptionally(new ManagedLedgerFencedException(
+ String.format("managed ledger [%s] is fenced", name)));
+ return;
+ }
LedgerInfo info = ledgersToOffload.poll();
if (info == null) {
if (firstError.isPresent()) {
@@ -3076,6 +3100,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void operationFailed(MetaStoreException e) {
+ handleBadVersion(e);
unlockingPromise.completeExceptionally(e);
}
});
@@ -3608,6 +3633,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
synchronized void setFenced() {
+ log.info("{} Moving to Fenced state", name);
STATE_UPDATER.set(this, State.Fenced);
}
@@ -3816,12 +3842,21 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
? Math.max(config.getAddEntryTimeoutSeconds(), config.getReadEntryTimeoutSeconds())
: timeoutSec;
this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
- checkAddTimeout();
- checkReadTimeout();
+ checkTimeouts();
}), timeoutSec, timeoutSec, TimeUnit.SECONDS);
}
}
+ private void checkTimeouts() {
+ final State state = STATE_UPDATER.get(this);
+ if (state == State.Closed
+ || state == State.Fenced) {
+ return;
+ }
+ checkAddTimeout();
+ checkReadTimeout();
+ }
+
private void checkAddTimeout() {
long timeoutSec = config.getAddEntryTimeoutSeconds();
if (timeoutSec < 1) {
@@ -3978,6 +4013,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void operationFailed(MetaStoreException e) {
log.error("[{}] Update managedLedger's properties failed", name, e);
+ handleBadVersion(e);
callback.updatePropertiesFailed(e, ctx);
metadataMutex.unlock();
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index 3c09a2a2387..a093bac48b9 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -18,15 +18,19 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
@@ -387,6 +391,72 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
}
}
+ @Test
+ public void recoverAfterZnodeVersionErrorWhileTrimming() throws Exception {
+ ManagedLedger ledger = factory.open("my_test_ledger_trim",
+ new ManagedLedgerConfig()
+ .setMaxEntriesPerLedger(2));
+ ledger.addEntry("test".getBytes());
+ ledger.addEntry("test".getBytes());
+ ledger.addEntry("test".getBytes());
+
+ metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
+ path.equals("/managed-ledgers/my_test_ledger_trim")
+ && op == FaultInjectionMetadataStore.OperationType.PUT
+ );
+
+ CompletableFuture<?> handle = new CompletableFuture<>();
+ ledger.trimConsumedLedgersInBackground(handle);
+ assertThat(expectThrows(ExecutionException.class, () -> handle.get()).getCause(),
+ instanceOf(ManagedLedgerException.BadVersionException.class));
+
+ assertEquals(ManagedLedgerImpl.State.Fenced, ((ManagedLedgerImpl) ledger).getState());
+
+ // if the task started after the ML moved to Fenced state, it must fail
+ CompletableFuture<?> handleAlreadyFenced = new CompletableFuture<>();
+ ledger.trimConsumedLedgersInBackground(handleAlreadyFenced);
+ assertThat(expectThrows(ExecutionException.class, () -> handleAlreadyFenced.get()).getCause(),
+ instanceOf(ManagedLedgerException.ManagedLedgerFencedException.class));
+
+ try {
+ ledger.addEntry("entry".getBytes());
+ fail("should fail");
+ } catch (ManagedLedgerFencedException e) {
+ assertEquals("Attempted to use a fenced managed ledger", e.getCause().getMessage());
+ }
+
+ assertFalse(factory.ledgers.isEmpty());
+ try {
+ ledger.close();
+ } catch (ManagedLedgerFencedException e) {
+ assertEquals("Attempted to use a fenced managed ledger", e.getCause().getMessage());
+ }
+
+ // verify that the ManagedLedger has been unregistered even if it was fenced
+ assertTrue(factory.ledgers.isEmpty());
+ }
+
+ @Test
+ public void badVersionErrorDuringTruncateLedger() throws Exception {
+ ManagedLedger ledger = factory.open("my_test_ledger_trim",
+ new ManagedLedgerConfig()
+ .setMaxEntriesPerLedger(2));
+ ledger.addEntry("test".getBytes());
+ ledger.addEntry("test".getBytes());
+ ledger.addEntry("test".getBytes());
+
+ metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
+ path.equals("/managed-ledgers/my_test_ledger_trim")
+ && op == FaultInjectionMetadataStore.OperationType.PUT
+ );
+
+ CompletableFuture<?> handle = ledger.asyncTruncate();
+ assertThat(expectThrows(ExecutionException.class, () -> handle.get()).getCause(),
+ instanceOf(ManagedLedgerException.BadVersionException.class));
+
+ assertEquals(ManagedLedgerImpl.State.Fenced, ((ManagedLedgerImpl) ledger).getState());
+ }
+
@Test
public void recoverAfterWriteError() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index a59f53d4575..b9747aa772a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.ImmutableSet;
@@ -49,6 +50,8 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
@@ -126,6 +129,51 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
.filter(e -> e.getOffloadContext().getComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());
+
+ // ledgers should be marked as offloaded
+ ledger.getLedgersInfoAsList().stream().allMatch(l -> l.hasOffloadContext());
+ }
+
+ @Test
+ public void testOffloadFenced() throws Exception {
+ MockLedgerOffloader offloader = new MockLedgerOffloader();
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+ config.setLedgerOffloader(offloader);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
+
+ int i = 0;
+ for (; i < 25; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+ metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
+ path.equals("/managed-ledgers/my_test_ledger")
+ && op == FaultInjectionMetadataStore.OperationType.PUT
+ );
+
+ assertThrows(ManagedLedgerException.ManagedLedgerFencedException.class, () ->
+ ledger.offloadPrefix(ledger.getLastConfirmedEntry()));
+
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+ // the offloader actually wrote the data on the storage
+ assertEquals(ledger.getLedgersInfoAsList().stream()
+ .filter(e -> e.getOffloadContext().getComplete())
+ .map(e -> e.getLedgerId()).collect(Collectors.toSet()),
+ offloader.offloadedLedgers());
+
+ // but the ledgers should not be marked as offloaded in local memory, as the write to metadata failed
+ ledger.getLedgersInfoAsList().stream().allMatch(l -> !l.hasOffloadContext());
+
+ // check that the ledger is fenced
+ assertEquals(ManagedLedgerImpl.State.Fenced, ledger.getState());
+
}
@Test
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3044ca5dc0a..835e198ef76 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1285,30 +1285,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
@Override
public void closeComplete(Object ctx) {
// Everything is now closed, remove the topic from map
- brokerService.removeTopicFromCache(topic)
- .thenRun(() -> {
- replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
-
- dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
- subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
- unregisterTopicPolicyListener();
- log.info("[{}] Topic closed", topic);
- cancelFencedTopicMonitoringTask();
- closeFuture.complete(null);
- })
- .exceptionally(ex -> {
- closeFuture.completeExceptionally(ex);
- return null;
- });
+ disposeTopic(closeFuture);
}
@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
- brokerService.removeTopicFromCache(topic);
- closeFuture.complete(null);
+ disposeTopic(closeFuture);
}
}, null);
}).exceptionally(exception -> {
@@ -1321,6 +1304,26 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return closeFuture;
}
+ private void disposeTopic(CompletableFuture<?> closeFuture) {
+ brokerService.removeTopicFromCache(topic)
+ .thenRun(() -> {
+ replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
+
+ dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+ subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+ unregisterTopicPolicyListener();
+ log.info("[{}] Topic closed", topic);
+ cancelFencedTopicMonitoringTask();
+ closeFuture.complete(null);
+ })
+ .exceptionally(ex -> {
+ closeFuture.completeExceptionally(ex);
+ return null;
+ });
+ }
+
@VisibleForTesting
CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 95ed3efdafa..18ea827ea4e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -368,8 +368,6 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
producer.send(message.getBytes());
}
- ml.delete();
-
// Admin should be able to truncate the topic
admin.topics().truncate(topic1);