You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/26 02:37:03 UTC

[pulsar] branch branch-2.10 updated: ManagedLedger: move to FENCED state in case of BadVersionException (#17736)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 4c0b672a37a ManagedLedger: move to FENCED state in case of BadVersionException (#17736)
4c0b672a37a is described below

commit 4c0b672a37a2873d75a754e3125f4c5594fdc9c4
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Sep 22 15:50:12 2022 +0200

    ManagedLedger: move to FENCED state in case of BadVersionException (#17736)
    
    (cherry picked from commit 63d4cf20e7b9c9bd24d3fcd5ba7397f0d185ce57)
---
 .../bookkeeper/mledger/ManagedLedgerException.java |  4 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 54 ++++++++++++++---
 .../mledger/impl/ManagedLedgerErrorsTest.java      | 70 ++++++++++++++++++++++
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 48 +++++++++++++++
 .../broker/service/persistent/PersistentTopic.java | 41 +++++++------
 .../broker/service/BrokerBkEnsemblesTests.java     |  2 -
 6 files changed, 189 insertions(+), 30 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 0280ceda725..3dd4b9e3ca1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -80,6 +80,10 @@ public class ManagedLedgerException extends Exception {
             super(new Exception("Attempted to use a fenced managed ledger"));
         }
 
+        public ManagedLedgerFencedException(String message) { // lets callers say which operation was refused
+            super(message);
+        }
+
         public ManagedLedgerFencedException(Exception e) {
             super(e);
         }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 6f933962d46..6d3a628a367 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -415,6 +415,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
             @Override
             public void operationFailed(MetaStoreException e) {
+                handleBadVersion(e);
                 if (e instanceof MetadataNotFoundException) {
                     callback.initializeFailed(new ManagedLedgerNotFoundException(e));
                 } else {
@@ -465,6 +466,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
             @Override
             public void operationFailed(MetaStoreException e) {
+                handleBadVersion(e);
                 callback.initializeFailed(new ManagedLedgerException(e));
             }
         };
@@ -1005,6 +1007,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
             @Override
             public void operationFailed(MetaStoreException e) {
+                handleBadVersion(e);
                 callback.deleteCursorFailed(e, ctx);
             }
 
@@ -1295,6 +1298,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     @Override
                     public void operationFailed(MetaStoreException e) {
                         log.error("[{}] Failed to terminate managed ledger: {}", name, e.getMessage());
+                        handleBadVersion(e);
                         callback.terminateFailed(new ManagedLedgerException(e), ctx);
                     }
                 });
@@ -1379,6 +1383,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     public synchronized void asyncClose(final CloseCallback callback, final Object ctx) {
         State state = STATE_UPDATER.get(this);
         if (state == State.Fenced) {
+            cancelScheduledTasks();
             factory.close(this);
             callback.closeFailed(new ManagedLedgerFencedException(), ctx);
             return;
@@ -1502,6 +1507,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 @Override
                 public void operationFailed(MetaStoreException e) {
                     log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage());
+                    handleBadVersion(e);
                     mbean.startDataLedgerDeleteOp();
                     bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> {
                         mbean.endDataLedgerDeleteOp();
@@ -1510,14 +1516,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                     BKException.getMessage(rc1));
                         }
                     }, null);
-
                     if (e instanceof BadVersionException) {
                         synchronized (ManagedLedgerImpl.this) {
                             log.error(
                                 "[{}] Failed to update ledger list. z-node version mismatch. Closing managed ledger",
                                 name);
                             lastLedgerCreationFailureTimestamp = clock.millis();
-                            STATE_UPDATER.set(ManagedLedgerImpl.this, State.Fenced);
                             // Return ManagedLedgerFencedException to addFailed callback
                             // to indicate that the ledger is now fenced and topic needs to be closed
                             clearPendingAddEntries(new ManagedLedgerFencedException(e));
@@ -1540,6 +1544,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             updateLedgersListAfterRollover(cb, newLedger);
         }
     }
