You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/02 00:23:27 UTC

[GitHub] merlimat closed pull request #2673: Avoid problem that topic becomes unavailable due to failure of cursor recovery

merlimat closed pull request #2673: Avoid problem that topic becomes unavailable due to failure of cursor recovery
URL: https://github.com/apache/pulsar/pull/2673
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 0ac818fa34..64efa85e4c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -348,11 +348,16 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
         if (!ledger.ledgerExists(position.getLedgerId())) {
             Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
             if (nextExistingLedger == null) {
-                log.info("[{}-{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name,
+                log.info("[{}] [{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name,
                         position);
             }
             position = nextExistingLedger != null ? PositionImpl.get(nextExistingLedger, -1) : position;
         }
+        if (position.compareTo(ledger.getLastPosition()) > 0) {
+            log.warn("[{}] [{}] Current position {} is ahead of last position {}", ledger.getName(), name, position,
+                    ledger.getLastPosition());
+            position = PositionImpl.get(ledger.getLastPosition());
+        }
         log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);
 
         messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index a705d156e3..77d6c6c676 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -18,6 +18,11 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.*;
 
 import com.google.common.base.Charsets;
@@ -42,6 +47,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -66,6 +72,9 @@
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.zookeeper.KeeperException.Code;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
@@ -2708,5 +2717,56 @@ public void testEstimatedUnackedSize() throws Exception {
         assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 10 * entryData.length);
     }
 
+    @Test(timeOut = 20000)
+    public void testRecoverCursorAheadOfLastPosition() throws Exception {
+        final String mlName = "my_test_ledger";
+        final PositionImpl lastPosition = new PositionImpl(1L, 10L);
+        final PositionImpl nextPosition = new PositionImpl(3L, -1L);
+
+        final String cursorName = "my_test_cursor";
+        final long cursorsLedgerId = -1L;
+        final long markDeleteLedgerId = 2L;
+        final long markDeleteEntryId = -1L;
+
+        MetaStoreImplZookeeper mockMetaStore = mock(MetaStoreImplZookeeper.class);
+        doAnswer(new Answer<Object>() {
+            public Object answer(InvocationOnMock invocation) {
+                ManagedCursorInfo info = ManagedCursorInfo.newBuilder().setCursorsLedgerId(cursorsLedgerId)
+                        .setMarkDeleteLedgerId(markDeleteLedgerId).setMarkDeleteEntryId(markDeleteEntryId)
+                        .setLastActive(0L).build();
+                Stat stat = mock(Stat.class);
+                MetaStoreCallback<ManagedCursorInfo> callback = (MetaStoreCallback<ManagedCursorInfo>) invocation
+                        .getArguments()[2];
+                callback.operationComplete(info, stat);
+                return null;
+            }
+        }).when(mockMetaStore).asyncGetCursorInfo(eq(mlName), eq(cursorName), any(MetaStoreCallback.class));
+
+        ManagedLedgerImpl ml = mock(ManagedLedgerImpl.class);
+        when(ml.getName()).thenReturn(mlName);
+        when(ml.getStore()).thenReturn(mockMetaStore);
+        when(ml.getLastPosition()).thenReturn(lastPosition);
+        when(ml.getNextValidLedger(markDeleteLedgerId)).thenReturn(3L);
+        when(ml.getNextValidPosition(lastPosition)).thenReturn(nextPosition);
+        when(ml.ledgerExists(markDeleteLedgerId)).thenReturn(false);
+
+        BookKeeper mockBookKeeper = mock(BookKeeper.class);
+        final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, new ManagedLedgerConfig(), ml,
+                cursorName);
+
+        cursor.recover(new VoidCallback() {
+            @Override
+            public void operationComplete() {
+                assertEquals(cursor.getMarkDeletedPosition(), lastPosition);
+                assertEquals(cursor.getReadPosition(), nextPosition);
+            }
+
+            @Override
+            public void operationFailed(ManagedLedgerException exception) {
+                fail("Cursor recovery should not fail");
+            }
+        });
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services