You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 02:18:41 UTC
[pulsar] 09/16: Update cursor last active timestamp when reseting cursor (#13166)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7cce3f9b2c76c90953b943d9c27a45d5abdff59b
Author: Zhanpeng Wu <zh...@qq.com>
AuthorDate: Fri Dec 10 17:45:45 2021 +0800
Update cursor last active timestamp when reseting cursor (#13166)
Resolves #13165
### Modifications
1. trigger last active time update after resetting cursor
2. add related test case
(cherry picked from commit 26342996f8336dd4f63634d8962b6fef11087485)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 42 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index f13fa93..521ec7a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1103,7 +1103,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
}
callback.resetComplete(newPosition);
-
+ updateLastActive();
}
@Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index bbc1827..2f80bc8 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -771,6 +772,47 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
}
@Test(timeOut = 20000)
+ void testLastActiveAfterResetCursor() throws Exception {
+ ManagedLedger ledger = factory.open("test_cursor_ledger");
+ ManagedCursor cursor = ledger.openCursor("tla");
+
+ PositionImpl lastPosition = null;
+ for (int i = 0; i < 3; i++) {
+ lastPosition = (PositionImpl) ledger.addEntry("dummy-entry".getBytes(Encoding));
+ }
+
+ final AtomicBoolean moveStatus = new AtomicBoolean(false);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ long lastActive = cursor.getLastActive();
+
+ cursor.asyncResetCursor(lastPosition, new AsyncCallbacks.ResetCursorCallback() {
+ @Override
+ public void resetComplete(Object ctx) {
+ moveStatus.set(true);
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void resetFailed(ManagedLedgerException exception, Object ctx) {
+ moveStatus.set(false);
+ countDownLatch.countDown();
+ }
+ });
+
+ countDownLatch.await();
+ assertTrue(moveStatus.get());
+
+ assertNotNull(lastPosition);
+ assertEquals(lastPosition, cursor.getReadPosition());
+
+ assertNotEquals(lastActive, cursor.getLastActive());
+
+ cursor.close();
+ ledger.close();
+ }
+
+ @Test(timeOut = 20000)
void seekPosition() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
ManagedCursor cursor = ledger.openCursor("c1");