+
+    private void handleBadVersion(Throwable e) { // invoked from the metadata-store failure callbacks
+        if (e instanceof BadVersionException) { // any other failure leaves the state untouched
+            setFenced(); // version conflict on the metadata: move this ledger to Fenced
+        }
+    }
     private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback, LedgerInfo newLedger) {
         if (!metadataMutex.tryLock()) {
             // Defer update for later
@@ -2429,12 +2439,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(),
                         TOTAL_SIZE_UPDATER.get(this));
             }
-            if (STATE_UPDATER.get(this) == State.Closed) {
+            State currentState = STATE_UPDATER.get(this);
+            if (currentState == State.Closed) {
                 log.debug("[{}] Ignoring trimming request since the managed ledger was already closed", name);
                 trimmerMutex.unlock();
                 promise.completeExceptionally(new ManagedLedgerAlreadyClosedException("Can't trim closed ledger"));
                 return;
             }
+            if (currentState == State.Fenced) {
+                log.debug("[{}] Ignoring trimming request since the managed ledger was already fenced", name);
+                trimmerMutex.unlock();
+                promise.completeExceptionally(new ManagedLedgerFencedException("Can't trim fenced ledger"));
+                return;
+            }
 
             long slowestReaderLedgerId = -1;
             if (!cursors.hasDurableCursors()) {
@@ -2523,7 +2540,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 return;
             }
 
-            if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming
+            if (currentState == State.CreatingLedger // Give up now and schedule a new trimming
                     || !metadataMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
                 scheduleDeferredTrimming(isTruncate, promise);
                 trimmerMutex.unlock();
@@ -2590,6 +2607,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
                     metadataMutex.unlock();
                     trimmerMutex.unlock();
+                    handleBadVersion(e);
 
                     promise.completeExceptionally(e);
                 }
@@ -2680,7 +2698,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) {
         // Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and
         // ledgers
-        STATE_UPDATER.set(this, State.Fenced);
+        setFenced();
         cancelScheduledTasks();
 
         List<ManagedCursor> cursors = Lists.newArrayList(this.cursors);
@@ -2929,7 +2947,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             promise.whenComplete((result, exception) -> {
                 offloadMutex.unlock();
                 if (exception != null) {
-                    callback.offloadFailed(new ManagedLedgerException(exception), ctx);
+                    callback.offloadFailed(ManagedLedgerException.getManagedLedgerException(exception), ctx);
                 } else {
                     callback.offloadComplete(result, ctx);
                 }
@@ -2943,11 +2961,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
             PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
-        if (getState() == State.Closed) {
+        State currentState = getState();
+        if (currentState == State.Closed) {
             promise.completeExceptionally(new ManagedLedgerAlreadyClosedException(
                     String.format("managed ledger [%s] has already closed", name)));
             return;
         }
+        if (currentState == State.Fenced) {
+            promise.completeExceptionally(new ManagedLedgerFencedException(
+                    String.format("managed ledger [%s] is fenced", name)));
+            return;
+        }
         LedgerInfo info = ledgersToOffload.poll();
         if (info == null) {
             if (firstError.isPresent()) {
@@ -3076,6 +3100,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
                                     @Override
                                     public void operationFailed(MetaStoreException e) {
+                                        handleBadVersion(e);
                                         unlockingPromise.completeExceptionally(e);
                                     }
                                 });
@@ -3608,6 +3633,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     synchronized void setFenced() {
+        log.info("[{}] Moving to Fenced state", name); // "[name]" prefix, matching the other log lines
         STATE_UPDATER.set(this, State.Fenced);
     }
 
@@ -3816,12 +3842,21 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     ? Math.max(config.getAddEntryTimeoutSeconds(), config.getReadEntryTimeoutSeconds())
                     : timeoutSec;
             this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
-                checkAddTimeout();
-                checkReadTimeout();
+                checkTimeouts();
             }), timeoutSec, timeoutSec, TimeUnit.SECONDS);
         }
     }
 
