You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/01/05 19:27:59 UTC
[bookkeeper] branch master updated: ISSUE #877: add
verifier.BookkeeperVerifier
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 904857b ISSUE #877: add verifier.BookkeeperVerifier
904857b is described below
commit 904857b9a67c7ae4afae38bfeeb3e1eda3daba5a
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Fri Jan 5 11:27:53 2018 -0800
ISSUE #877: add verifier.BookkeeperVerifier
Plays and verifies a non-determinstic read/write stream against a
bookkeeper cluster.
Master Issue: #877
Author: Samuel Just <sj...@salesforce.com>
Reviewers: Jia Zhai <None>, Sijie Guo <si...@apache.org>
This closes #890 from athanatos/forupstream/issue-877, closes #877
---
.../bookkeeper/verifier/BookkeeperVerifier.java | 694 +++++++++++++++++++++
.../verifier/BookkeeperVerifierMain.java | 178 ++++++
.../verifier/DirectBookkeeperDriver.java | 116 ++++
.../apache/bookkeeper/verifier/package-info.java | 20 +
.../verifier/BookkeeperVerifierTest.java | 60 ++
5 files changed, 1068 insertions(+)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/BookkeeperVerifier.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/BookkeeperVerifier.java
new file mode 100644
index 0000000..821d260
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/BookkeeperVerifier.java
@@ -0,0 +1,694 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.verifier;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.client.BKException;
+
+/**
+ * Encapsulates logic for playing and verifying operations against a bookkeeper-like
+ * interface. The test state consists of a set of ledgers in 1 of several states:
+ * 1) opening -- waiting for driver to complete open
+ * 2) open -- valid targets for reads and writes
+ * 3) live -- valid targets for reads
+ * 4) deleting
+ * Each ledger moves in sequence through these states. See startWrite for the
+ * code driving the lifecycle.
+ */
+public class BookkeeperVerifier {
+ private final Queue<Exception> errors = new LinkedList<>();
+
+ private synchronized boolean checkReturn(long ledgerID, int rc) {
+ if (BKException.Code.OK != rc) {
+ String error = String.format("Got error %d on ledger %d", rc, ledgerID);
+ System.out.println(error);
+ propagateExceptionToMain(BKException.create(rc));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private synchronized void propagateExceptionToMain(Exception e) {
+ errors.add(e);
+ this.notifyAll();
+ }
+
+ private synchronized void printThrowExceptions() throws Exception {
+ if (!errors.isEmpty()) {
+ for (Exception e: errors) {
+ System.out.format("Error found: %s%n", e.toString());
+ e.printStackTrace();
+ }
+ throw errors.poll();
+ }
+ }
+
+ /**
+ * Provides an interface for translating test operations into operations on a
+ * cluster.
+ */
+ public interface BookkeeperDriver {
+ void createLedger(
+ long ledgerID, int enSize, int writeQSize, int ackQSize,
+ Consumer<Integer> cb
+ );
+
+ void closeLedger(
+ long ledgerID,
+ Consumer<Integer> cb
+ );
+
+ void deleteLedger(
+ long ledgerID,
+ Consumer<Integer> cb
+ );
+
+ void writeEntry(
+ long ledgerID,
+ long entryID,
+ byte[] data,
+ Consumer<Integer> cb
+ );
+
+ /**
+ * Callback for reads.
+ */
+ interface ReadCallback {
+ void complete(
+ long ledgerID,
+ ArrayList<byte[]> results
+ );
+ }
+
+ void readEntries(
+ long ledgerID,
+ long firstEntryID,
+ long lastEntryID,
+ BiConsumer<Integer, ArrayList<byte[]>> cb);
+ }
+
+ private final BookkeeperDriver driver;
+
+ private final int ensembleSize;
+ private final int writeQuorum;
+ private final int ackQuorum;
+ private final int duration;
+ private final int drainTimeout;
+ private final int targetConcurrentLedgers;
+ private final int targetConcurrentWrites;
+ private final int targetWriteGroup;
+ private final int targetReadGroup;
+ private final int targetLedgers;
+ private final int targetEntrySize;
+ private final int targetConcurrentReads;
+ private final double coldToHotRatio;
+
+ private final long targetLedgerEntries;
+
+ BookkeeperVerifier(
+ BookkeeperDriver driver,
+ int ensembleSize,
+ int writeQuorum,
+ int ackQuorum,
+ int duration,
+ int drainTimeout,
+ int targetConcurrentLedgers,
+ int targetConcurrentWrites,
+ int targetWriteGroup,
+ int targetReadGroup,
+ int targetLedgers,
+ long targetLedgerSize,
+ int targetEntrySize,
+ int targetConcurrentReads,
+ double coldToHotRatio) {
+ this.driver = driver;
+ this.ensembleSize = ensembleSize;
+ this.writeQuorum = writeQuorum;
+ this.ackQuorum = ackQuorum;
+ this.duration = duration;
+ this.drainTimeout = drainTimeout;
+ this.targetConcurrentLedgers = targetConcurrentLedgers;
+ this.targetConcurrentWrites = targetConcurrentWrites;
+ this.targetWriteGroup = targetWriteGroup;
+ this.targetReadGroup = targetReadGroup;
+ this.targetLedgers = targetLedgers;
+ this.targetEntrySize = targetEntrySize;
+ this.targetConcurrentReads = targetConcurrentReads;
+ this.coldToHotRatio = coldToHotRatio;
+
+ this.targetLedgerEntries = targetLedgerSize / targetEntrySize;
+ }
+
+ private int outstandingWriteCount = 0;
+ private int outstandingReadCount = 0;
+ private long nextLedger = 0;
+ private long getNextLedgerID() {
+ return nextLedger++;
+ }
+
+ /**
+ * State required to regenerate an entry.
+ */
+ class EntryInfo {
+ private final long entryID;
+ private final long seed;
+ EntryInfo(long entryID, long seed) {
+ this.entryID = entryID;
+ this.seed = seed;
+ }
+ byte[] getBuffer() {
+ Random rand = new Random(seed);
+ byte[] ret = new byte[targetEntrySize];
+ rand.nextBytes(ret);
+ return ret;
+ }
+ long getEntryID() {
+ return entryID;
+ }
+ }
+
+ /**
+ * Contains the state required to reconstruct the contents of any entry in the ledger.
+ * The seed value passed into the constructor fully determines the contents of the
+ * ledger. Each EntryInfo has its own seed generated sequentially from a Random instance
+ * seeded from the original seed. It then uses that seed to generate a secondary Random
+ * instance for generating the bytes within the entry. See EntryIterator for details.
+ * Random(seed)
+ * |
+ * E0 -> Random(E0) -> getBuffer()
+ * |
+ * E1 -> Random(E1) -> getBuffer()
+ * |
+ * E2 -> Random(E2) -> getBuffer()
+ * |
+ * E3 -> Random(E3) -> getBuffer()
+ * |
+ * E4 -> Random(E4) -> getBuffer()
+ * |
+ * ...
+ */
+ class LedgerInfo {
+ private final long ledgerID;
+ private final long seed;
+ private long lastEntryIDCompleted = -1;
+ private long confirmedLAC = -1;
+ private boolean closed = false;
+
+ final TreeSet<Long> writesInProgress = new TreeSet<>();
+ final TreeSet<Long> writesCompleted = new TreeSet<>();
+ int readsInProgress = 0;
+ Consumer<Consumer<Integer>> onLastOp = null;
+ Consumer<Consumer<Integer>> onLastWrite = null;
+
+ EntryIterator iter;
+
+ LedgerInfo(long ledgerID, long seed) {
+ this.ledgerID = ledgerID;
+ this.seed = seed;
+ iter = new EntryIterator();
+ }
+
+ long getLastEntryIDCompleted() {
+ return lastEntryIDCompleted;
+ }
+
+ long getConfirmedLAC() {
+ return confirmedLAC;
+ }
+
+ ArrayList<EntryInfo> getNextEntries(int num) {
+ ArrayList<EntryInfo> ret = new ArrayList<>();
+ for (int i = 0; i < num && iter.hasNext(); ++i) {
+ ret.add(iter.next());
+ }
+ return ret;
+ }
+
+ class EntryIterator implements Iterator<EntryInfo> {
+ Random rand;
+ long currentID;
+ long currentSeed;
+
+ EntryIterator() {
+ seek(-1);
+ }
+
+ void seek(long entryID) {
+ currentID = -1;
+ currentSeed = seed;
+ rand = new Random(seed);
+ while (currentID < entryID) {
+ advance();
+ }
+ }
+
+ void advance() {
+ currentSeed = rand.nextLong();
+ currentID++;
+ }
+
+ EntryInfo get() {
+ return new EntryInfo(currentID, currentSeed);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentID < targetLedgerEntries;
+ }
+
+
+ @Override
+ public EntryInfo next() {
+ advance();
+ return get();
+ }
+ }
+
+ EntryIterator getIterator() {
+ return new EntryIterator();
+ }
+
+ void openWrite(long entryID) {
+ writesInProgress.add(entryID);
+ System.out.format("Open writes, %s%n", writesInProgress.toString());
+ }
+
+ void incReads() {
+ readsInProgress++;
+ System.out.format("Inc reads to %d%n", readsInProgress);
+ }
+
+ /**
+ * The idea here is that we may need to register an operation which needs to run
+ * whenever the final op completes on this Ledger (like deletion). If there
+ * are none, newOnLastOp should be called synchronously with cb. Otherwise,
+ * cb should be called synchronously with cb and newOnLastOp should be called
+ * with the cb passed in with the decReads or closeWrite.
+ * In the deletion case, cb would be the callback for the error from
+ * the deletion operation (if it happens). The reason for all of this is that
+ * the delete case will need to chain an async call to delete into the async callback
+ * chain for whatever the last operation to complete on this Ledger. newOnLastOp
+ * would invoke that delete. The cb passed in allows it to pick up and continue
+ * the original chain.
+ * @param cb Callback to get result of newOnLastOp if called now
+ * @param newOnLastOp Callback to be invoked on the last decReads or closeWrite,
+ * should be passed the cb passed in with the final closeWrite
+ * or decReads
+ */
+ void onLastOpComplete(
+ Consumer<Integer> cb,
+ Consumer<Consumer<Integer>> newOnLastOp) {
+ checkState(onLastOp == null);
+ onLastOp = newOnLastOp;
+ checkOpComplete(cb);
+ }
+
+ /**
+ * Very similar to onLastOpComplete, but gets called on the final call to closeWrite.
+ * @param cb Callback to get result of newOnLastWrite if called now
+ * @param newOnLastWrite Callback to be invoked on the last closeWrite,
+ * should be passed the cb passed in with the final closeWrite.
+ */
+ void onLastWriteComplete(
+ Consumer<Integer> cb,
+ Consumer<Consumer<Integer>> newOnLastWrite) {
+ assert (onLastWrite == null);
+ onLastWrite = newOnLastWrite;
+ checkWriteComplete(cb);
+ }
+
+ void closeWrite(long entryID, Consumer<Integer> cb) {
+ writesInProgress.remove(entryID);
+ writesCompleted.add(entryID);
+ long completedTo = writesInProgress.isEmpty() ? Long.MAX_VALUE : writesInProgress.first();
+ while (!writesCompleted.isEmpty() && writesCompleted.first() < completedTo) {
+ lastEntryIDCompleted = writesCompleted.first();
+ writesCompleted.remove(writesCompleted.first());
+ }
+ checkWriteComplete((rc) -> {
+ checkReturn(ledgerID, rc);
+ checkOpComplete(cb);
+ });
+ }
+
+ void updateLAC(long lac) {
+ if (lac > confirmedLAC) {
+ confirmedLAC = lac;
+ }
+ }
+
+ void decReads(Consumer<Integer> cb) {
+ --readsInProgress;
+ checkOpComplete(cb);
+ }
+
+ private void checkWriteComplete(Consumer<Integer> cb) {
+ if (writesInProgress.isEmpty() && onLastWrite != null) {
+ System.out.format("checkWriteComplete: done%n");
+ onLastWrite.accept(cb);
+ onLastWrite = null;
+ } else {
+ System.out.format(
+ "checkWriteComplete: ledger %d, writesInProgress %s%n",
+ ledgerID,
+ writesInProgress.toString());
+ cb.accept(0);
+ }
+ }
+
+ private void checkOpComplete(Consumer<Integer> cb) {
+ if (readsInProgress == 0 && writesInProgress.isEmpty() && onLastOp != null) {
+ System.out.format("checkOpComplete: done%n");
+ onLastOp.accept(cb);
+ onLastOp = null;
+ } else {
+ System.out.format(
+ "checkOpComplete: ledger %d, writesInProgress %s, readsInProgress %d%n",
+ ledgerID,
+ writesInProgress.toString(), readsInProgress);
+ cb.accept(0);
+ }
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+ public void setClosed() {
+ closed = true;
+ confirmedLAC = lastEntryIDCompleted;
+ }
+ }
+
+ private final Set<LedgerInfo> openingLedgers = new HashSet<>();
+ private final Set<LedgerInfo> openLedgers = new HashSet<>();
+ private final Set<LedgerInfo> liveLedgers = new HashSet<>();
+ private final Random opRand = new Random();
+
+ private LedgerInfo getRandomLedger(Collection<LedgerInfo> ledgerCollection) {
+ int elem = opRand.nextInt(ledgerCollection.size());
+ Iterator<LedgerInfo> iter = ledgerCollection.iterator();
+ for (int i = 0; i < elem; ++i) {
+ iter.next();
+ }
+ return iter.next();
+ }
+
+ private synchronized boolean startRead() {
+ if (outstandingReadCount > targetConcurrentReads) {
+ System.out.format("Not starting another read, enough in progress%n");
+ /* Caller should exit and wait for outstandingReadCount to fall */
+ return false;
+ }
+ LedgerInfo ledger;
+ if (!openLedgers.isEmpty() && (opRand.nextDouble() > coldToHotRatio)) {
+ ledger = getRandomLedger(openLedgers);
+ System.out.format("Reading from open ledger %d%n", ledger.ledgerID);
+ } else if (!liveLedgers.isEmpty()) {
+ ledger = getRandomLedger(liveLedgers);
+ System.out.format("Reading from cold ledger %d%n", ledger.ledgerID);
+ } else {
+ /* No readable ledgers, either startWrite can make progress, or there are already ledgers
+ * opening.
+ */
+ return false;
+ }
+ long lastEntryCompleted = ledger.getConfirmedLAC();
+ if (lastEntryCompleted <= 0) {
+ System.out.format("No readable entries in ledger %d, let's wait%n", ledger.ledgerID);
+ /* Either startWrite can make progress or there are already a bunch in progress */
+ return false;
+ }
+ long start = Math.abs(opRand.nextLong() % lastEntryCompleted);
+ long end = start + targetReadGroup > lastEntryCompleted ? lastEntryCompleted : start + targetReadGroup;
+ System.out.format("Reading %d -> %d from ledger %d%n", start, end, ledger.ledgerID);
+ LedgerInfo finalLedger = ledger;
+ ledger.incReads();
+ driver.readEntries(ledger.ledgerID, start, end, (rc, results) -> {
+ synchronized (BookkeeperVerifier.this) {
+ if (checkReturn(ledger.ledgerID, rc)) {
+ return;
+ }
+ System.out.format("Read %d -> %d from ledger %d complete%n", start, end, ledger.ledgerID);
+ long current = start;
+ LedgerInfo.EntryIterator iterator = finalLedger.getIterator();
+ iterator.seek(current - 1);
+ for (byte[] result : results) {
+ byte[] check = iterator.next().getBuffer();
+ if (result.length != check.length) {
+ propagateExceptionToMain(new Exception(String.format(
+ "Mismatched entry length on entry %d for ledger %d, read returned %d, should be %d",
+ current, ledger.ledgerID, result.length, check.length)
+ ));
+ }
+ /* Verify contents */
+ if (!Arrays.equals(check, result)) {
+ int i = 0;
+ for (; i < check.length; ++i) {
+ if (check[i] != result[i]) {
+ break;
+ }
+ }
+ propagateExceptionToMain(new Exception(String.format(
+ "Mismatched entry contents on entry %d for ledger %d at offset %d, length %d",
+ current, ledger.ledgerID, i, check.length)
+ ));
+ }
+ current++;
+ }
+ finalLedger.decReads((rc2) -> {
+ synchronized (BookkeeperVerifier.this) {
+ checkReturn(ledger.ledgerID, rc2);
+ System.out.format("Read %d -> %d from ledger %d releasing read%n", start, end, ledger.ledgerID);
+ outstandingReadCount--;
+ BookkeeperVerifier.this.notifyAll();
+ }
+ });
+ }
+ });
+ ++outstandingReadCount;
+ return true;
+ }
+
+ class WriteCallback implements Consumer<Integer> {
+ private int completed = 0;
+ private final int toWaitFor;
+ private final LedgerInfo ledger;
+ private final long lastEntry;
+ private final long pendingLAC;
+ WriteCallback(LedgerInfo ledger, long lastEntry, long pendingLAC, int toWaitFor) {
+ this.toWaitFor = toWaitFor;
+ this.ledger = ledger;
+ this.lastEntry = lastEntry;
+ this.pendingLAC = pendingLAC;
+ }
+
+ @Override
+ public void accept(Integer rc) {
+ synchronized (BookkeeperVerifier.this) {
+ if (checkReturn(ledger.ledgerID, rc)) {
+ return;
+ }
+ ++completed;
+ if (toWaitFor == completed) {
+ System.out.format("Writes ending at %d complete on ledger %d%n", lastEntry, ledger.ledgerID);
+ ledger.closeWrite(lastEntry, (rc2) -> {
+ synchronized (BookkeeperVerifier.this) {
+ checkReturn(ledger.ledgerID, rc2);
+ System.out.format("Writes ending at %d complete on ledger %d releasing write%n",
+ lastEntry, ledger.ledgerID);
+ --outstandingWriteCount;
+ BookkeeperVerifier.this.notifyAll();
+ }
+ });
+ ledger.updateLAC(pendingLAC);
+ }
+ }
+ }
+ }
+
+ /**
+ * Attempt to start one more write, return false if too many are in progress.
+ * @return false if unable to start more
+ */
+ private synchronized boolean startWrite() {
+ if (outstandingWriteCount > targetConcurrentWrites) {
+ System.out.format("Write paused, too many outstanding writes%n");
+ /* Caller should release lock and wait for outstandingWriteCount to fall */
+ return false;
+ }
+ if (openLedgers.size() + openingLedgers.size() < targetConcurrentLedgers) {
+ /* Not enough open ledgers, open a new one -- counts as a write */
+ long newID = getNextLedgerID();
+ System.out.format("Creating new ledger %d%n", newID);
+ LedgerInfo ledger = new LedgerInfo(newID, opRand.nextLong());
+ openingLedgers.add(ledger);
+ driver.createLedger(newID, ensembleSize, writeQuorum, ackQuorum, (rc) -> {
+ synchronized (BookkeeperVerifier.this) {
+ checkReturn(newID, rc);
+ System.out.format("Created new ledger %d%n", newID);
+ openingLedgers.remove(ledger);
+ openLedgers.add(ledger);
+ --outstandingWriteCount;
+ BookkeeperVerifier.this.notifyAll();
+ }
+ });
+ ++outstandingWriteCount;
+ return true;
+ } else if (openLedgers.isEmpty()) {
+ System.out.format("Not starting a write, no open ledgers, already opening the limit%n");
+ /* Caller should release lock and wait for openLedgers to be populated */
+ return false;
+ } else {
+ LedgerInfo ledger = getRandomLedger(openLedgers);
+ ArrayList<EntryInfo> toWrite = ledger.getNextEntries(targetWriteGroup);
+ long lastEntry = toWrite.get(toWrite.size() - 1).getEntryID();
+ System.out.format(
+ "Writing entries %d -> %d to ledger %d%n",
+ toWrite.get(0).getEntryID(),
+ lastEntry,
+ ledger.ledgerID);
+ ledger.openWrite(lastEntry);
+
+ WriteCallback writeCB = new WriteCallback(
+ ledger, lastEntry, ledger.getLastEntryIDCompleted(), toWrite.size());
+ for (EntryInfo entry: toWrite) {
+ driver.writeEntry(ledger.ledgerID, entry.getEntryID(), entry.getBuffer(), writeCB);
+ }
+ ++outstandingWriteCount;
+
+ if (lastEntry >= targetLedgerEntries) {
+ /* Remove this ledger from the writable list, mark for closing once all open writes complete */
+ System.out.format("Marking ledger %d for close%n", ledger.ledgerID);
+ openLedgers.remove(ledger);
+ liveLedgers.add(ledger);
+ ledger.onLastWriteComplete((rc) -> checkReturn(ledger.ledgerID, rc), (Consumer<Integer> cb) -> {
+ System.out.format("Closing ledger %d%n", ledger.ledgerID);
+ driver.closeLedger(ledger.ledgerID, (Integer rc) -> {
+ synchronized (BookkeeperVerifier.this) {
+ ledger.setClosed();
+ System.out.format("Closed ledger %d%n", ledger.ledgerID);
+
+ if (liveLedgers.size() >= targetLedgers) {
+ /* We've closed the ledger, but now we have too many closed but readable ledgers,
+ * start deleting one. */
+ LedgerInfo toDelete = getRandomLedger(liveLedgers);
+ final long ledgerID = toDelete.ledgerID;
+ System.out.format("Marking ledger %d for deletion%n", ledgerID);
+ liveLedgers.remove(toDelete);
+ toDelete.onLastOpComplete(cb, (Consumer<Integer> cb2) -> {
+ System.out.format("Deleting ledger %d%n", ledgerID);
+ driver.deleteLedger(ledgerID, (rc2) -> {
+ synchronized (BookkeeperVerifier.this) {
+ System.out.format("Deleted ledger %d%n", ledgerID);
+ cb2.accept(rc2);
+ }
+ });
+ });
+ } else {
+ cb.accept(rc);
+ }
+ }
+ });
+ });
+ }
+
+ Collections.shuffle(toWrite);
+ return true;
+ }
+ }
+
+ /**
+ * This is the method used to invoke the main loop of the IO driver. run() will loop
+ * starting IO requests until the time runs out on the test and all outstanding requests
+ * complete. Test execution state is accessed only under the instance lock for 'this'.
+ * There is no fine grained locking, hence run() simply needs to be synchronized and
+ * can wait for IOs to complete atomically with startWrite and startRead returning
+ * false (see those comments).
+ *
+ * @throws Exception
+ */
+ public synchronized void run() throws Exception {
+ long start = System.currentTimeMillis();
+ long testEnd = start + (duration * 1000);
+ long testDrainEnd = testEnd + (drainTimeout * 1000);
+
+ /* Keep IO running until testEnd */
+ while (System.currentTimeMillis() < testEnd) {
+
+ /* see startRead and startWrite, they return false once no more IO can be started */
+ while (startRead() || startWrite()) {}
+ long toWait = testEnd - System.currentTimeMillis();
+
+ /* atomically wait for either IO to complete or the test to end */
+ this.wait(toWait < 0 ? 0 : toWait);
+ printThrowExceptions();
+ }
+
+ /* Wait for all in progress ops to complete, outstanding*Count is updated under the lock */
+ while ((System.currentTimeMillis() < testDrainEnd)
+ && (outstandingReadCount > 0 || outstandingWriteCount > 0)) {
+ System.out.format("reads: %d, writes: %d%n", outstandingReadCount, outstandingWriteCount);
+ System.out.format("openingLedgers:%n");
+ for (LedgerInfo li: openingLedgers) {
+ System.out.format(
+ "Ledger %d has reads: %d, writes: %d%n",
+ li.ledgerID,
+ li.readsInProgress,
+ li.writesInProgress.size());
+ }
+ System.out.format("openLedgers:%n");
+ for (LedgerInfo li: openLedgers) {
+ System.out.format(
+ "Ledger %d has reads: %d, writes: %d%n",
+ li.ledgerID,
+ li.readsInProgress,
+ li.writesInProgress.size());
+ }
+ System.out.format("liveLedgers:%n");
+ for (LedgerInfo li: liveLedgers) {
+ System.out.format(
+ "Ledger %d has reads: %d, writes: %d%n",
+ li.ledgerID,
+ li.readsInProgress,
+ li.writesInProgress.size());
+ }
+ long toWait = testDrainEnd - System.currentTimeMillis();
+ this.wait(toWait < 0 ? 0 : toWait);
+ printThrowExceptions();
+ }
+ if (outstandingReadCount > 0 || outstandingWriteCount > 0) {
+ throw new Exception("Failed to drain ops before timeout%n");
+ }
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/BookkeeperVerifierMain.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/BookkeeperVerifierMain.java
new file mode 100644
index 0000000..ede3504
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/BookkeeperVerifierMain.java
@@ -0,0 +1,178 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.verifier;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+
+/**
+ * Performs a configurable IO stream against a bookkeeper client while
+ * validating results.
+ */
+public class BookkeeperVerifierMain {
+
+ private static void printHelpAndExit(Options options, String header, int code) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(
+ "BookkeeperVerifierMain",
+ header,
+ options, "", true);
+ System.exit(code);
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ Options options = new Options();
+ options.addOption(
+ "ledger_path", true, "Hostname or IP of bookie to benchmark");
+ options.addOption(
+ "zookeeper",
+ true,
+ "Zookeeper ensemble, (default \"localhost:2181\")");
+ options.addOption(
+ "ensemble_size", true, "Bookkeeper client ensemble size");
+ options.addOption(
+ "write_quorum", true, "Bookkeeper client write quorum size");
+ options.addOption("ack_quorum", true, "Bookkeeper client ack quorum size");
+ options.addOption("duration", true, "Run duration in seconds");
+ options.addOption("drainTimeout", true, "Seconds to wait for in progress ops to end");
+ options.addOption(
+ "target_concurrent_ledgers",
+ true,
+ "target number ledgers to write to concurrently");
+ options.addOption(
+ "target_concurrent_writes",
+ true,
+ "target number of concurrent writes per ledger");
+ options.addOption(
+ "target_write_group",
+ true,
+ "target number of entries to write at a time");
+ options.addOption(
+ "target_read_group",
+ true,
+ "target number of entries to read at a time");
+ options.addOption("target_ledgers", true, "Target number of ledgers");
+ options.addOption("target_ledger_size", true, "Target size per ledger");
+ options.addOption("target_entry_size", true, "Target size per entry");
+ options.addOption(
+ "target_concurrent_reads", true, "Number of reads to maintain");
+ options.addOption(
+ "cold_to_hot_ratio", true, "Ratio of reads on open ledgers");
+ options.addOption("help", false, "Print this help message");
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = null;
+ try {
+ cmd = parser.parse(options, args);
+ } catch (ParseException e) {
+ printHelpAndExit(options, "Unable to parse command line", 1);
+ }
+
+ if (cmd.hasOption("help")) {
+ printHelpAndExit(options, "Help:", 0);
+ }
+
+ String ledgerPath = cmd.getOptionValue("ledger_path", "/ledgers");
+ String zkString = cmd.getOptionValue("zookeeper", "localhost:2181");
+ int ensembleSize = 0;
+ int writeQuorum = 0;
+ int ackQuorum = 0;
+ int duration = 0;
+ int drainTimeout = 0;
+ int targetConcurrentLedgers = 0;
+ int targetConcurrentWrites = 0;
+ int targetWriteGroup = 0;
+ int targetReadGroup = 0;
+ int targetLedgers = 0;
+ long targetLedgerSize = 0;
+ int targetEntrySize = 0;
+ int targetConcurrentReads = 0;
+ double coldToHotRatio = 0;
+
+ try {
+ ensembleSize = Integer.parseInt(cmd.getOptionValue("ensemble_size", "3"));
+ writeQuorum = Integer.parseInt(cmd.getOptionValue("write_quorum", "3"));
+ ackQuorum = Integer.parseInt(cmd.getOptionValue("ack_quorum", "2"));
+ duration = Integer.parseInt(cmd.getOptionValue("duration", "600"));
+ drainTimeout = Integer.parseInt(cmd.getOptionValue("drain_timeout", "10"));
+ targetConcurrentLedgers =
+ Integer.parseInt(cmd.getOptionValue("target_concurrent_ledgers", "4"));
+ targetConcurrentWrites =
+ Integer.parseInt(cmd.getOptionValue("target_concurrent_writes", "12"));
+ targetWriteGroup =
+ Integer.parseInt(cmd.getOptionValue("target_write_group", "4"));
+ targetReadGroup =
+ Integer.parseInt(cmd.getOptionValue("target_read_group", "4"));
+ targetLedgers = Integer.parseInt(cmd.getOptionValue("target_ledgers", "32"));
+ targetLedgerSize = Long.parseLong(cmd.getOptionValue(
+ "target_ledger_size",
+ "33554432"));
+ targetEntrySize = Integer.parseInt(cmd.getOptionValue(
+ "target_entry_size",
+ "16384"));
+ targetConcurrentReads = Integer.parseInt(cmd.getOptionValue(
+ "target_concurrent_reads",
+ "16"));
+ coldToHotRatio = Double.parseDouble(
+ cmd.getOptionValue("cold_to_hot_ratio", "0.5"));
+ } catch (NumberFormatException e) {
+ printHelpAndExit(options, "Invalid argument", 0);
+ }
+
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setZkServers(zkString);
+ conf.setZkLedgersRootPath(ledgerPath);
+ BookKeeper bkclient = new BookKeeper(conf);
+
+ BookkeeperVerifier verifier = new BookkeeperVerifier(
+ new DirectBookkeeperDriver(bkclient),
+ ensembleSize,
+ writeQuorum,
+ ackQuorum,
+ duration,
+ drainTimeout,
+ targetConcurrentLedgers,
+ targetConcurrentWrites,
+ targetWriteGroup,
+ targetReadGroup,
+ targetLedgers,
+ targetLedgerSize,
+ targetEntrySize,
+ targetConcurrentReads,
+ coldToHotRatio);
+ try {
+ verifier.run();
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.exit(1);
+ } finally {
+ bkclient.close();
+ }
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/DirectBookkeeperDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/DirectBookkeeperDriver.java
new file mode 100644
index 0000000..9611082
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/DirectBookkeeperDriver.java
@@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.verifier;
+
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+/**
+ * Driver for a normal Bookkeeper cluster.
+ */
+class DirectBookkeeperDriver implements BookkeeperVerifier.BookkeeperDriver {
+ private final ConcurrentHashMap<Long, LedgerHandle> openHandles = new ConcurrentHashMap<>();
+ private BookKeeper client;
+
+ DirectBookkeeperDriver(BookKeeper client) {
+ this.client = client;
+ }
+
+ @Override
+ public void createLedger(long ledgerID, int enSize, int writeQSize, int ackQSize, Consumer<Integer> cb) {
+ client.asyncCreateLedgerAdv(
+ ledgerID,
+ enSize,
+ writeQSize,
+ ackQSize,
+ BookKeeper.DigestType.CRC32,
+ new byte[0],
+ (rc, lh, ctx) -> {
+ openHandles.put(ledgerID, lh);
+ cb.accept(rc);
+ },
+ null,
+ null);
+ }
+
+ @Override
+ public void closeLedger(long ledgerID, Consumer<Integer> cb) {
+ LedgerHandle handle = openHandles.remove(ledgerID);
+ handle.asyncClose(
+ (rc, lh, ctx) -> cb.accept(rc),
+ null);
+ }
+
+ @Override
+ public void deleteLedger(long ledgerID, Consumer<Integer> cb) {
+ client.asyncDeleteLedger(ledgerID, (rc, ctx) -> {
+ cb.accept(rc);
+ }, null);
+ }
+
+ @Override
+ public void writeEntry(long ledgerID, long entryID, byte[] data, Consumer<Integer> cb) {
+ LedgerHandle handle;
+ handle = openHandles.get(ledgerID);
+ if (handle == null) {
+ cb.accept(BKException.Code.WriteException);
+ return;
+ }
+ handle.asyncAddEntry(entryID, data, (rc, lh, entryId, ctx) -> {
+ cb.accept(rc);
+ }, null);
+ }
+
+ @Override
+ public void readEntries(
+ long ledgerID, long firstEntryID, long lastEntryID, BiConsumer<Integer, ArrayList<byte[]>> cb) {
+ client.asyncOpenLedgerNoRecovery(ledgerID, BookKeeper.DigestType.CRC32, new byte[0], (rc, lh, ctx) -> {
+ if (rc != 0) {
+ cb.accept(rc, null);
+ return;
+ }
+ System.out.format("Got handle for read %d -> %d on ledger %d%n", firstEntryID, lastEntryID, ledgerID);
+ lh.asyncReadEntries(firstEntryID, lastEntryID, (rc1, lh1, seq, ctx1) -> {
+ System.out.format("Read cb %d -> %d on ledger %d%n", firstEntryID, lastEntryID, ledgerID);
+ ArrayList<byte[]> results = new ArrayList<>();
+ if (rc1 == 0) {
+ while (seq.hasMoreElements()) {
+ results.add(seq.nextElement().getEntry());
+ }
+ System.out.format("About to close handle for read %d -> %d on ledger %d%n",
+ firstEntryID, lastEntryID, ledgerID);
+ }
+ lh.asyncClose((rc2, lh2, ctx2) -> {
+ System.out.format("Closed handle for read %d -> %d on ledger %d result %d%n",
+ firstEntryID, lastEntryID, ledgerID, rc2);
+ cb.accept(rc1 == 0 ? rc2 : rc1, results);
+ }, null);
+ }, null);
+ }, null);
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/package-info.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/package-info.java
new file mode 100644
index 0000000..c76d72d
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/**
+ * Simple self-verifying workload generator.
+ */
+package org.apache.bookkeeper.verifier;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/verifier/BookkeeperVerifierTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/verifier/BookkeeperVerifierTest.java
new file mode 100644
index 0000000..d282bfd
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/verifier/BookkeeperVerifierTest.java
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.verifier;
+
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Test for BookKeeperVerifier.
+ */
+public class BookkeeperVerifierTest extends BookKeeperClusterTestCase {
+ public BookkeeperVerifierTest() {
+ super(3);
+ }
+
+ /**
+ * Simple test to verify that the verifier works against a local cluster.
+ */
+ @Test(timeout = 30000)
+ public void testBasic() throws Exception {
+ DirectBookkeeperDriver driver = new DirectBookkeeperDriver(bkc);
+ BookkeeperVerifier verifier = new BookkeeperVerifier(
+ driver,
+ 3,
+ 3,
+ 2,
+ 10,
+ 5,
+ 16,
+ 4,
+ 2,
+ 2,
+ 32,
+ 16 << 10,
+ 4 << 10,
+ 4,
+ 0.5
+ );
+ verifier.run();
+ }
+}
--
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].