You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2023/06/19 07:42:27 UTC
[bookkeeper] 03/31: [Bug] Always one orphan ledger is created (#3813)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 39d5fc1c9ae1809cd140db5e654ec1217da1f143
Author: Reid Chan <re...@apache.org>
AuthorDate: Fri Jun 16 10:41:13 2023 +0800
[Bug] Always one orphan ledger is created (#3813)
Issue #3812
Descriptions of the changes in this PR:
Fix bug and one UT for verification
### Motivation
Fix the bug
### Changes
To avoid the ledger being created twice, add a state check before setting a new state
(cherry picked from commit b062e9ed7e9fe0b0413b1f330f74a4860f0ba19a)
---
.../distributedlog/bk/SimpleLedgerAllocator.java | 4 +-
.../distributedlog/TestBKLogWriteHandler.java | 48 ++++++++++++++++++++++
.../apache/distributedlog/TestRollLogSegments.java | 2 +
3 files changed, 53 insertions(+), 1 deletion(-)
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
index da6ee153e3..c300c92c5a 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
@@ -310,7 +310,9 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
tryObtainTxn = null;
tryObtainListener = null;
// mark flag to issue an allocation request
- shouldAllocate = true;
+ if (lhToNotify == null) {
+ shouldAllocate = true;
+ }
}
}
if (null != listenerToNotify && null != lhToNotify) {
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java
index f4a3050ec7..c2a631e7eb 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java
@@ -23,8 +23,13 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogWriter;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.bk.LedgerAllocator;
import org.apache.distributedlog.bk.LedgerAllocatorPool;
@@ -91,4 +96,47 @@ public class TestBKLogWriteHandler extends TestDistributedLogBase {
Utils.close(writer);
}
+ @Test
+ public void testLedgerNumber() throws Exception {
+ URI uri = createDLMURI("/" + runtime.getMethodName());
+ ensureURICreated(zkc, uri);
+
+ DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+ confLocal.addConfiguration(conf);
+ confLocal.setOutputBufferSize(0);
+
+ BKDistributedLogNamespace namespace = (BKDistributedLogNamespace)
+ NamespaceBuilder.newBuilder()
+ .conf(confLocal)
+ .uri(uri)
+ .build();
+ DistributedLogManager dlm = namespace.openLog("test-log");
+ BookKeeperAdmin admin = new BookKeeperAdmin(zkServers);
+
+ Set<Long> s1 = getLedgers(admin);
+ LOG.info("Ledgers after init: " + s1);
+
+ LogWriter writer = dlm.openLogWriter();
+ writer.write(new LogRecord(1, "test-data".getBytes(StandardCharsets.UTF_8)));
+ writer.close();
+
+ Set<Long> s2 = getLedgers(admin);
+ LOG.info("Ledgers after write: " + s2);
+ dlm.delete();
+ assertEquals(1, s2.size() - s1.size()); // exact 1 ledger created only
+
+ Set<Long> s3 = getLedgers(admin);
+ LOG.info("Ledgers after delete: " + s3);
+
+ assertEquals(s1.size(), s3.size());
+
+ }
+
+ // Get all ledgers from BK admin
+ private Set<Long> getLedgers(BookKeeperAdmin bkAdmin) throws IOException {
+ Set<Long> res = new HashSet<>();
+ bkAdmin.listLedgers().forEach(res::add);
+ return res;
+ }
+
}
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java
index 180646ac70..1a8dc7d8a4 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java
@@ -293,6 +293,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
List<LogSegmentMetadata> segments = dlm.getLogSegments();
logger.info("LogSegments : {}", segments);
+ logger.info("LogSegments size: {}", segments.size());
assertTrue(segments.size() >= 2);
ensureOnlyOneInprogressLogSegments(segments);
@@ -308,6 +309,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
segments = dlm.getLogSegments();
logger.info("LogSegments : {}", segments);
+ logger.info("LogSegments size: {}", segments.size());
assertEquals(numSegmentsAfterAsyncWrites + numLogSegments / 2, segments.size());
ensureOnlyOneInprogressLogSegments(segments);