+    private void checkTimeouts() {
+        // Both timeout scans are skipped once the ledger is Closed or Fenced.
+        final State current = STATE_UPDATER.get(this);
+        final boolean inactive = current == State.Closed || current == State.Fenced;
+        if (!inactive) {
+            checkAddTimeout();
+            checkReadTimeout();
+        }
+    }
+
     private void checkAddTimeout() {
         long timeoutSec = config.getAddEntryTimeoutSeconds();
         if (timeoutSec < 1) {
@@ -3978,6 +4013,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             @Override
             public void operationFailed(MetaStoreException e) {
                 log.error("[{}] Update managedLedger's properties failed", name, e);
+                handleBadVersion(e);
                 callback.updatePropertiesFailed(e, ctx);
                 metadataMutex.unlock();
             }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index 3c09a2a2387..a093bac48b9 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -18,15 +18,19 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
 import static org.testng.Assert.fail;
 import io.netty.buffer.ByteBuf;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
@@ -387,6 +391,72 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
         }
     }
 
+    @Test
+    public void recoverAfterZnodeVersionErrorWhileTrimming() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger_trim",
+                new ManagedLedgerConfig()
+                        .setMaxEntriesPerLedger(2));
+        ledger.addEntry("test".getBytes()); // three entries against a limit of two entries per ledger
+        ledger.addEntry("test".getBytes());
+        ledger.addEntry("test".getBytes());
+
+        metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
+                path.equals("/managed-ledgers/my_test_ledger_trim")
+                        && op == FaultInjectionMetadataStore.OperationType.PUT
+        ); // fault injection: BadVersionException for PUT operations on this ledger's metadata path
+
+        CompletableFuture<?> handle = new CompletableFuture<>();
+        ledger.trimConsumedLedgersInBackground(handle);
+        assertThat(expectThrows(ExecutionException.class, () -> handle.get()).getCause(),
+                instanceOf(ManagedLedgerException.BadVersionException.class));
+        // the failed trim must have moved the ledger to the Fenced state
+        assertEquals(ManagedLedgerImpl.State.Fenced, ((ManagedLedgerImpl) ledger).getState());
+
+        // if the task started after the ML moved to Fenced state, it must fail
+        CompletableFuture<?> handleAlreadyFenced = new CompletableFuture<>();
+        ledger.trimConsumedLedgersInBackground(handleAlreadyFenced);
+        assertThat(expectThrows(ExecutionException.class, () -> handleAlreadyFenced.get()).getCause(),
+                instanceOf(ManagedLedgerException.ManagedLedgerFencedException.class));
+
+        try {
+            ledger.addEntry("entry".getBytes()); // writes are expected to be rejected once fenced
+            fail("should fail");
+        } catch (ManagedLedgerFencedException e) {
+            assertEquals("Attempted to use a fenced managed ledger", e.getCause().getMessage());
+        }
+
+        assertFalse(factory.ledgers.isEmpty()); // still registered with the factory before close()
+        try {
+            ledger.close();
+        } catch (ManagedLedgerFencedException e) {
+            assertEquals("Attempted to use a fenced managed ledger", e.getCause().getMessage());
+        }
+
+        // verify that the ManagedLedger has been unregistered even if it was fenced
+        assertTrue(factory.ledgers.isEmpty());
+    }
+
+    @Test
+    public void badVersionErrorDuringTruncateLedger() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger_trim",
+                new ManagedLedgerConfig()
+                        .setMaxEntriesPerLedger(2));
+        ledger.addEntry("test".getBytes()); // three entries against a limit of two entries per ledger
+        ledger.addEntry("test".getBytes());
+        ledger.addEntry("test".getBytes());
+
+        metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
+                path.equals("/managed-ledgers/my_test_ledger_trim")
+                        && op == FaultInjectionMetadataStore.OperationType.PUT
+        ); // fault injection: BadVersionException for PUT operations on this ledger's metadata path
+
+        CompletableFuture<?> handle = ledger.asyncTruncate(); // truncate must surface the injected error
+        assertThat(expectThrows(ExecutionException.class, () -> handle.get()).getCause(),
+                instanceOf(ManagedLedgerException.BadVersionException.class));
+        // and the failed truncate must leave the ledger in the Fenced state
+        assertEquals(ManagedLedgerImpl.State.Fenced, ((ManagedLedgerImpl) ledger).getState());
+    }
+
     @Test
     public void recoverAfterWriteError() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index a59f53d4575..b9747aa772a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.impl;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.ImmutableSet;
