You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by re...@apache.org on 2019/05/04 22:28:06 UTC
[bookkeeper] branch master updated: GetListOfEntriesOfLedger
implementation
This is an automated email from the ASF dual-hosted git repository.
reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new aa84c7f GetListOfEntriesOfLedger implementation
aa84c7f is described below
commit aa84c7fdd24a925f57f087670b205a35fb8ef237
Author: Charan Reddy Guttapalem <re...@gmail.com>
AuthorDate: Sat May 4 15:28:01 2019 -0700
GetListOfEntriesOfLedger implementation
Descriptions of the changes in this PR:
As described in this BP - https://github.com/apache/bookkeeper/blob/master/site/bps/BP-34-cluster-metadata-checker.md, this request returns list of entries bookie contains for the given ledger in an encoded format. The returned list provides weakly consistent state, of the entries of the ledger.
Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #1931 from reddycharan/getlistofentries and squashes the following commits:
467bb8b73 [Charan Reddy Guttapalem] GetListOfEntriesOfLedger implementation
757f99156 [Charan Reddy Guttapalem] GetListOfEntriesOfLedger implementation
4e84bcba0 [Charan Reddy Guttapalem] GetListOfEntriesOfLedger implementation
f5655bb29 [Charan Reddy Guttapalem] GetListOfEntriesOfLedger implementation
---
.../src/main/proto/BookkeeperProtocol.proto | 13 +
.../bookkeeper/bookie/BookKeeperServerStats.java | 3 +
.../java/org/apache/bookkeeper/bookie/Bookie.java | 22 ++
.../apache/bookkeeper/bookie/EntryMemTable.java | 49 ++-
.../bookie/InterleavedLedgerStorage.java | 16 +-
.../InterleavedStorageRegenerateIndexOp.java | 5 +
.../org/apache/bookkeeper/bookie/LedgerCache.java | 4 +
.../apache/bookkeeper/bookie/LedgerCacheImpl.java | 49 +++
.../apache/bookkeeper/bookie/LedgerDescriptor.java | 4 +
.../bookkeeper/bookie/LedgerDescriptorImpl.java | 6 +
.../apache/bookkeeper/bookie/LedgerEntryPage.java | 27 ++
.../apache/bookkeeper/bookie/LedgerStorage.java | 17 +
.../bookkeeper/bookie/SortedLedgerStorage.java | 11 +
.../bookkeeper/bookie/stats/BookieStats.java | 9 +
.../bookie/storage/ldb/DbLedgerStorage.java | 8 +
.../ldb/SingleDirectoryDbLedgerStorage.java | 7 +
.../apache/bookkeeper/client/BookKeeperAdmin.java | 18 +-
.../bookkeeper/client/BookKeeperClientStats.java | 2 +
.../org/apache/bookkeeper/proto/BookieClient.java | 15 +
.../apache/bookkeeper/proto/BookieClientImpl.java | 31 ++
.../bookkeeper/proto/BookieRequestProcessor.java | 13 +
.../proto/BookkeeperInternalCallbacks.java | 48 +++
.../proto/GetListOfEntriesOfLedgerProcessorV3.java | 110 ++++++
.../bookkeeper/proto/PerChannelBookieClient.java | 83 +++++
.../org/apache/bookkeeper/proto/RequestStats.java | 17 +
.../util/AvailabilityOfEntriesOfLedger.java | 377 +++++++++++++++++++++
.../apache/bookkeeper/util/IteratorUtility.java | 171 ++++++++++
.../bookie/InterleavedLedgerStorageTest.java | 56 ++-
.../bookkeeper/bookie/LedgerStorageTest.java | 45 +++
.../bookkeeper/bookie/SortedLedgerStorageTest.java | 194 +++++++++++
.../bookkeeper/bookie/TestEntryMemTable.java | 154 +++++++++
.../apache/bookkeeper/bookie/TestSyncThread.java | 6 +
.../bookkeeper/client/BookKeeperAdminTest.java | 98 ++++++
.../org/apache/bookkeeper/meta/GcLedgersTest.java | 6 +
.../bookkeeper/meta/LedgerManagerTestCase.java | 6 +
.../apache/bookkeeper/proto/MockBookieClient.java | 13 +
.../util/AvailabilityOfEntriesOfLedgerTest.java | 192 +++++++++++
.../bookkeeper/util/IteratorUtilityTest.java | 141 ++++++++
38 files changed, 2041 insertions(+), 5 deletions(-)
diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
index f34d56e..4f178aa 100644
--- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
@@ -64,6 +64,7 @@ enum OperationType {
GET_BOOKIE_INFO = 8;
START_TLS = 9;
FORCE_LEDGER = 10;
+ GET_LIST_OF_ENTRIES_OF_LEDGER = 11;
}
/**
@@ -92,6 +93,7 @@ message Request {
optional GetBookieInfoRequest getBookieInfoRequest = 105;
optional StartTLSRequest startTLSRequest = 106;
optional ForceLedgerRequest forceLedgerRequest = 107;
+ optional GetListOfEntriesOfLedgerRequest getListOfEntriesOfLedgerRequest = 108;
// to pass MDC context
repeated ContextPair requestContext = 200;
}
@@ -152,6 +154,10 @@ message GetBookieInfoRequest {
optional int64 requested = 1;
}
+message GetListOfEntriesOfLedgerRequest {
+ required int64 ledgerId = 1;
+}
+
message Response {
required BKPacketHeader header = 1;
@@ -167,6 +173,7 @@ message Response {
optional GetBookieInfoResponse getBookieInfoResponse = 105;
optional StartTLSResponse startTLSResponse = 106;
optional ForceLedgerResponse forceLedgerResponse = 107;
+ optional GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse = 108;
}
message ReadResponse {
@@ -213,5 +220,11 @@ message GetBookieInfoResponse {
optional int64 freeDiskSpace = 3;
}
+message GetListOfEntriesOfLedgerResponse {
+ required StatusCode status = 1;
+ required int64 ledgerId = 2;
+ optional bytes availabilityOfEntriesOfLedger = 3; // condensed encoded format representing availability of entries of ledger
+}
+
message StartTLSResponse {
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index b58514b..bdca048 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -64,6 +64,8 @@ public interface BookKeeperServerStats {
String READ_LAC = "READ_LAC";
String GET_BOOKIE_INFO_REQUEST = "GET_BOOKIE_INFO_REQUEST";
String GET_BOOKIE_INFO = "GET_BOOKIE_INFO";
+ String GET_LIST_OF_ENTRIES_OF_LEDGER = "GET_LIST_OF_ENTRIES_OF_LEDGER";
+ String GET_LIST_OF_ENTRIES_OF_LEDGER_REQUEST = "GET_LIST_OF_ENTRIES_OF_LEDGER_REQUEST";
// Ensemble Stats
String WATCHER_SCOPE = "bookie_watcher";
@@ -80,6 +82,7 @@ public interface BookKeeperServerStats {
String BOOKIE_READ_LAST_CONFIRMED = "BOOKIE_READ_LAST_CONFIRMED";
String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";
String BOOKIE_READ_ENTRY_BYTES = "BOOKIE_READ_ENTRY_BYTES";
+ String BOOKIE_GET_LIST_OF_ENTRIES_OF_LEDGER = "BOOKIE_GET_LIST_OF_ENTRIES_OF_LEDGER";
String ADD_ENTRY_IN_PROGRESS = "ADD_ENTRY_IN_PROGRESS";
String ADD_ENTRY_BLOCKED = "ADD_ENTRY_BLOCKED";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index e253fdc..32adb5d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -52,6 +52,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.PrimitiveIterator.OfLong;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -1594,4 +1595,25 @@ public class Bookie extends BookieCriticalThread {
return new LedgerDirsManager(conf, idxDirs, diskChecker, statsLogger);
}
}
+
+ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedgerException {
+ long requestNanos = MathUtils.nowInNano();
+ boolean success = false;
+ try {
+ LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("GetEntriesOfLedger {}", ledgerId);
+ }
+ OfLong entriesOfLedger = handle.getListOfEntriesOfLedger(ledgerId);
+ success = true;
+ return entriesOfLedger;
+ } finally {
+ long elapsedNanos = MathUtils.elapsedNanos(requestNanos);
+ if (success) {
+ bookieStats.getReadEntryStats().registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+ } else {
+ bookieStats.getReadEntryStats().registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+ }
+ }
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index a27e54e..941906c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -21,17 +21,21 @@ package org.apache.bookkeeper.bookie;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.PrimitiveIterator;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.stats.EntryMemTableStats;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.IteratorUtility;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +48,6 @@ import org.slf4j.LoggerFactory;
*/
public class EntryMemTable implements AutoCloseable{
private static Logger logger = LoggerFactory.getLogger(Journal.class);
-
/**
* Entry skip list.
*/
@@ -456,4 +459,48 @@ public class EntryMemTable implements AutoCloseable{
public void close() throws Exception {
// no-op
}
+
+ /*
+ * returns the primitive long iterator of entries of a ledger available in
+ * this EntryMemTable. It would be in the ascending order and this Iterator
+ * is weakly consistent.
+ */
+ PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) {
+ EntryKey thisLedgerFloorEntry = new EntryKey(ledgerId, 0);
+ EntryKey thisLedgerCeilingEntry = new EntryKey(ledgerId, Long.MAX_VALUE);
+ Iterator<EntryKey> thisLedgerEntriesInKVMap;
+ Iterator<EntryKey> thisLedgerEntriesInSnapshot;
+ this.lock.readLock().lock();
+ try {
+ /*
+ * Gets a view of the portion of this map that corresponds to
+ * entries of this ledger.
+ *
+ * Here 'kvmap' is of type 'ConcurrentSkipListMap', so its 'subMap'
+ * call would return a view of the portion of this map whose keys
+ * range from fromKey to toKey and it would be of type
+ * 'ConcurrentNavigableMap'. ConcurrentNavigableMap's 'keySet' would
+ * return NavigableSet view of the keys contained in this map. This
+ * view's iterator would be weakly consistent -
+ * https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/
+ * package-summary.html#Weakly.
+ *
+ * 'weakly consistent' would guarantee 'to traverse elements as they
+ * existed upon construction exactly once, and may (but are not
+ * guaranteed to) reflect any modifications subsequent to
+ * construction.'
+ *
+ */
+ thisLedgerEntriesInKVMap = this.kvmap.subMap(thisLedgerFloorEntry, thisLedgerCeilingEntry).keySet()
+ .iterator();
+ thisLedgerEntriesInSnapshot = this.snapshot.subMap(thisLedgerFloorEntry, thisLedgerCeilingEntry).keySet()
+ .iterator();
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ return IteratorUtility.mergeIteratorsForPrimitiveLongIterator(thisLedgerEntriesInKVMap,
+ thisLedgerEntriesInSnapshot, EntryKey.COMPARATOR, (entryKey) -> {
+ return entryKey.entryId;
+ });
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index becb16b..5d4ec0e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -34,6 +34,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_P
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -45,12 +46,14 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
+import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import lombok.Getter;
+
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
@@ -65,8 +68,8 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SnapshotMap;
-import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +86,7 @@ import org.slf4j.LoggerFactory;
)
public class InterleavedLedgerStorage implements CompactableLedgerStorage, EntryLogListener {
private static final Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class);
+ public static final long INVALID_ENTRYID = -1;
EntryLogger entryLogger;
@Getter
@@ -103,6 +107,8 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
// this indicates that a write has happened since the last flush
private final AtomicBoolean somethingWritten = new AtomicBoolean(false);
+ private int pageSize;
+
// Expose Stats
@StatsDoc(
name = STORAGE_GET_OFFSET,
@@ -191,6 +197,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger.scope("gc"));
+ pageSize = conf.getPageSize();
ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
// Expose Stats
getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET);
@@ -649,4 +656,9 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
return Collections.singletonList(gcThread.getGarbageCollectionStatus());
}
+
+ @Override
+ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
+ return ledgerCache.getEntriesIterator(ledgerId);
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
index 880d3c1..90f5afc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
+import java.util.PrimitiveIterator.OfLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -240,5 +241,9 @@ public class InterleavedStorageRegenerateIndexOp {
public LedgerIndexMetadata readLedgerIndexMetadata(long ledgerId) throws IOException {
throw new UnsupportedOperationException();
}
+ @Override
+ public OfLong getEntriesIterator(long ledgerId) throws IOException {
+ throw new UnsupportedOperationException();
+ }
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index cae8bb4..606afb4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -26,6 +26,8 @@ import static org.apache.bookkeeper.tools.cli.commands.bookie.FormatUtil.bytes2H
import io.netty.buffer.ByteBuf;
import java.io.Closeable;
import java.io.IOException;
+import java.util.PrimitiveIterator.OfLong;
+
import org.apache.bookkeeper.common.util.Watcher;
/**
@@ -87,6 +89,8 @@ public interface LedgerCache extends Closeable {
PageEntriesIterable listEntries(long ledgerId) throws IOException;
+ OfLong getEntriesIterator(long ledgerId) throws IOException;
+
/**
* Represents summary of ledger metadata.
*/
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index e6de2f9..7341ace 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -23,6 +23,11 @@ package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.PrimitiveIterator.OfLong;
+
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -186,4 +191,48 @@ public class LedgerCacheImpl implements LedgerCache {
public LedgerIndexMetadata readLedgerIndexMetadata(long ledgerId) throws IOException {
return indexPersistenceManager.readLedgerIndexMetadata(ledgerId);
}
+
+ @Override
+ public OfLong getEntriesIterator(long ledgerId) throws IOException {
+ Iterator<LedgerCache.PageEntries> pageEntriesIteratorNonFinal = null;
+ try {
+ pageEntriesIteratorNonFinal = listEntries(ledgerId).iterator();
+ } catch (Bookie.NoLedgerException noLedgerException) {
+ pageEntriesIteratorNonFinal = Collections.emptyIterator();
+ }
+ final Iterator<LedgerCache.PageEntries> pageEntriesIterator = pageEntriesIteratorNonFinal;
+ return new OfLong() {
+ private OfLong entriesInCurrentLEPIterator = null;
+ {
+ if (pageEntriesIterator.hasNext()) {
+ entriesInCurrentLEPIterator = pageEntriesIterator.next().getLEP().getEntriesIterator();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ while ((entriesInCurrentLEPIterator != null) && (!entriesInCurrentLEPIterator.hasNext())) {
+ if (pageEntriesIterator.hasNext()) {
+ entriesInCurrentLEPIterator = pageEntriesIterator.next().getLEP().getEntriesIterator();
+ } else {
+ entriesInCurrentLEPIterator = null;
+ }
+ }
+ return (entriesInCurrentLEPIterator != null);
+ } catch (Exception exc) {
+ throw new RuntimeException(
+ "Received exception in InterleavedLedgerStorage getEntriesOfLedger hasNext call", exc);
+ }
+ }
+
+ @Override
+ public long nextLong() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return entriesInCurrentLEPIterator.nextLong();
+ }
+ };
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index 74bc8b5..abb7b34 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -27,6 +27,8 @@ import com.google.common.util.concurrent.SettableFuture;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
+import java.util.PrimitiveIterator.OfLong;
+
import org.apache.bookkeeper.common.util.Watcher;
/**
@@ -86,4 +88,6 @@ public abstract class LedgerDescriptor {
abstract void setExplicitLac(ByteBuf entry) throws IOException;
abstract ByteBuf getExplicitLac();
+
+ abstract OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index 563494e..da884b5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.SettableFuture;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Arrays;
+import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.common.util.Watcher;
@@ -173,4 +174,9 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
void cancelWaitForLastAddConfirmedUpdate(Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
ledgerStorage.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
}
+
+ @Override
+ OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
+ return ledgerStorage.getListOfEntriesOfLedger(ledgerId);
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
index c272e7c..98ea948 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.util.ZeroBuffer;
@@ -298,6 +300,31 @@ public class LedgerEntryPage implements AutoCloseable {
}
}
+ public OfLong getEntriesIterator() {
+ return new OfLong() {
+ long firstEntry = getFirstEntry();
+ int curDiffEntry = 0;
+
+ @Override
+ public boolean hasNext() {
+ while ((curDiffEntry < entriesPerPage) && (getOffset(curDiffEntry * 8) == 0)) {
+ curDiffEntry++;
+ }
+ return (curDiffEntry != entriesPerPage);
+ }
+
+ @Override
+ public long nextLong() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ long nextEntry = firstEntry + curDiffEntry;
+ curDiffEntry++;
+ return nextEntry;
+ }
+ };
+ }
+
@Override
public void close() throws Exception {
releasePage();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 2c47b28..d371520 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -30,6 +30,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.PrimitiveIterator;
+
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -258,4 +260,19 @@ public interface LedgerStorage {
default List<GarbageCollectionStatus> getGarbageCollectionStatus() {
return Collections.emptyList();
}
+
+ /**
+ * Returns the primitive long iterator for entries of the ledger, stored in
+ * this LedgerStorage. The returned iterator provide weakly consistent state
+ * of the ledger. It is guaranteed that entries of the ledger added to this
+ * LedgerStorage by the time this method is called will be available but
+ * modifications made after method invocation may not be available.
+ *
+ * @param ledgerId
+ * - id of the ledger
+ * @return the list of entries of the ledger available in this
+ * ledgerstorage.
+ * @throws Exception
+ */
+ PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 23e0716..c1ad591 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -23,21 +23,25 @@ package org.apache.bookkeeper.bookie;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
+import java.util.PrimitiveIterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.IteratorUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -361,4 +365,11 @@ public class SortedLedgerStorage
public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
return interleavedLedgerStorage.getGarbageCollectionStatus();
}
+
+ @Override
+ public PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
+ PrimitiveIterator.OfLong entriesInMemtableItr = memTable.getListOfEntriesOfLedger(ledgerId);
+ PrimitiveIterator.OfLong entriesFromILSItr = interleavedLedgerStorage.getListOfEntriesOfLedger(ledgerId);
+ return IteratorUtility.mergePrimitiveLongIterator(entriesInMemtableItr, entriesFromILSItr);
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java
index 5e033e9..5df2186 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java
@@ -23,11 +23,13 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY_BYTES;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_FORCE_LEDGER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_GET_LIST_OF_ENTRIES_OF_LEDGER;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY_BYTES;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_RECOVERY_ADD_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_LIST_OF_ENTRIES_OF_LEDGER;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_BYTES;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_BYTES;
@@ -72,6 +74,12 @@ public class BookieStats {
parent = READ_ENTRY
)
private final OpStatsLogger readEntryStats;
+ @StatsDoc(
+ name = BOOKIE_GET_LIST_OF_ENTRIES_OF_LEDGER,
+ help = "operation stats of GetListOfEntriesOfLedger on a bookie",
+ parent = GET_LIST_OF_ENTRIES_OF_LEDGER
+ )
+ private final OpStatsLogger getListOfEntriesOfLedgerStats;
// Bookie Operation Bytes Stats
@StatsDoc(name = BOOKIE_ADD_ENTRY_BYTES, help = "bytes stats of AddEntry on a bookie")
private final OpStatsLogger addBytesStats;
@@ -86,6 +94,7 @@ public class BookieStats {
addEntryStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY);
recoveryAddEntryStats = statsLogger.getOpStatsLogger(BOOKIE_RECOVERY_ADD_ENTRY);
readEntryStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY);
+ getListOfEntriesOfLedgerStats = statsLogger.getOpStatsLogger(BOOKIE_GET_LIST_OF_ENTRIES_OF_LEDGER);
addBytesStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY_BYTES);
readBytesStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY_BYTES);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index a68cd1c..73d254a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -37,6 +37,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -359,4 +360,11 @@ public class DbLedgerStorage implements LedgerStorage {
return conf.getLong(keyName);
}
}
+
+ @Override
+ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
+ // check Issue #2078
+ throw new UnsupportedOperationException(
+ "getListOfEntriesOfLedger method is currently unsupported for DbLedgerStorage");
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 4c41235..58ce2be 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -33,6 +33,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -888,4 +889,10 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
}
private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class);
+
+ @Override
+ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
+ throw new UnsupportedOperationException(
+ "getListOfEntriesOfLedger method is currently unsupported for SingleDirectoryDbLedgerStorage");
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 6d2319c..4a869ee 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -27,8 +27,8 @@ import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistra
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-
import com.google.common.util.concurrent.UncheckedExecutionException;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -80,6 +80,7 @@ import org.apache.bookkeeper.replication.ReplicationException.CompatibilityExcep
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
@@ -1663,4 +1664,19 @@ public class BookKeeperAdmin implements AutoCloseable {
return bkc.getPlacementPolicy().isEnsembleAdheringToPlacementPolicy(ensembleBookiesList, writeQuorumSize,
ackQuorumSize);
}
+
+ /**
+ * Makes async request for getting list of entries of ledger from a bookie
+ * and returns Future for the result.
+ *
+ * @param address
+ * BookieSocketAddress of the bookie
+ * @param ledgerId
+ * ledgerId
+ * @return returns Future
+ */
+ public CompletableFuture<AvailabilityOfEntriesOfLedger> asyncGetListOfEntriesOfLedger(BookieSocketAddress address,
+ long ledgerId) {
+ return bkc.getBookieClient().getListOfEntriesOfLedger(address, ledgerId);
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index cdfde67..36c304c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -64,6 +64,7 @@ public interface BookKeeperClientStats {
String GET_BOOKIE_INFO_OP = "GET_BOOKIE_INFO";
String SPECULATIVE_READ_COUNT = "SPECULATIVE_READ_COUNT";
String READ_REQUESTS_REORDERED = "READ_REQUESTS_REORDERED";
+ String GET_LIST_OF_ENTRIES_OF_LEDGER_OP = "GET_LIST_OF_ENTRIES_OF_LEDGER";
// per channel stats
String CHANNEL_SCOPE = "per_channel_bookie_client";
@@ -81,6 +82,7 @@ public interface BookKeeperClientStats {
String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
String CHANNEL_START_TLS_OP = "START_TLS";
String CHANNEL_TIMEOUT_START_TLS_OP = "TIMEOUT_START_TLS";
+ String TIMEOUT_GET_LIST_OF_ENTRIES_OF_LEDGER = "TIMEOUT_GET_LIST_OF_ENTRIES_OF_LEDGER";
String NETTY_EXCEPTION_CNT = "NETTY_EXCEPTION_CNT";
String CLIENT_CHANNEL_WRITE_WAIT = "CLIENT_CHANNEL_WRITE_WAIT";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 85a4ef9..2092a67 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.proto;
import java.util.EnumSet;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -31,6 +32,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.util.ByteBufList;
/**
@@ -216,6 +218,19 @@ public interface BookieClient {
GetBookieInfoCallback cb, Object ctx);
/**
+ * Makes async request for getting list of entries of ledger from a bookie
+ * and returns Future for the result.
+ *
+ * @param address
+ * BookieSocketAddress of the bookie
+ * @param ledgerId
+ * ledgerId
+ * @return returns Future
+ */
+ CompletableFuture<AvailabilityOfEntriesOfLedger> getListOfEntriesOfLedger(BookieSocketAddress address,
+ long ledgerId);
+
+ /**
* @return whether bookie client object has been closed
*/
boolean isClosed();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index 8342821..c772a97 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -38,6 +38,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
@@ -56,6 +57,7 @@ import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.FutureGetListOfEntriesOfLedger;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -66,6 +68,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.util.ByteBufList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -317,6 +320,33 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
ledgerId);
}
+ @Override
+ public CompletableFuture<AvailabilityOfEntriesOfLedger> getListOfEntriesOfLedger(BookieSocketAddress address,
+ long ledgerId) {
+ FutureGetListOfEntriesOfLedger futureResult = new FutureGetListOfEntriesOfLedger(ledgerId);
+ final PerChannelBookieClientPool client = lookupClient(address);
+ if (client == null) {
+ futureResult.getListOfEntriesOfLedgerComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+ ledgerId, null);
+ return futureResult;
+ }
+ client.obtain((rc, pcbc) -> {
+ if (rc != BKException.Code.OK) {
+ try {
+ executor.executeOrdered(ledgerId, safeRun(() -> {
+ futureResult.getListOfEntriesOfLedgerComplete(rc, ledgerId, null);
+ }));
+ } catch (RejectedExecutionException re) {
+ futureResult.getListOfEntriesOfLedgerComplete(getRc(BKException.Code.InterruptedException),
+ ledgerId, null);
+ }
+ } else {
+ pcbc.getListOfEntriesOfLedger(ledgerId, futureResult);
+ }
+ }, ledgerId);
+ return futureResult;
+ }
+
private void completeRead(final int rc,
final long ledgerId,
final long entryId,
@@ -415,6 +445,7 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
}
}
+ @Override
public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb,
final Object ctx) {
final PerChannelBookieClientPool client = lookupClient(addr);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 02b0f56..f46d19c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -339,6 +339,9 @@ public class BookieRequestProcessor implements RequestProcessor {
case START_TLS:
processStartTLSRequestV3(r, c);
break;
+ case GET_LIST_OF_ENTRIES_OF_LEDGER:
+ processGetListOfEntriesOfLedgerProcessorV3(r, c);
+ break;
default:
LOG.info("Unknown operation type {}", header.getOperation());
BookkeeperProtocol.Response.Builder response =
@@ -587,6 +590,16 @@ public class BookieRequestProcessor implements RequestProcessor {
}
}
+ private void processGetListOfEntriesOfLedgerProcessorV3(final BookkeeperProtocol.Request r, final Channel c) {
+ GetListOfEntriesOfLedgerProcessorV3 getListOfEntriesOfLedger = new GetListOfEntriesOfLedgerProcessorV3(r, c,
+ this);
+ if (null == readThreadPool) {
+ getListOfEntriesOfLedger.run();
+ } else {
+ readThreadPool.submit(getListOfEntriesOfLedger);
+ }
+ }
+
private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Channel c) {
WriteEntryProcessor write = WriteEntryProcessor.create(r, c, this);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index 2ade9e9..fdfd379 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.AsyncCallback;
@@ -108,6 +109,53 @@ public class BookkeeperInternalCallbacks {
}
/**
+ * A callback interface for GetListOfEntriesOfLedger command.
+ */
+ public interface GetListOfEntriesOfLedgerCallback {
+ void getListOfEntriesOfLedgerComplete(int rc, long ledgerId,
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger);
+ }
+
+ /**
+ * Handle the Response Code and transform it to a BKException.
+ *
+ * @param <T>
+ * @param rc
+ * @param result
+ * @param future
+ */
+ public static <T> void finish(int rc, T result, CompletableFuture<? super T> future) {
+ if (rc != BKException.Code.OK) {
+ future.completeExceptionally(BKException.create(rc).fillInStackTrace());
+ } else {
+ future.complete(result);
+ }
+ }
+
+ /**
+ * Future for GetListOfEntriesOfLedger.
+ */
+ public static class FutureGetListOfEntriesOfLedger extends CompletableFuture<AvailabilityOfEntriesOfLedger>
+ implements GetListOfEntriesOfLedgerCallback {
+ private final long ledgerIdOfTheRequest;
+
+ FutureGetListOfEntriesOfLedger(long ledgerId) {
+ this.ledgerIdOfTheRequest = ledgerId;
+ }
+
+ @Override
+ public void getListOfEntriesOfLedgerComplete(int rc, long ledgerIdOfTheResponse,
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger) {
+ if ((rc == BKException.Code.OK) && (ledgerIdOfTheRequest != ledgerIdOfTheResponse)) {
+ LOG.error("For getListOfEntriesOfLedger expected ledgerId in the response: {} actual ledgerId: {}",
+ ledgerIdOfTheRequest, ledgerIdOfTheResponse);
+ rc = BKException.Code.ReadException;
+ }
+ finish(rc, availabilityOfEntriesOfLedger, this);
+ }
+ }
+
+ /**
* A generic callback interface.
*/
public interface GenericCallback<T> {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java
new file mode 100644
index 0000000..f78cdf2
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import com.google.protobuf.ByteString;
+import io.netty.channel.Channel;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A processor class for v3 entries of a ledger packets.
+ */
+public class GetListOfEntriesOfLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GetListOfEntriesOfLedgerProcessorV3.class);
+ protected final GetListOfEntriesOfLedgerRequest getListOfEntriesOfLedgerRequest;
+ protected final long ledgerId;
+
+ public GetListOfEntriesOfLedgerProcessorV3(Request request, Channel channel,
+ BookieRequestProcessor requestProcessor) {
+ super(request, channel, requestProcessor);
+ this.getListOfEntriesOfLedgerRequest = request.getGetListOfEntriesOfLedgerRequest();
+ this.ledgerId = getListOfEntriesOfLedgerRequest.getLedgerId();
+ }
+
+ private GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse() {
+ long startTimeNanos = MathUtils.nowInNano();
+
+ GetListOfEntriesOfLedgerResponse.Builder getListOfEntriesOfLedgerResponse = GetListOfEntriesOfLedgerResponse
+ .newBuilder();
+ getListOfEntriesOfLedgerResponse.setLedgerId(ledgerId);
+
+ if (!isVersionCompatible()) {
+ getListOfEntriesOfLedgerResponse.setStatus(StatusCode.EBADVERSION);
+ requestProcessor.getRequestStats().getGetListOfEntriesOfLedgerStats()
+ .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+ return getListOfEntriesOfLedgerResponse.build();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received new getListOfEntriesOfLedger request: {}", request);
+ }
+ StatusCode status = StatusCode.EOK;
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = null;
+ try {
+ availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+ requestProcessor.bookie.getListOfEntriesOfLedger(ledgerId));
+ getListOfEntriesOfLedgerResponse.setAvailabilityOfEntriesOfLedger(
+ ByteString.copyFrom(availabilityOfEntriesOfLedger.serializeStateOfEntriesOfLedger()));
+
+ } catch (Bookie.NoLedgerException e) {
+ status = StatusCode.ENOLEDGER;
+ LOG.error("No ledger found while performing getListOfEntriesOfLedger from ledger: {}", ledgerId, e);
+ } catch (IOException e) {
+ status = StatusCode.EIO;
+ LOG.error("IOException while performing getListOfEntriesOfLedger from ledger: {}", ledgerId);
+ }
+
+ if (status == StatusCode.EOK) {
+ requestProcessor.getRequestStats().getListOfEntriesOfLedgerStats
+ .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+ } else {
+ requestProcessor.getRequestStats().getListOfEntriesOfLedgerStats
+ .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+ }
+ // Finally set the status and return
+ getListOfEntriesOfLedgerResponse.setStatus(status);
+ return getListOfEntriesOfLedgerResponse.build();
+ }
+
+ @Override
+ public void safeRun() {
+ GetListOfEntriesOfLedgerResponse listOfEntriesOfLedgerResponse = getListOfEntriesOfLedgerResponse();
+ Response.Builder response = Response.newBuilder().setHeader(getHeader())
+ .setStatus(listOfEntriesOfLedgerResponse.getStatus())
+ .setGetListOfEntriesOfLedgerResponse(listOfEntriesOfLedgerResponse);
+ Response resp = response.build();
+ sendResponse(listOfEntriesOfLedgerResponse.getStatus(), resp,
+ requestProcessor.getRequestStats().getListOfEntriesOfLedgerRequestStats);
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 1fdb403..2f1659f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -96,6 +96,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetListOfEntriesOfLedgerCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
@@ -109,6 +110,8 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
@@ -128,6 +131,7 @@ import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SafeRunnable;
@@ -257,6 +261,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
help = "channel stats of connect requests"
)
private final OpStatsLogger connectTimer;
+ private final OpStatsLogger getListOfEntriesOfLedgerCompletionOpLogger;
+ private final OpStatsLogger getListOfEntriesOfLedgerCompletionTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.NETTY_EXCEPTION_CNT,
help = "the number of exceptions received from this channel"
@@ -395,6 +401,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
forceLedgerOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_FORCE_OP);
readLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP);
getBookieInfoOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.GET_BOOKIE_INFO_OP);
+ getListOfEntriesOfLedgerCompletionOpLogger = statsLogger
+ .getOpStatsLogger(BookKeeperClientStats.GET_LIST_OF_ENTRIES_OF_LEDGER_OP);
readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ);
addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
writeLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC);
@@ -403,6 +411,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
startTLSTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP);
+ getListOfEntriesOfLedgerCompletionTimeoutOpLogger = statsLogger
+ .getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_LIST_OF_ENTRIES_OF_LEDGER);
exceptionCounter = statsLogger.getCounter(BookKeeperClientStats.NETTY_EXCEPTION_CNT);
connectTimer = statsLogger.getOpStatsLogger(BookKeeperClientStats.CLIENT_CONNECT_TIMER);
addEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.ADD_OP_OUTSTANDING);
@@ -829,6 +839,24 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
writeAndFlush(channel, completionKey, request);
}
+ public void getListOfEntriesOfLedger(final long ledgerId, GetListOfEntriesOfLedgerCallback cb) {
+ final long txnId = getTxnId();
+ final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER);
+ completionObjects.put(completionKey, new GetListOfEntriesOfLedgerCompletion(completionKey, cb, ledgerId));
+
+ // Build the request.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder().setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER).setTxnId(txnId);
+
+ GetListOfEntriesOfLedgerRequest.Builder getListOfEntriesOfLedgerRequestBuilder =
+ GetListOfEntriesOfLedgerRequest.newBuilder().setLedgerId(ledgerId);
+
+ final Request getListOfEntriesOfLedgerRequest = Request.newBuilder().setHeader(headerBuilder)
+ .setGetListOfEntriesOfLedgerRequest(getListOfEntriesOfLedgerRequestBuilder).build();
+
+ writeAndFlush(channel, completionKey, getListOfEntriesOfLedgerRequest);
+ }
+
/**
* Long Poll Reads.
*/
@@ -1983,6 +2011,61 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
}
+ class GetListOfEntriesOfLedgerCompletion extends CompletionValue {
+ final GetListOfEntriesOfLedgerCallback cb;
+
+ public GetListOfEntriesOfLedgerCompletion(final CompletionKey key,
+ final GetListOfEntriesOfLedgerCallback origCallback, final long ledgerId) {
+ super("GetListOfEntriesOfLedger", null, ledgerId, 0L, getListOfEntriesOfLedgerCompletionOpLogger,
+ getListOfEntriesOfLedgerCompletionTimeoutOpLogger);
+ this.cb = new GetListOfEntriesOfLedgerCallback() {
+ @Override
+ public void getListOfEntriesOfLedgerComplete(int rc, long ledgerId,
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger) {
+ logOpResult(rc);
+ origCallback.getListOfEntriesOfLedgerComplete(rc, ledgerId, availabilityOfEntriesOfLedger);
+ key.release();
+ }
+ };
+ }
+
+ @Override
+ public void errorOut() {
+ errorOut(BKException.Code.BookieHandleNotAvailableException);
+ }
+
+ @Override
+ public void errorOut(final int rc) {
+ errorOutAndRunCallback(() -> cb.getListOfEntriesOfLedgerComplete(rc, ledgerId, null));
+ }
+
+ @Override
+ public void handleV3Response(BookkeeperProtocol.Response response) {
+ GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse = response
+ .getGetListOfEntriesOfLedgerResponse();
+ ByteBuf availabilityOfEntriesOfLedgerBuffer = Unpooled.EMPTY_BUFFER;
+ StatusCode status = response.getStatus() == StatusCode.EOK ? getListOfEntriesOfLedgerResponse.getStatus()
+ : response.getStatus();
+
+ if (getListOfEntriesOfLedgerResponse.hasAvailabilityOfEntriesOfLedger()) {
+ availabilityOfEntriesOfLedgerBuffer = Unpooled.wrappedBuffer(
+ getListOfEntriesOfLedgerResponse.getAvailabilityOfEntriesOfLedger().asReadOnlyByteBuffer());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ logResponse(status, "ledgerId", ledgerId);
+ }
+
+ int rc = convertStatus(status, BKException.Code.ReadException);
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = null;
+ if (rc == BKException.Code.OK) {
+ availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+ availabilityOfEntriesOfLedgerBuffer.slice());
+ }
+ cb.getListOfEntriesOfLedgerComplete(rc, ledgerId, availabilityOfEntriesOfLedger);
+ }
+ }
+
private final Recycler<AddCompletion> addCompletionRecycler = new Recycler<AddCompletion>() {
@Override
protected AddCompletion newObject(Recycler.Handle<AddCompletion> handle) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java
index 1799e66..fd91957 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java
@@ -30,6 +30,8 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_LIST_OF_ENTRIES_OF_LEDGER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_LIST_OF_ENTRIES_OF_LEDGER_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED_WAIT;
@@ -210,6 +212,17 @@ public class RequestStats {
help = "operation stats of ReadEntry blocked on a bookie"
)
final OpStatsLogger readEntryBlockedStats;
+ @StatsDoc(
+ name = GET_LIST_OF_ENTRIES_OF_LEDGER_REQUEST,
+ help = "request stats of GetListOfEntriesOfLedger on a bookie"
+ )
+ final OpStatsLogger getListOfEntriesOfLedgerRequestStats;
+ @StatsDoc(
+ name = "GET_LIST_OF_ENTRIES_OF_LEDGER",
+ help = "operation stats of GetListOfEntriesOfLedger",
+ parent = GET_LIST_OF_ENTRIES_OF_LEDGER_REQUEST
+ )
+ final OpStatsLogger getListOfEntriesOfLedgerStats;
public RequestStats(StatsLogger statsLogger) {
this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
@@ -238,6 +251,10 @@ public class RequestStats {
this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT);
this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT);
+ this.getListOfEntriesOfLedgerStats = statsLogger.getOpStatsLogger(GET_LIST_OF_ENTRIES_OF_LEDGER);
+ this.getListOfEntriesOfLedgerRequestStats =
+ statsLogger.getOpStatsLogger(GET_LIST_OF_ENTRIES_OF_LEDGER_REQUEST);
+
statsLogger.registerGauge(ADD_ENTRY_IN_PROGRESS, new Gauge<Number>() {
@Override
public Number getDefaultValue() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedger.java
new file mode 100644
index 0000000..6042bdb
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedger.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map.Entry;
+import java.util.PrimitiveIterator;
+import java.util.TreeMap;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+/**
+ * Ordered collection of SequenceGroups will represent entries of the ledger
+ * residing in a bookie.
+ *
+ * <p>In the byte array representation of AvailabilityOfEntriesOfLedger, for the
+ * sake of future extensibility it would be helpful to have reserved space for
+ * header at the beginning. So the first 64 bytes will be used for header, with
+ * the first four bytes specifying the int version number, next four bytes
+ * specifying the number of sequencegroups for now and the rest of the bytes in
+ * the reserved space will be 0's. The encoded format will be represented after
+ * the first 64 bytes. The ordered collection of SequenceGroups will be appended
+ * sequentially to this byte array, with each SequenceGroup taking 24 bytes.
+ */
+public class AvailabilityOfEntriesOfLedger {
+ public static final long INVALID_ENTRYID = -1;
+
+ /*
+ *
+ * Nomenclature:
+ *
+ * - Continuous entries are grouped as a ’Sequence’. - Number of continuous
+ * entries in a ‘Sequence’ is called ‘sequenceSize’. - Gap between
+ * Consecutive sequences is called ‘sequencePeriod’. - Consecutive sequences
+ * with same sequenceSize and same sequencePeriod in between consecutive
+ * sequences are grouped as a SequenceGroup. - ‘firstSequenceStart’ is the
+ * first entry in the first sequence of the SequenceGroup. -
+ * ‘lastSequenceStart’ is the first entry in the last sequence of the
+ * SequenceGroup.
+ *
+ * To represent a SequenceGroup, two long values and two int values are
+ * needed, so each SequenceGroup can be represented with (2 * 8 + 2 * 4 = 24
+ * bytes).
+ */
+ private static class SequenceGroup {
+ private static final int SEQUENCEGROUP_BYTES = 2 * Long.BYTES + 2 * Integer.BYTES;
+ private final long firstSequenceStart;
+ private final int sequenceSize;
+ private long lastSequenceStart = INVALID_ENTRYID;
+ private int sequencePeriod;
+ private boolean isSequenceGroupClosed = false;
+ private long numOfEntriesInSequenceGroup = 0;
+
+ private SequenceGroup(long firstSequenceStart, int sequenceSize) {
+ this.firstSequenceStart = firstSequenceStart;
+ this.lastSequenceStart = firstSequenceStart;
+ this.sequenceSize = sequenceSize;
+ this.sequencePeriod = 0;
+ }
+
+ private SequenceGroup(byte[] serializedSequenceGroup) {
+ ByteBuffer buffer = ByteBuffer.wrap(serializedSequenceGroup);
+ firstSequenceStart = buffer.getLong();
+ lastSequenceStart = buffer.getLong();
+ sequenceSize = buffer.getInt();
+ sequencePeriod = buffer.getInt();
+ setSequenceGroupClosed();
+ }
+
+ private boolean isSequenceGroupClosed() {
+ return isSequenceGroupClosed;
+ }
+
+ private void setSequenceGroupClosed() {
+ this.isSequenceGroupClosed = true;
+ numOfEntriesInSequenceGroup = (lastSequenceStart - firstSequenceStart) == 0 ? sequenceSize
+ : (((lastSequenceStart - firstSequenceStart) / sequencePeriod) + 1) * sequenceSize;
+ }
+
+ private long getNumOfEntriesInSequenceGroup() {
+ if (!isSequenceGroupClosed()) {
+ throw new IllegalStateException(
+ "SequenceGroup is not yet closed, it is illegal to call getNumOfEntriesInSequenceGroup");
+ }
+ return numOfEntriesInSequenceGroup;
+ }
+
+ private long getLastSequenceStart() {
+ return lastSequenceStart;
+ }
+
+ private void setLastSequenceStart(long lastSequenceStart) {
+ this.lastSequenceStart = lastSequenceStart;
+ }
+
+ private int getSequencePeriod() {
+ return sequencePeriod;
+ }
+
+ private void setSequencePeriod(int sequencePeriod) {
+ this.sequencePeriod = sequencePeriod;
+ }
+
+ private long getFirstSequenceStart() {
+ return firstSequenceStart;
+ }
+
+ private void serializeSequenceGroup(byte[] byteArrayForSerialization) {
+ if (!isSequenceGroupClosed()) {
+ throw new IllegalStateException(
+ "SequenceGroup is not yet closed, it is illegal to call serializeSequenceGroup");
+ }
+ ByteBuffer buffer = ByteBuffer.wrap(byteArrayForSerialization);
+ buffer.putLong(firstSequenceStart);
+ buffer.putLong(lastSequenceStart);
+ buffer.putInt(sequenceSize);
+ buffer.putInt(sequencePeriod);
+ }
+
+ private boolean isEntryAvailable(long entryId) {
+ if (!isSequenceGroupClosed()) {
+ throw new IllegalStateException(
+ "SequenceGroup is not yet closed, it is illegal to call isEntryAvailable");
+ }
+
+ if ((entryId >= firstSequenceStart) && (entryId <= (lastSequenceStart + sequenceSize))) {
+ if (sequencePeriod == 0) {
+ return ((entryId - firstSequenceStart) < sequenceSize);
+ } else {
+ return (((entryId - firstSequenceStart) % sequencePeriod) < sequenceSize);
+ }
+ } else {
+ return false;
+ }
+ }
+ }
+
+ public static final int HEADER_SIZE = 64;
+ public static final int V0 = 0;
+ // current version of AvailabilityOfEntriesOfLedger header is V0
+ public static final int CURRENT_HEADER_VERSION = V0;
+ private final TreeMap<Long, SequenceGroup> sortedSequenceGroups = new TreeMap<Long, SequenceGroup>();
+ private MutableObject<SequenceGroup> curSequenceGroup = new MutableObject<SequenceGroup>(null);
+ private MutableLong curSequenceStartEntryId = new MutableLong(INVALID_ENTRYID);
+ private MutableInt curSequenceSize = new MutableInt(0);
+ private boolean availabilityOfEntriesOfLedgerClosed = false;
+ private long totalNumOfAvailableEntries = 0;
+
+ public AvailabilityOfEntriesOfLedger(PrimitiveIterator.OfLong entriesOfLedgerItr) {
+ while (entriesOfLedgerItr.hasNext()) {
+ this.addEntryToAvailabileEntriesOfLedger(entriesOfLedgerItr.nextLong());
+ }
+ this.closeStateOfEntriesOfALedger();
+ }
+
+ public AvailabilityOfEntriesOfLedger(byte[] serializeStateOfEntriesOfLedger) {
+ byte[] header = new byte[HEADER_SIZE];
+ byte[] serializedSequenceGroupByteArray = new byte[SequenceGroup.SEQUENCEGROUP_BYTES];
+ System.arraycopy(serializeStateOfEntriesOfLedger, 0, header, 0, HEADER_SIZE);
+
+ ByteBuffer headerByteBuf = ByteBuffer.wrap(header);
+ int headerVersion = headerByteBuf.getInt();
+ if (headerVersion > CURRENT_HEADER_VERSION) {
+ throw new IllegalArgumentException("Unsupported Header Version: " + headerVersion);
+ }
+ int numOfSequenceGroups = headerByteBuf.getInt();
+ SequenceGroup newSequenceGroup;
+ for (int i = 0; i < numOfSequenceGroups; i++) {
+ Arrays.fill(serializedSequenceGroupByteArray, (byte) 0);
+ System.arraycopy(serializeStateOfEntriesOfLedger, HEADER_SIZE + (i * SequenceGroup.SEQUENCEGROUP_BYTES),
+ serializedSequenceGroupByteArray, 0, SequenceGroup.SEQUENCEGROUP_BYTES);
+ newSequenceGroup = new SequenceGroup(serializedSequenceGroupByteArray);
+ sortedSequenceGroups.put(newSequenceGroup.getFirstSequenceStart(), newSequenceGroup);
+ }
+ setAvailabilityOfEntriesOfLedgerClosed();
+ }
+
+ public AvailabilityOfEntriesOfLedger(ByteBuf byteBuf) {
+ byte[] header = new byte[HEADER_SIZE];
+ byte[] serializedSequenceGroupByteArray = new byte[SequenceGroup.SEQUENCEGROUP_BYTES];
+ int readerIndex = byteBuf.readerIndex();
+ byteBuf.getBytes(readerIndex, header, 0, HEADER_SIZE);
+
+ ByteBuffer headerByteBuf = ByteBuffer.wrap(header);
+ int headerVersion = headerByteBuf.getInt();
+ if (headerVersion > CURRENT_HEADER_VERSION) {
+ throw new IllegalArgumentException("Unsupported Header Version: " + headerVersion);
+ }
+ int numOfSequenceGroups = headerByteBuf.getInt();
+ SequenceGroup newSequenceGroup;
+ for (int i = 0; i < numOfSequenceGroups; i++) {
+ Arrays.fill(serializedSequenceGroupByteArray, (byte) 0);
+ byteBuf.getBytes(readerIndex + HEADER_SIZE + (i * SequenceGroup.SEQUENCEGROUP_BYTES),
+ serializedSequenceGroupByteArray, 0, SequenceGroup.SEQUENCEGROUP_BYTES);
+ newSequenceGroup = new SequenceGroup(serializedSequenceGroupByteArray);
+ sortedSequenceGroups.put(newSequenceGroup.getFirstSequenceStart(), newSequenceGroup);
+ }
+ setAvailabilityOfEntriesOfLedgerClosed();
+ }
+
+ private void initializeCurSequence(long curSequenceStartEntryIdValue) {
+ curSequenceStartEntryId.setValue(curSequenceStartEntryIdValue);
+ curSequenceSize.setValue(1);
+ }
+
+ private void resetCurSequence() {
+ curSequenceStartEntryId.setValue(INVALID_ENTRYID);
+ curSequenceSize.setValue(0);
+ }
+
+ private boolean isCurSequenceInitialized() {
+ return curSequenceStartEntryId.longValue() != INVALID_ENTRYID;
+ }
+
+ private boolean isEntryExistingInCurSequence(long entryId) {
+ return (curSequenceStartEntryId.longValue() <= entryId)
+ && (entryId < (curSequenceStartEntryId.longValue() + curSequenceSize.intValue()));
+ }
+
+ private boolean isEntryAppendableToCurSequence(long entryId) {
+ return ((curSequenceStartEntryId.longValue() + curSequenceSize.intValue()) == entryId);
+ }
+
+ private void incrementCurSequenceSize() {
+ curSequenceSize.increment();
+ }
+
+ private void createNewSequenceGroupWithCurSequence() {
+ SequenceGroup curSequenceGroupValue = curSequenceGroup.getValue();
+ curSequenceGroupValue.setSequenceGroupClosed();
+ sortedSequenceGroups.put(curSequenceGroupValue.getFirstSequenceStart(), curSequenceGroupValue);
+ curSequenceGroup.setValue(new SequenceGroup(curSequenceStartEntryId.longValue(), curSequenceSize.intValue()));
+ }
+
+ private boolean isCurSequenceGroupInitialized() {
+ return curSequenceGroup.getValue() != null;
+ }
+
+ private void initializeCurSequenceGroupWithCurSequence() {
+ curSequenceGroup.setValue(new SequenceGroup(curSequenceStartEntryId.longValue(), curSequenceSize.intValue()));
+ }
+
+ private boolean doesCurSequenceBelongToCurSequenceGroup() {
+ long curSequenceStartEntryIdValue = curSequenceStartEntryId.longValue();
+ int curSequenceSizeValue = curSequenceSize.intValue();
+ boolean belongsToThisSequenceGroup = false;
+ SequenceGroup curSequenceGroupValue = curSequenceGroup.getValue();
+ if ((curSequenceGroupValue.sequenceSize == curSequenceSizeValue)
+ && ((curSequenceGroupValue.getLastSequenceStart() == INVALID_ENTRYID) || ((curSequenceStartEntryIdValue
+ - curSequenceGroupValue.getLastSequenceStart()) == curSequenceGroupValue
+ .getSequencePeriod()))) {
+ belongsToThisSequenceGroup = true;
+ }
+ return belongsToThisSequenceGroup;
+ }
+
+ private void appendCurSequenceToCurSequenceGroup() {
+ SequenceGroup curSequenceGroupValue = curSequenceGroup.getValue();
+ curSequenceGroupValue.setLastSequenceStart(curSequenceStartEntryId.longValue());
+ if (curSequenceGroupValue.getSequencePeriod() == 0) {
+ curSequenceGroupValue.setSequencePeriod(
+ ((int) (curSequenceGroupValue.getLastSequenceStart() - curSequenceGroupValue.firstSequenceStart)));
+ }
+ }
+
+ private void addCurSequenceToSequenceGroup() {
+ if (!isCurSequenceGroupInitialized()) {
+ initializeCurSequenceGroupWithCurSequence();
+ } else if (doesCurSequenceBelongToCurSequenceGroup()) {
+ appendCurSequenceToCurSequenceGroup();
+ } else {
+ createNewSequenceGroupWithCurSequence();
+ }
+ }
+
+ private void addEntryToAvailabileEntriesOfLedger(long entryId) {
+ if (!isCurSequenceInitialized()) {
+ initializeCurSequence(entryId);
+ } else if (isEntryExistingInCurSequence(entryId)) {
+ /* this entry is already added so do nothing */
+ } else if (isEntryAppendableToCurSequence(entryId)) {
+ incrementCurSequenceSize();
+ } else {
+ addCurSequenceToSequenceGroup();
+ initializeCurSequence(entryId);
+ }
+ }
+
+ private void closeStateOfEntriesOfALedger() {
+ if (isCurSequenceInitialized()) {
+ addCurSequenceToSequenceGroup();
+ resetCurSequence();
+ }
+ SequenceGroup curSequenceGroupValue = curSequenceGroup.getValue();
+ if (curSequenceGroupValue != null) {
+ curSequenceGroupValue.setSequenceGroupClosed();
+ sortedSequenceGroups.put(curSequenceGroupValue.getFirstSequenceStart(), curSequenceGroupValue);
+ }
+ setAvailabilityOfEntriesOfLedgerClosed();
+ }
+
+ private boolean isAvailabilityOfEntriesOfLedgerClosed() {
+ return availabilityOfEntriesOfLedgerClosed;
+ }
+
+ private void setAvailabilityOfEntriesOfLedgerClosed() {
+ this.availabilityOfEntriesOfLedgerClosed = true;
+ for (Entry<Long, SequenceGroup> seqGroupEntry : sortedSequenceGroups.entrySet()) {
+ totalNumOfAvailableEntries += seqGroupEntry.getValue().getNumOfEntriesInSequenceGroup();
+ }
+ }
+
+ public byte[] serializeStateOfEntriesOfLedger() {
+ if (!isAvailabilityOfEntriesOfLedgerClosed()) {
+ throw new IllegalStateException("AvailabilityOfEntriesOfLedger is not yet closed,"
+ + "it is illegal to call serializeStateOfEntriesOfLedger");
+ }
+ byte[] header = new byte[HEADER_SIZE];
+ ByteBuffer headerByteBuf = ByteBuffer.wrap(header);
+ byte[] serializedSequenceGroupByteArray = new byte[SequenceGroup.SEQUENCEGROUP_BYTES];
+ byte[] serializedStateByteArray = new byte[HEADER_SIZE
+ + (sortedSequenceGroups.size() * SequenceGroup.SEQUENCEGROUP_BYTES)];
+ final int numOfSequenceGroups = sortedSequenceGroups.size();
+ headerByteBuf.putInt(CURRENT_HEADER_VERSION);
+ headerByteBuf.putInt(numOfSequenceGroups);
+ System.arraycopy(header, 0, serializedStateByteArray, 0, HEADER_SIZE);
+ int seqNum = 0;
+ for (Entry<Long, SequenceGroup> seqGroupEntry : sortedSequenceGroups.entrySet()) {
+ SequenceGroup seqGroup = seqGroupEntry.getValue();
+ Arrays.fill(serializedSequenceGroupByteArray, (byte) 0);
+ seqGroup.serializeSequenceGroup(serializedSequenceGroupByteArray);
+ System.arraycopy(serializedSequenceGroupByteArray, 0, serializedStateByteArray,
+ HEADER_SIZE + ((seqNum++) * SequenceGroup.SEQUENCEGROUP_BYTES), SequenceGroup.SEQUENCEGROUP_BYTES);
+ }
+ return serializedStateByteArray;
+ }
+
+ public boolean isEntryAvailable(long entryId) {
+ if (!isAvailabilityOfEntriesOfLedgerClosed()) {
+ throw new IllegalStateException(
+ "AvailabilityOfEntriesOfLedger is not yet closed, it is illegal to call isEntryAvailable");
+ }
+ Entry<Long, SequenceGroup> seqGroup = sortedSequenceGroups.floorEntry(entryId);
+ if (seqGroup == null) {
+ return false;
+ }
+ return seqGroup.getValue().isEntryAvailable(entryId);
+ }
+
+ public long getTotalNumOfAvailableEntries() {
+ if (!isAvailabilityOfEntriesOfLedgerClosed()) {
+ throw new IllegalStateException("AvailabilityOfEntriesOfLedger is not yet closed,"
+ + " it is illegal to call getTotalNumOfAvailableEntries");
+ }
+ return totalNumOfAvailableEntries;
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/IteratorUtility.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/IteratorUtility.java
new file mode 100644
index 0000000..701d31a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/IteratorUtility.java
@@ -0,0 +1,171 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.PrimitiveIterator;
+import java.util.PrimitiveIterator.OfLong;
+import java.util.function.ToLongFunction;
+
+/**
+ * Utility class to merge iterators.
+ */
+public class IteratorUtility {
+
+ private static final long INVALID_ELEMENT = -1;
+
+ /**
+ * Merges two long primitive sorted iterators and returns merged iterator.
+ * It expects
+ * - input iterators to be sorted
+ * - input iterators to be non-repetitive for merged iterator to be non-repetitive
+ * It removes duplicates from the input iterators.
+ *
+ * @param iter1
+ * first primitive oflong input iterator
+ * @param iter2
+ * second primitive oflong input iterator
+ * @return merged primitive oflong iterator.
+ */
+ public static OfLong mergePrimitiveLongIterator(OfLong iter1, OfLong iter2) {
+ return new PrimitiveIterator.OfLong() {
+ private long curIter1Element = INVALID_ELEMENT;
+ private long curIter2Element = INVALID_ELEMENT;
+ private boolean hasToPreFetch = true;
+
+ @Override
+ public boolean hasNext() {
+ if (hasToPreFetch) {
+ if (curIter1Element == INVALID_ELEMENT) {
+ curIter1Element = iter1.hasNext() ? iter1.nextLong() : INVALID_ELEMENT;
+ }
+ if (curIter2Element == INVALID_ELEMENT) {
+ curIter2Element = iter2.hasNext() ? iter2.nextLong() : INVALID_ELEMENT;
+ }
+ }
+ hasToPreFetch = false;
+ return (curIter1Element != INVALID_ELEMENT || curIter2Element != INVALID_ELEMENT);
+ }
+
+ @Override
+ public long nextLong() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ long returnEntryId = INVALID_ELEMENT;
+ if (curIter1Element != INVALID_ELEMENT && curIter2Element != INVALID_ELEMENT) {
+ if (curIter1Element == curIter2Element) {
+ returnEntryId = curIter1Element;
+ curIter1Element = INVALID_ELEMENT;
+ curIter2Element = INVALID_ELEMENT;
+ } else if (curIter1Element < curIter2Element) {
+ returnEntryId = curIter1Element;
+ curIter1Element = INVALID_ELEMENT;
+ } else {
+ returnEntryId = curIter2Element;
+ curIter2Element = INVALID_ELEMENT;
+ }
+ } else if (curIter1Element != INVALID_ELEMENT) {
+ returnEntryId = curIter1Element;
+ curIter1Element = INVALID_ELEMENT;
+ } else {
+ returnEntryId = curIter2Element;
+ curIter2Element = INVALID_ELEMENT;
+ }
+ hasToPreFetch = true;
+ return returnEntryId;
+ }
+ };
+ }
+
+ /**
+ * Merges two sorted iterators and returns merged iterator sorted using
+ * comparator. It uses 'function' to convert T type to long, to return long
+ * iterator.
+ * It expects
+ * - input iterators to be sorted
+ * - input iterators to be non-repetitive for merged iterator to be non-repetitive
+ * It removes duplicates from the input iterators.
+ *
+ * @param iter1
+ * first iterator of type T
+ * @param iter2
+ * second iterator of type T
+ * @param comparator
+ * @param function
+ * @return
+ */
+ public static <T> OfLong mergeIteratorsForPrimitiveLongIterator(Iterator<T> iter1, Iterator<T> iter2,
+ Comparator<T> comparator, ToLongFunction<T> function) {
+ return new PrimitiveIterator.OfLong() {
+ private T curIter1Entry = null;
+ private T curIter2Entry = null;
+ private boolean hasToPreFetch = true;
+
+ @Override
+ public boolean hasNext() {
+ if (hasToPreFetch) {
+ if (curIter1Entry == null) {
+ curIter1Entry = iter1.hasNext() ? iter1.next() : null;
+ }
+ if (curIter2Entry == null) {
+ curIter2Entry = iter2.hasNext() ? iter2.next() : null;
+ }
+ }
+ hasToPreFetch = false;
+ return (curIter1Entry != null || curIter2Entry != null);
+ }
+
+ @Override
+ public long nextLong() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ T returnEntry = null;
+ if (curIter1Entry != null && curIter2Entry != null) {
+ int compareValue = comparator.compare(curIter1Entry, curIter2Entry);
+ if (compareValue == 0) {
+ returnEntry = curIter1Entry;
+ curIter1Entry = null;
+ curIter2Entry = null;
+ } else if (compareValue < 0) {
+ returnEntry = curIter1Entry;
+ curIter1Entry = null;
+ } else {
+ returnEntry = curIter2Entry;
+ curIter2Entry = null;
+ }
+ } else if (curIter1Entry != null) {
+ returnEntry = curIter1Entry;
+ curIter1Entry = null;
+ } else {
+ returnEntry = curIter2Entry;
+ curIter2Entry = null;
+ }
+ hasToPreFetch = true;
+ return function.applyAsLong(returnEntry);
+ }
+ };
+ }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
index c12ed91..6416173 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGE_RETRIES;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -34,11 +36,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.IntStream;
+
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -135,7 +141,9 @@ public class InterleavedLedgerStorageTest {
TestableEntryLogger entryLogger;
InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
final long numWrites = 2000;
+ final long moreNumOfWrites = 3000;
final long entriesPerWrite = 2;
+ final long numOfLedgers = 5;
@Before
public void setUp() throws Exception {
@@ -158,7 +166,7 @@ public class InterleavedLedgerStorageTest {
// Insert some ledger & entries in the interleaved storage
for (long entryId = 0; entryId < numWrites; entryId++) {
- for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+ for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
if (entryId == 0) {
interleavedStorage.setMasterKey(ledgerId, ("ledger-" + ledgerId).getBytes());
interleavedStorage.setFenced(ledgerId);
@@ -192,6 +200,52 @@ public class InterleavedLedgerStorageTest {
}
@Test
+ public void testGetListOfEntriesOfLedger() throws IOException {
+ for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+ OfLong entriesOfLedger = interleavedStorage.getListOfEntriesOfLedger(ledgerId);
+ ArrayList<Long> arrayList = new ArrayList<Long>();
+ Consumer<Long> addMethod = arrayList::add;
+ entriesOfLedger.forEachRemaining(addMethod);
+ assertEquals("Number of entries", numWrites, arrayList.size());
+ assertTrue("Entries of Ledger", IntStream.range(0, arrayList.size()).allMatch(i -> {
+ return arrayList.get(i).longValue() == (i * entriesPerWrite);
+ }));
+ }
+
+ long nonExistingLedger = 456789L;
+ OfLong entriesOfLedger = interleavedStorage.getListOfEntriesOfLedger(nonExistingLedger);
+ assertFalse("There shouldn't be any entry", entriesOfLedger.hasNext());
+ }
+
+ @Test
+ public void testGetListOfEntriesOfLedgerAfterFlush() throws IOException {
+ interleavedStorage.flush();
+
+ // Insert some more ledger & entries in the interleaved storage
+ for (long entryId = numWrites; entryId < moreNumOfWrites; entryId++) {
+ for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+ ByteBuf entry = Unpooled.buffer(128);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId * entriesPerWrite);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ interleavedStorage.addEntry(entry);
+ }
+ }
+
+ for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+ OfLong entriesOfLedger = interleavedStorage.getListOfEntriesOfLedger(ledgerId);
+ ArrayList<Long> arrayList = new ArrayList<Long>();
+ Consumer<Long> addMethod = arrayList::add;
+ entriesOfLedger.forEachRemaining(addMethod);
+ assertEquals("Number of entries", moreNumOfWrites, arrayList.size());
+ assertTrue("Entries of Ledger", IntStream.range(0, arrayList.size()).allMatch(i -> {
+ return arrayList.get(i).longValue() == (i * entriesPerWrite);
+ }));
+ }
+ }
+
+ @Test
public void testConsistencyCheckConcurrentGC() throws Exception {
final long signalDone = -1;
final List<Exception> asyncErrors = new ArrayList<>();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
index d028f70..76244b7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
@@ -28,7 +28,11 @@ import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -37,6 +41,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.TestUtils;
+import org.junit.Assert;
import org.junit.Test;
/**
@@ -277,4 +282,44 @@ public class LedgerStorageTest extends BookKeeperClusterTestCase {
fi.readHeader();
return fi;
}
+
+ @Test
+ public void testGetListOfEntriesOfLedger() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ int numOfBookies = bs.size();
+ int numOfEntries = 5;
+ BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
+ BookKeeper bkc = new BookKeeper(conf);
+ LedgerHandle lh = bkc.createLedger(numOfBookies, numOfBookies, digestType, "testPasswd".getBytes());
+ long lId = lh.getId();
+ for (int i = 0; i < numOfEntries; i++) {
+ lh.addEntry("000".getBytes());
+ }
+
+ ServerConfiguration newBookieConf = new ServerConfiguration(bsConfs.get(0));
+ /*
+ * by reusing bookieServerConfig and setting metadataServiceUri to null
+ * we can create/start new Bookie instance using the same data
+ * (journal/ledger/index) of the existing BookeieServer for our testing
+ * purpose.
+ */
+ newBookieConf.setMetadataServiceUri(null);
+ Bookie newbookie = new Bookie(newBookieConf);
+ /*
+ * since 'newbookie' uses the same data as original Bookie, it should be
+ * able to read journal of the original bookie.
+ */
+ newbookie.readJournal();
+
+ OfLong listOfEntriesItr = newbookie.getListOfEntriesOfLedger(lId);
+ ArrayList<Long> arrayList = new ArrayList<Long>();
+ Consumer<Long> addMethod = arrayList::add;
+ listOfEntriesItr.forEachRemaining(addMethod);
+
+ assertEquals("Num Of Entries", numOfEntries, arrayList.size());
+ Assert.assertTrue("Iterator should be sorted",
+ IntStream.range(0, arrayList.size() - 1).allMatch(k -> arrayList.get(k) <= arrayList.get(k + 1)));
+ bkc.close();
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java
new file mode 100644
index 0000000..01383e2
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java
@@ -0,0 +1,194 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.PrimitiveIterator.OfLong;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Testing SortedLedgerStorage.
+ */
+@RunWith(Parameterized.class)
+public class SortedLedgerStorageTest {
+
+ TestStatsProvider statsProvider = new TestStatsProvider();
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ LedgerDirsManager ledgerDirsManager;
+ SortedLedgerStorage sortedLedgerStorage = new SortedLedgerStorage();
+
+ final long numWrites = 2000;
+ final long moreNumOfWrites = 3000;
+ final long entriesPerWrite = 2;
+ final long numOfLedgers = 5;
+
+ @Parameterized.Parameters
+ public static Iterable<Boolean> elplSetting() {
+ return Arrays.asList(true, false);
+ }
+
+ public SortedLedgerStorageTest(boolean elplSetting) {
+ conf.setEntryLogSizeLimit(2048);
+ conf.setEntryLogPerLedgerEnabled(elplSetting);
+ }
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException {
+ }
+ };
+
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(Checkpoint checkpoint) {
+ // No-op
+ }
+
+ @Override
+ public void start() {
+ // no-op
+ }
+ };
+
+ @Before
+ public void setUp() throws Exception {
+ File tmpDir = File.createTempFile("bkTest", ".dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File curDir = Bookie.getCurrentDirectory(tmpDir);
+ Bookie.checkDirectoryStructure(curDir);
+
+ conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+ ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+ sortedLedgerStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, null, checkpointSource,
+ checkpointer, statsProvider.getStatsLogger(BOOKIE_SCOPE), UnpooledByteBufAllocator.DEFAULT);
+ }
+
+ @Test
+ public void testGetListOfEntriesOfLedger() throws Exception {
+ long nonExistingLedgerId = 123456L;
+ OfLong entriesItr = sortedLedgerStorage.getListOfEntriesOfLedger(nonExistingLedgerId);
+ assertFalse("There shouldn't be any entries for this ledger", entriesItr.hasNext());
+ // Insert some ledger & entries in the interleaved storage
+ for (long entryId = 0; entryId < numWrites; entryId++) {
+ for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+ if (entryId == 0) {
+ sortedLedgerStorage.setMasterKey(ledgerId, ("ledger-" + ledgerId).getBytes());
+ sortedLedgerStorage.setFenced(ledgerId);
+ }
+ ByteBuf entry = Unpooled.buffer(128);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId * entriesPerWrite);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ sortedLedgerStorage.addEntry(entry);
+ }
+ }
+
+ for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+ OfLong entriesOfLedger = sortedLedgerStorage.getListOfEntriesOfLedger(ledgerId);
+ ArrayList<Long> arrayList = new ArrayList<Long>();
+ Consumer<Long> addMethod = arrayList::add;
+ entriesOfLedger.forEachRemaining(addMethod);
+ assertEquals("Number of entries", numWrites, arrayList.size());
+ assertTrue("Entries of Ledger", IntStream.range(0, arrayList.size()).allMatch(i -> {
+ return arrayList.get(i).longValue() == (i * entriesPerWrite);
+ }));
+ }
+
+ nonExistingLedgerId = 456789L;
+ entriesItr = sortedLedgerStorage.getListOfEntriesOfLedger(nonExistingLedgerId);
+ assertFalse("There shouldn't be any entry", entriesItr.hasNext());
+ }
+
+ @Test
+ public void testGetListOfEntriesOfLedgerAfterFlush() throws IOException {
+ // Insert some ledger & entries in the interleaved storage
+ for (long entryId = 0; entryId < numWrites; entryId++) {
+ for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+ if (entryId == 0) {
+ sortedLedgerStorage.setMasterKey(ledgerId, ("ledger-" + ledgerId).getBytes());
+ sortedLedgerStorage.setFenced(ledgerId);
+ }
+ ByteBuf entry = Unpooled.buffer(128);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId * entriesPerWrite);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ sortedLedgerStorage.addEntry(entry);
+ }
+ }
+
+ sortedLedgerStorage.flush();
+
+ // Insert some more ledger & entries in the interleaved storage
+ for (long entryId = numWrites; entryId < moreNumOfWrites; entryId++) {
+ for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+ ByteBuf entry = Unpooled.buffer(128);
+ entry.writeLong(ledgerId);
+ entry.writeLong(entryId * entriesPerWrite);
+ entry.writeBytes(("entry-" + entryId).getBytes());
+
+ sortedLedgerStorage.addEntry(entry);
+ }
+ }
+
+ for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+ OfLong entriesOfLedger = sortedLedgerStorage.getListOfEntriesOfLedger(ledgerId);
+ ArrayList<Long> arrayList = new ArrayList<Long>();
+ Consumer<Long> addMethod = arrayList::add;
+ entriesOfLedger.forEachRemaining(addMethod);
+ assertEquals("Number of entries", moreNumOfWrites, arrayList.size());
+ assertTrue("Entries of Ledger", IntStream.range(0, arrayList.size()).allMatch(i -> {
+ return arrayList.get(i).longValue() == (i * entriesPerWrite);
+ }));
+ }
+ }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
index 68e3eeb..9e6c559 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
@@ -18,6 +18,8 @@
*/
package org.apache.bookkeeper.bookie;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -32,9 +34,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.PrimitiveIterator.OfLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -50,6 +57,7 @@ import org.junit.runners.Parameterized.Parameters;
/**
* Test the EntryMemTable class.
*/
+@Slf4j
@RunWith(Parameterized.class)
public class TestEntryMemTable implements CacheCallback, SkipListFlusher, CheckpointSource {
@@ -297,5 +305,151 @@ public class TestEntryMemTable implements CacheCallback, SkipListFlusher, Checkp
}
}
+
+ @Test
+ public void testGetListOfEntriesOfLedger() throws IOException {
+ Set<EntryKeyValue> flushedKVs = Collections.newSetFromMap(new ConcurrentHashMap<EntryKeyValue, Boolean>());
+ KVFLusher flusher = new KVFLusher(flushedKVs);
+ int numofEntries = 100;
+ int numOfLedgers = 5;
+ byte[] data = new byte[10];
+ for (long entryId = 1; entryId <= numofEntries; entryId++) {
+ for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+ random.nextBytes(data);
+ assertTrue(ledgerId + ":" + entryId + " is duplicate in mem-table!",
+ memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this) != 0);
+ }
+ }
+ for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+ OfLong entriesItr = memTable.getListOfEntriesOfLedger((random.nextInt((int) ledgerId) + 1));
+ ArrayList<Long> listOfEntries = new ArrayList<Long>();
+ Consumer<Long> addMethod = listOfEntries::add;
+ entriesItr.forEachRemaining(addMethod);
+ assertEquals("Number of Entries", numofEntries, listOfEntries.size());
+ for (int i = 0; i < numofEntries; i++) {
+ assertEquals("listOfEntries should be sorted", Long.valueOf(i + 1), listOfEntries.get(i));
+ }
+ }
+ assertTrue("Snapshot is expected to be empty since snapshot is not done", memTable.snapshot.isEmpty());
+ assertTrue("Take snapshot and returned checkpoint should not be empty", memTable.snapshot() != null);
+ assertFalse("After taking snapshot, snapshot should not be empty ", memTable.snapshot.isEmpty());
+ for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+ OfLong entriesItr = memTable.getListOfEntriesOfLedger((random.nextInt((int) ledgerId) + 1));
+ ArrayList<Long> listOfEntries = new ArrayList<Long>();
+ Consumer<Long> addMethod = listOfEntries::add;
+ entriesItr.forEachRemaining(addMethod);
+ assertEquals("Number of Entries should be the same even after taking snapshot", numofEntries,
+ listOfEntries.size());
+ for (int i = 0; i < numofEntries; i++) {
+ assertEquals("listOfEntries should be sorted", Long.valueOf(i + 1), listOfEntries.get(i));
+ }
+ }
+
+ memTable.flush(flusher);
+ for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+ OfLong entriesItr = memTable.getListOfEntriesOfLedger((random.nextInt((int) ledgerId) + 1));
+ assertFalse("After flushing there shouldn't be entries in memtable", entriesItr.hasNext());
+ }
+ }
+
+ @Test
+ public void testGetListOfEntriesOfLedgerFromBothKVMapAndSnapshot() throws IOException {
+ int numofEntries = 100;
+ int newNumOfEntries = 200;
+ int numOfLedgers = 5;
+ byte[] data = new byte[10];
+ for (long entryId = 1; entryId <= numofEntries; entryId++) {
+ for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+ random.nextBytes(data);
+ assertTrue(ledgerId + ":" + entryId + " is duplicate in mem-table!",
+ memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this) != 0);
+ }
+ }
+
+ assertTrue("Snapshot is expected to be empty since snapshot is not done", memTable.snapshot.isEmpty());
+ assertTrue("Take snapshot and returned checkpoint should not be empty", memTable.snapshot() != null);
+ assertFalse("After taking snapshot, snapshot should not be empty ", memTable.snapshot.isEmpty());
+
+ for (long entryId = numofEntries + 1; entryId <= newNumOfEntries; entryId++) {
+ for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+ random.nextBytes(data);
+ assertTrue(ledgerId + ":" + entryId + " is duplicate in mem-table!",
+ memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this) != 0);
+ }
+ }
+
+ for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+ OfLong entriesItr = memTable.getListOfEntriesOfLedger((random.nextInt((int) ledgerId) + 1));
+ ArrayList<Long> listOfEntries = new ArrayList<Long>();
+ Consumer<Long> addMethod = listOfEntries::add;
+ entriesItr.forEachRemaining(addMethod);
+ assertEquals("Number of Entries should be the same", newNumOfEntries, listOfEntries.size());
+ for (int i = 0; i < newNumOfEntries; i++) {
+ assertEquals("listOfEntries should be sorted", Long.valueOf(i + 1), listOfEntries.get(i));
+ }
+ }
+ }
+
+ @Test
+ public void testGetListOfEntriesOfLedgerWhileAddingConcurrently() throws IOException, InterruptedException {
+ final int numofEntries = 100;
+ final int newNumOfEntries = 200;
+ final int concurrentAddOfEntries = 300;
+ long ledgerId = 5;
+ byte[] data = new byte[10];
+ for (long entryId = 1; entryId <= numofEntries; entryId++) {
+ random.nextBytes(data);
+ assertTrue(ledgerId + ":" + entryId + " is duplicate in mem-table!",
+ memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this) != 0);
+ }
+
+ assertTrue("Snapshot is expected to be empty since snapshot is not done", memTable.snapshot.isEmpty());
+ assertTrue("Take snapshot and returned checkpoint should not be empty", memTable.snapshot() != null);
+ assertFalse("After taking snapshot, snapshot should not be empty ", memTable.snapshot.isEmpty());
+
+ for (long entryId = numofEntries + 1; entryId <= newNumOfEntries; entryId++) {
+ random.nextBytes(data);
+ assertTrue(ledgerId + ":" + entryId + " is duplicate in mem-table!",
+ memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this) != 0);
+ }
+
+ AtomicBoolean successfullyAdded = new AtomicBoolean(true);
+
+ Thread threadToAdd = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (long entryId = newNumOfEntries + 1; entryId <= concurrentAddOfEntries; entryId++) {
+ random.nextBytes(data);
+ boolean thisEntryAddedSuccessfully = (memTable.addEntry(ledgerId, entryId,
+ ByteBuffer.wrap(data), TestEntryMemTable.this) != 0);
+ successfullyAdded.set(successfullyAdded.get() && thisEntryAddedSuccessfully);
+ Thread.sleep(10);
+ }
+ } catch (IOException e) {
+ log.error("Got Unexpected exception while adding entries");
+ successfullyAdded.set(false);
+ } catch (InterruptedException e) {
+ log.error("Got InterruptedException while waiting");
+ successfullyAdded.set(false);
+ }
+ }
+ });
+ threadToAdd.start();
+
+ Thread.sleep(200);
+ OfLong entriesItr = memTable.getListOfEntriesOfLedger(ledgerId);
+ ArrayList<Long> listOfEntries = new ArrayList<Long>();
+ while (entriesItr.hasNext()) {
+ listOfEntries.add(entriesItr.next());
+ Thread.sleep(5);
+ }
+ threadToAdd.join(5000);
+ assertTrue("Entries should be added successfully in the spawned thread", successfullyAdded.get());
+
+ for (int i = 0; i < newNumOfEntries; i++) {
+ assertEquals("listOfEntries should be sorted", Long.valueOf(i + 1), listOfEntries.get(i));
+ }
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index f57d388..22535f3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -29,6 +29,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
+import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -369,6 +370,11 @@ public class TestSyncThread {
@Override
public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
}
+
+ @Override
+ public OfLong getListOfEntriesOfLedger(long ledgerId) {
+ return null;
+ }
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
index 6e8cf32..2f419b5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Charsets.UTF_8;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -33,6 +34,9 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -43,6 +47,7 @@ import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.CreateMode;
@@ -399,4 +404,97 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
}
bk.close();
}
+
+ @Test
+ public void testGetListOfEntriesOfClosedLedger() throws Exception {
+ testGetListOfEntriesOfLedger(true);
+ }
+
+ @Test
+ public void testGetListOfEntriesOfNotClosedLedger() throws Exception {
+ testGetListOfEntriesOfLedger(false);
+ }
+
+ @Test
+ public void testGetListOfEntriesOfNonExistingLedger() throws Exception {
+ long nonExistingLedgerId = 56789L;
+ try (BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString())) {
+ for (int i = 0; i < bs.size(); i++) {
+ CompletableFuture<AvailabilityOfEntriesOfLedger> futureResult = bkAdmin
+ .asyncGetListOfEntriesOfLedger(bs.get(i).getLocalAddress(), nonExistingLedgerId);
+ try {
+ futureResult.get();
+ fail("asyncGetListOfEntriesOfLedger is supposed to be failed with NoSuchLedgerExistsException");
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof BKException);
+ BKException e = (BKException) ee.getCause();
+ assertEquals(e.getCode(), BKException.Code.NoSuchLedgerExistsException);
+ }
+ }
+ }
+ }
+
+ public void testGetListOfEntriesOfLedger(boolean isLedgerClosed) throws Exception {
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ int numOfEntries = 6;
+ BookKeeper bkc = new BookKeeper(conf);
+ LedgerHandle lh = bkc.createLedger(numOfBookies, numOfBookies, digestType, "testPasswd".getBytes());
+ long lId = lh.getId();
+ for (int i = 0; i < numOfEntries; i++) {
+ lh.addEntry("000".getBytes());
+ }
+ if (isLedgerClosed) {
+ lh.close();
+ }
+ try (BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString())) {
+ for (int i = 0; i < bs.size(); i++) {
+ CompletableFuture<AvailabilityOfEntriesOfLedger> futureResult = bkAdmin
+ .asyncGetListOfEntriesOfLedger(bs.get(i).getLocalAddress(), lId);
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = futureResult.get();
+ assertEquals("Number of entries", numOfEntries,
+ availabilityOfEntriesOfLedger.getTotalNumOfAvailableEntries());
+ for (int j = 0; j < numOfEntries; j++) {
+ assertTrue("Entry should be available: " + j, availabilityOfEntriesOfLedger.isEntryAvailable(j));
+ }
+ assertFalse("Entry should not be available: " + numOfEntries,
+ availabilityOfEntriesOfLedger.isEntryAvailable(numOfEntries));
+ }
+ }
+ bkc.close();
+ }
+
+ @Test
+ public void testGetListOfEntriesOfLedgerWithJustOneBookieInWriteQuorum() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ int numOfEntries = 6;
+ BookKeeper bkc = new BookKeeper(conf);
+ /*
+ * in this testsuite there are going to be 2 (numOfBookies) and if
+ * writeQuorum is 1 then it will stripe entries to those two bookies.
+ */
+ LedgerHandle lh = bkc.createLedger(2, 1, digestType, "testPasswd".getBytes());
+ long lId = lh.getId();
+ for (int i = 0; i < numOfEntries; i++) {
+ lh.addEntry("000".getBytes());
+ }
+
+ try (BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString())) {
+ for (int i = 0; i < bs.size(); i++) {
+ CompletableFuture<AvailabilityOfEntriesOfLedger> futureResult = bkAdmin
+ .asyncGetListOfEntriesOfLedger(bs.get(i).getLocalAddress(), lId);
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = futureResult.get();
+ /*
+ * since num of bookies in the ensemble is 2 and
+ * writeQuorum/ackQuorum is 1, it will stripe to these two
+ * bookies and hence in each bookie there will be only
+ * numOfEntries/2 entries.
+ */
+ assertEquals("Number of entries", numOfEntries / 2,
+ availabilityOfEntriesOfLedger.getTotalNumOfAvailableEntries());
+ }
+ }
+ bkc.close();
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 10f7048..30121b9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -40,6 +40,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.PrimitiveIterator.OfLong;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
@@ -685,5 +686,10 @@ public class GcLedgersTest extends LedgerManagerTestCase {
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
}
+
+ @Override
+ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
+ return null;
+ }
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index a673d8c..4acc3e7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -31,6 +31,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
+import java.util.PrimitiveIterator.OfLong;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
@@ -288,5 +289,10 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
public ByteBuf getExplicitLac(long ledgerId) {
return null;
}
+
+ @Override
+ public OfLong getListOfEntriesOfLedger(long ledgerId) {
+ return null;
+ }
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
index 2c349a0..1cbddd3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -42,6 +42,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.FutureGetListOfEntriesOfLedger;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
@@ -49,6 +50,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.util.ByteBufList;
import org.slf4j.Logger;
@@ -257,6 +259,17 @@ public class MockBookieClient implements BookieClient {
}
@Override
+ public CompletableFuture<AvailabilityOfEntriesOfLedger> getListOfEntriesOfLedger(BookieSocketAddress address,
+ long ledgerId) {
+ FutureGetListOfEntriesOfLedger futureResult = new FutureGetListOfEntriesOfLedger(ledgerId);
+ executor.executeOrdered(address, safeRun(() -> {
+ futureResult
+ .completeExceptionally(BKException.create(BKException.Code.IllegalOpException).fillInStackTrace());
+ }));
+ return futureResult;
+ }
+
+ @Override
public boolean isClosed() {
return false;
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedgerTest.java
new file mode 100644
index 0000000..4368f89
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedgerTest.java
@@ -0,0 +1,192 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.PrimitiveIterator;
+import java.util.Set;
+
+import org.junit.Test;
+
+/**
+ * Testsuite for AvailabilityOfEntriesOfLedger.
+ */
+public class AvailabilityOfEntriesOfLedgerTest {
+ @Test
+ public void testWithItrConstructor() {
+ long[][] arrays = {
+ { 0, 1, 2 },
+ { 1, 2},
+ { 1, 2, 3, 5, 6, 7, 8 },
+ { 0, 1, 5 },
+ { 3 },
+ { 1, 2, 4, 5, 7, 8 },
+ {},
+ {0},
+ { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 100, 1000, 1001, 10000, 20000, 20001 }
+ };
+ for (int i = 0; i < arrays.length; i++) {
+ long[] tempArray = arrays[i];
+ PrimitiveIterator.OfLong primitiveIterator = Arrays.stream(tempArray).iterator();
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+ primitiveIterator);
+ assertEquals("Expected total number of entries", tempArray.length,
+ availabilityOfEntriesOfLedger.getTotalNumOfAvailableEntries());
+ for (int j = 0; j < tempArray.length; j++) {
+ assertTrue(tempArray[j] + " is supposed to be available",
+ availabilityOfEntriesOfLedger.isEntryAvailable(tempArray[j]));
+ }
+ }
+ }
+
+ @Test
+ public void testWithItrConstructorWithDuplicates() {
+ long[][] arrays = {
+ { 1, 2, 2, 3 },
+ { 1, 2, 3, 5, 5, 6, 7, 8, 8 },
+ { 1, 1, 5, 5 },
+ { 3, 3 },
+ { 1, 1, 2, 4, 5, 8, 9, 9, 9, 9, 9 },
+ {},
+ { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 17, 100, 1000, 1000, 1001, 10000, 10000, 20000, 20001 }
+ };
+ for (int i = 0; i < arrays.length; i++) {
+ long[] tempArray = arrays[i];
+ Set<Long> tempSet = new HashSet<Long>();
+ for (int k = 0; k < tempArray.length; k++) {
+ tempSet.add(tempArray[k]);
+ }
+ PrimitiveIterator.OfLong primitiveIterator = Arrays.stream(tempArray).iterator();
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+ primitiveIterator);
+ assertEquals("Expected total number of entries", tempSet.size(),
+ availabilityOfEntriesOfLedger.getTotalNumOfAvailableEntries());
+ for (int j = 0; j < tempArray.length; j++) {
+ assertTrue(tempArray[j] + " is supposed to be available",
+ availabilityOfEntriesOfLedger.isEntryAvailable(tempArray[j]));
+ }
+ }
+ }
+
+ @Test
+ public void testSerializeDeserialize() {
+ long[][] arrays = {
+ { 0, 1, 2 },
+ { 1, 2 },
+ { 1, 2, 3, 5, 6, 7, 8 },
+ { 0, 1, 5 },
+ { 3 },
+ { 1, 2, 4, 5, 7, 8 },
+ { },
+ { 0 },
+ { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 100, 1000, 1001, 10000, 20000, 20001 }
+ };
+ for (int i = 0; i < arrays.length; i++) {
+ long[] tempArray = arrays[i];
+ PrimitiveIterator.OfLong primitiveIterator = Arrays.stream(tempArray).iterator();
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+ primitiveIterator);
+ byte[] serializedState = availabilityOfEntriesOfLedger.serializeStateOfEntriesOfLedger();
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedgerUsingSer = new AvailabilityOfEntriesOfLedger(
+ serializedState);
+ assertEquals("Expected total number of entries", tempArray.length,
+ availabilityOfEntriesOfLedgerUsingSer.getTotalNumOfAvailableEntries());
+ for (int j = 0; j < tempArray.length; j++) {
+ assertTrue(tempArray[j] + " is supposed to be available",
+ availabilityOfEntriesOfLedgerUsingSer.isEntryAvailable(tempArray[j]));
+ }
+ }
+ }
+
+ @Test
+ public void testSerializeDeserializeWithItrConstructorWithDuplicates() {
+ long[][] arrays = {
+ { 1, 2, 2, 3 },
+ { 1, 2, 3, 5, 5, 6, 7, 8, 8 },
+ { 1, 1, 5, 5 },
+ { 3, 3 },
+ { 1, 1, 2, 4, 5, 8, 9, 9, 9, 9, 9 },
+ {},
+ { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 17, 100, 1000, 1000, 1001, 10000, 10000, 20000, 20001 }
+ };
+ for (int i = 0; i < arrays.length; i++) {
+ long[] tempArray = arrays[i];
+ Set<Long> tempSet = new HashSet<Long>();
+ for (int k = 0; k < tempArray.length; k++) {
+ tempSet.add(tempArray[k]);
+ }
+ PrimitiveIterator.OfLong primitiveIterator = Arrays.stream(tempArray).iterator();
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+ primitiveIterator);
+ byte[] serializedState = availabilityOfEntriesOfLedger.serializeStateOfEntriesOfLedger();
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedgerUsingSer = new AvailabilityOfEntriesOfLedger(
+ serializedState);
+ assertEquals("Expected total number of entries", tempSet.size(),
+ availabilityOfEntriesOfLedgerUsingSer.getTotalNumOfAvailableEntries());
+ for (int j = 0; j < tempArray.length; j++) {
+ assertTrue(tempArray[j] + " is supposed to be available",
+ availabilityOfEntriesOfLedgerUsingSer.isEntryAvailable(tempArray[j]));
+ }
+ }
+ }
+
+ @Test
+ public void testNonExistingEntries() {
+ long[][] arrays = {
+ { 0, 1, 2 },
+ { 1, 2, 3, 5, 6, 7, 8 },
+ { 1, 5 },
+ { 3 },
+ { 1, 2, 4, 5, 7, 8 },
+ {},
+ { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 100, 1000, 1001, 10000, 20000, 20001 }
+ };
+ /**
+ * corresponding non-existing entries for 'arrays'
+ */
+ long[][] nonExistingEntries = {
+ { 3 },
+ { 0, 4, 9, 100, 101 },
+ { 0, 2, 3, 6, 9 },
+ { 0, 1, 2, 4, 5, 6 },
+ { 0, 3, 9, 10, 11, 100, 1000 },
+ { 0, 1, 2, 3, 4, 5 },
+ { 4, 18, 1002, 19999, 20003 }
+ };
+ for (int i = 0; i < arrays.length; i++) {
+ long[] tempArray = arrays[i];
+ long[] nonExistingElementsTempArray = nonExistingEntries[i];
+ PrimitiveIterator.OfLong primitiveIterator = Arrays.stream(tempArray).iterator();
+ AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+ primitiveIterator);
+
+ for (int j = 0; j < nonExistingElementsTempArray.length; j++) {
+ assertFalse(nonExistingElementsTempArray[j] + " is not supposed to be available",
+ availabilityOfEntriesOfLedger.isEntryAvailable(nonExistingElementsTempArray[j]));
+ }
+ }
+ }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/IteratorUtilityTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/IteratorUtilityTest.java
new file mode 100644
index 0000000..55c1db3
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/IteratorUtilityTest.java
@@ -0,0 +1,141 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.PrimitiveIterator;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Testsuite for IteratorUtility methods.
+ */
+public class IteratorUtilityTest {
+
+ @Test
+ public void testWithPrimitiveItrMerge() {
+ long[][] arrays = {
+ { 0, 1, 2 },
+ { 0, 1 },
+ { 1, 2 },
+ { 1, 2, 3, 5, 6, 7, 8 },
+ { 1, 2, 3, 5, 6, 7, 8 },
+ { 0, 1, 5 },
+ { 3 },
+ { 1, 2, 4, 5, 7, 8 },
+ {},
+ {},
+ { 0 },
+ { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 100, 1000, 1001, 10000, 20000, 20001 },
+ { 201, 202, 203, 205, 206, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 20100, 201000,
+ 201001, 2010000, 2020000, 2020001 }
+ };
+ for (int i = 0; i < arrays.length; i++) {
+ for (int j = i + 1; j < arrays.length; j++) {
+ long[] tempArray1 = arrays[i];
+ long[] tempArray2 = arrays[j];
+ HashSet<Long> unionSet = new HashSet<Long>();
+ for (int k = 0; k < tempArray1.length; k++) {
+ unionSet.add(tempArray1[k]);
+ }
+ for (int k = 0; k < tempArray2.length; k++) {
+ unionSet.add(tempArray2[k]);
+ }
+
+ PrimitiveIterator.OfLong primitiveIterator1 = Arrays.stream(tempArray1).iterator();
+ PrimitiveIterator.OfLong primitiveIterator2 = Arrays.stream(tempArray2).iterator();
+
+ PrimitiveIterator.OfLong mergedItr = IteratorUtility.mergePrimitiveLongIterator(primitiveIterator1,
+ primitiveIterator2);
+ ArrayList<Long> mergedArrayList = new ArrayList<Long>();
+ Consumer<Long> addMethod = mergedArrayList::add;
+ mergedItr.forEachRemaining(addMethod);
+ int mergedListSize = mergedArrayList.size();
+ Assert.assertEquals("Size of the mergedArrayList", unionSet.size(), mergedArrayList.size());
+ Assert.assertTrue("mergedArrayList should contain all elements in unionSet",
+ mergedArrayList.containsAll(unionSet));
+ Assert.assertTrue("Merged Iterator should be sorted", IntStream.range(0, mergedListSize - 1)
+ .allMatch(k -> mergedArrayList.get(k) <= mergedArrayList.get(k + 1)));
+ Assert.assertTrue("All elements of tempArray1 should be in mergedArrayList",
+ IntStream.range(0, tempArray1.length).allMatch(k -> mergedArrayList.contains(tempArray1[k])));
+ Assert.assertTrue("All elements of tempArray2 should be in mergedArrayList",
+ IntStream.range(0, tempArray2.length).allMatch(k -> mergedArrayList.contains(tempArray2[k])));
+ }
+ }
+ }
+
+ @Test
+ public void testWithItrMerge() {
+ long[][] arrays = {
+ { 0, 1, 2 },
+ { 0, 1 },
+ { 1, 2 },
+ { 1, 2, 3, 5, 6, 7, 8 },
+ { 1, 2, 3, 5, 6, 7, 8 },
+ { 0, 1, 5 },
+ { 3 },
+ { 1, 2, 4, 5, 7, 8 },
+ {},
+ {},
+ { 0 },
+ { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 100, 1000, 1001, 10000, 20000, 20001 },
+ { 201, 202, 203, 205, 206, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 20100, 201000,
+ 201001, 2010000, 2020000, 2020001 }
+ };
+ for (int i = 0; i < arrays.length; i++) {
+ for (int j = i + 1; j < arrays.length; j++) {
+ long[] tempArray1 = arrays[i];
+ ArrayList<Long> tempArrayList1 = new ArrayList<Long>();
+ IntStream.range(0, tempArray1.length).forEach((k) -> tempArrayList1.add(tempArray1[k]));
+ long[] tempArray2 = arrays[j];
+ ArrayList<Long> tempArrayList2 = new ArrayList<Long>();
+ IntStream.range(0, tempArray2.length).forEach((k) -> tempArrayList2.add(tempArray2[k]));
+ HashSet<Long> unionSet = new HashSet<Long>();
+ unionSet.addAll(tempArrayList1);
+ unionSet.addAll(tempArrayList2);
+
+ Iterator<Long> itr1 = tempArrayList1.iterator();
+ Iterator<Long> itr2 = tempArrayList2.iterator();
+
+ Iterator<Long> mergedItr = IteratorUtility.mergeIteratorsForPrimitiveLongIterator(itr1, itr2,
+ Long::compare, (l) -> l);
+ ArrayList<Long> mergedArrayList = new ArrayList<Long>();
+ Consumer<Long> addMethod = mergedArrayList::add;
+ mergedItr.forEachRemaining(addMethod);
+ int mergedListSize = mergedArrayList.size();
+ Assert.assertEquals("Size of the mergedArrayList", unionSet.size(), mergedArrayList.size());
+ Assert.assertTrue("mergedArrayList should contain all elements in unionSet",
+ mergedArrayList.containsAll(unionSet));
+ Assert.assertTrue("Merged Iterator should be sorted", IntStream.range(0, mergedListSize - 1)
+ .allMatch(k -> mergedArrayList.get(k) <= mergedArrayList.get(k + 1)));
+ Assert.assertTrue("All elements of tempArray1 should be in mergedArrayList",
+ IntStream.range(0, tempArray1.length).allMatch(k -> mergedArrayList.contains(tempArray1[k])));
+ Assert.assertTrue("All elements of tempArray2 should be in mergedArrayList",
+ IntStream.range(0, tempArray2.length).allMatch(k -> mergedArrayList.contains(tempArray2[k])));
+ }
+ }
+ }
+}