@@ -49,6 +50,8 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
@@ -126,6 +129,51 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
                             .filter(e -> e.getOffloadContext().getComplete())
                             .map(e -> e.getLedgerId()).collect(Collectors.toSet()),
                             offloader.offloadedLedgers());
+
+        // ledgers should be marked as offloaded
+        ledger.getLedgersInfoAsList().stream().allMatch(l -> l.hasOffloadContext());
+    }
+
+    /** Offloading must fail with ManagedLedgerFencedException once a metadata PUT hits BadVersionException. */
+    @Test
+    public void testOffloadFenced() throws Exception {
+        MockLedgerOffloader offloader = new MockLedgerOffloader();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setRetentionSizeInMB(10);
+        config.setLedgerOffloader(offloader);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config);
+
+        for (int i = 0; i < 25; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+        metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
+                path.equals("/managed-ledgers/my_test_ledger")
+                        && op == FaultInjectionMetadataStore.OperationType.PUT
+        );
+
+        assertThrows(ManagedLedgerException.ManagedLedgerFencedException.class, () ->
+            ledger.offloadPrefix(ledger.getLastConfirmedEntry()));
+
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+        // the offloader actually wrote the data on the storage
+        assertEquals(ledger.getLedgersInfoAsList().stream()
+                        .filter(e -> e.getOffloadContext().getComplete())
+                        .map(e -> e.getLedgerId()).collect(Collectors.toSet()),
+                offloader.offloadedLedgers());
+
+        // but the ledgers should not be marked as offloaded in local memory, as the write to metadata failed
+        assertTrue(ledger.getLedgersInfoAsList().stream().allMatch(l -> !l.hasOffloadContext()));
+
+        // check that the ledger is fenced
+        assertEquals(ManagedLedgerImpl.State.Fenced, ledger.getState());
+
     }
 
     @Test
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3044ca5dc0a..835e198ef76 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1285,30 +1285,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 @Override
                 public void closeComplete(Object ctx) {
                     // Everything is now closed, remove the topic from map
-                    brokerService.removeTopicFromCache(topic)
-                            .thenRun(() -> {
-                                replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
-
-                                dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
-                                subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
-                                unregisterTopicPolicyListener();
-                                log.info("[{}] Topic closed", topic);
-                                cancelFencedTopicMonitoringTask();
-                                closeFuture.complete(null);
-                            })
-                    .exceptionally(ex -> {
-                        closeFuture.completeExceptionally(ex);
-                        return null;
-                    });
+                    disposeTopic(closeFuture);
                 }
 
                 @Override
                 public void closeFailed(ManagedLedgerException exception, Object ctx) {
                     log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
-                    brokerService.removeTopicFromCache(topic);
-                    closeFuture.complete(null);
+                    disposeTopic(closeFuture);
                 }
             }, null);
         }).exceptionally(exception -> {
@@ -1321,6 +1304,26 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return closeFuture;
     }
 
+    private void disposeTopic(CompletableFuture<?> closeFuture) {
+        brokerService.removeTopicFromCache(topic)
+                .thenRun(() -> {
+                    replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
+
+                    dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+                    subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+                    unregisterTopicPolicyListener();
+                    log.info("[{}] Topic closed", topic);
+                    cancelFencedTopicMonitoringTask();
+                    closeFuture.complete(null);
+                })
+                .exceptionally(ex -> {
+                    closeFuture.completeExceptionally(ex);
+                    return null;
+                });
+    }
+
     @VisibleForTesting
     CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
         CompletableFuture<Void> result = new CompletableFuture<Void>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 95ed3efdafa..18ea827ea4e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -368,8 +368,6 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
             producer.send(message.getBytes());
         }
 
-        ml.delete();
-
         // Admin should be able to truncate the topic
         admin.topics().truncate(topic1);