You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/01/05 19:28:06 UTC

[GitHub] sijie closed pull request #890: ISSUE #877: add verifier.BookkeeperVerifier

sijie closed pull request #890: ISSUE #877: add verifier.BookkeeperVerifier
URL: https://github.com/apache/bookkeeper/pull/890
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/BookkeeperVerifier.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/BookkeeperVerifier.java
new file mode 100644
index 000000000..821d260c7
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/BookkeeperVerifier.java
@@ -0,0 +1,694 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.verifier;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.client.BKException;
+
+/**
+ * Encapsulates logic for playing and verifying operations against a bookkeeper-like
+ * interface. The test state consists of a set of ledgers in 1 of several states:
+ * 1) opening -- waiting for driver to complete open
+ * 2) open -- valid targets for reads and writes
+ * 3) live -- valid targets for reads
+ * 4) deleting
+ * Each ledger moves in sequence through these states.  See startWrite for the
+ * code driving the lifecycle.
+ */
+public class BookkeeperVerifier {
+    private final Queue<Exception> errors = new LinkedList<>();
+
+    private synchronized boolean checkReturn(long ledgerID, int rc) {
+        if (BKException.Code.OK != rc) {
+            String error = String.format("Got error %d on ledger %d", rc, ledgerID);
+            System.out.println(error);
+            propagateExceptionToMain(BKException.create(rc));
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private synchronized void propagateExceptionToMain(Exception e) {
+        errors.add(e);
+        this.notifyAll();
+    }
+
+    private synchronized void printThrowExceptions() throws Exception {
+        if (!errors.isEmpty()) {
+            for (Exception e: errors) {
+                System.out.format("Error found: %s%n", e.toString());
+                e.printStackTrace();
+            }
+            throw errors.poll();
+        }
+    }
+
+    /**
+     * Provides an interface for translating test operations into operations on a
+     * cluster.
+     */
+    public interface BookkeeperDriver {
+        void createLedger(
+                long ledgerID, int enSize, int writeQSize, int ackQSize,
+                Consumer<Integer> cb
+        );
+
+        void closeLedger(
+                long ledgerID,
+                Consumer<Integer> cb
+        );
+
+        void deleteLedger(
+                long ledgerID,
+                Consumer<Integer> cb
+        );
+
+        void writeEntry(
+                long ledgerID,
+                long entryID,
+                byte[] data,
+                Consumer<Integer> cb
+        );
+
+        /**
+         * Callback for reads.
+         */
+        interface ReadCallback {
+            void complete(
+                    long ledgerID,
+                    ArrayList<byte[]> results
+            );
+        }
+
+        void readEntries(
+                long ledgerID,
+                long firstEntryID,
+                long lastEntryID,
+                BiConsumer<Integer, ArrayList<byte[]>> cb);
+    }
+
+    private final BookkeeperDriver driver;
+
+    private final int ensembleSize;
+    private final int writeQuorum;
+    private final int ackQuorum;
+    private final int duration;
+    private final int drainTimeout;
+    private final int targetConcurrentLedgers;
+    private final int targetConcurrentWrites;
+    private final int targetWriteGroup;
+    private final int targetReadGroup;
+    private final int targetLedgers;
+    private final int targetEntrySize;
+    private final int targetConcurrentReads;
+    private final double coldToHotRatio;
+
+    private final long targetLedgerEntries;
+
+    BookkeeperVerifier(
+            BookkeeperDriver driver,
+            int ensembleSize,
+            int writeQuorum,
+            int ackQuorum,
+            int duration,
+            int drainTimeout,
+            int targetConcurrentLedgers,
+            int targetConcurrentWrites,
+            int targetWriteGroup,
+            int targetReadGroup,
+            int targetLedgers,
+            long targetLedgerSize,
+            int targetEntrySize,
+            int targetConcurrentReads,
+            double coldToHotRatio) {
+        this.driver = driver;
+        this.ensembleSize = ensembleSize;
+        this.writeQuorum = writeQuorum;
+        this.ackQuorum = ackQuorum;
+        this.duration = duration;
+        this.drainTimeout = drainTimeout;
+        this.targetConcurrentLedgers = targetConcurrentLedgers;
+        this.targetConcurrentWrites = targetConcurrentWrites;
+        this.targetWriteGroup = targetWriteGroup;
+        this.targetReadGroup = targetReadGroup;
+        this.targetLedgers = targetLedgers;
+        this.targetEntrySize = targetEntrySize;
+        this.targetConcurrentReads = targetConcurrentReads;
+        this.coldToHotRatio = coldToHotRatio;
+
+        this.targetLedgerEntries = targetLedgerSize / targetEntrySize;
+    }
+
+    private int outstandingWriteCount = 0;
+    private int outstandingReadCount = 0;
+    private long nextLedger = 0;
+    private long getNextLedgerID() {
+        return nextLedger++;
+    }
+
+    /**
+     * State required to regenerate an entry.
+     */
+    class EntryInfo {
+        private final long entryID;
+        private final long seed;
+        EntryInfo(long entryID, long seed) {
+            this.entryID = entryID;
+            this.seed = seed;
+        }
+        byte[] getBuffer() {
+            Random rand = new Random(seed);
+            byte[] ret = new byte[targetEntrySize];
+            rand.nextBytes(ret);
+            return ret;
+        }
+        long getEntryID() {
+            return entryID;
+        }
+    }
+
+    /**
+     * Contains the state required to reconstruct the contents of any entry in the ledger.
+     * The seed value passed into the constructor fully determines the contents of the
+     * ledger.  Each EntryInfo has its own seed generated sequentially from a Random instance
+     * seeded from the original seed.  It then uses that seed to generate a secondary Random
+     * instance for generating the bytes within the entry.  See EntryIterator for details.
+     * Random(seed)
+     *  |
+     *  E0 -> Random(E0) -> getBuffer()
+     *  |
+     *  E1 -> Random(E1) -> getBuffer()
+     *  |
+     *  E2 -> Random(E2) -> getBuffer()
+     *  |
+     *  E3 -> Random(E3) -> getBuffer()
+     *  |
+     *  E4 -> Random(E4) -> getBuffer()
+     *  |
+     *  ...
+     */
+    class LedgerInfo {
+        private final long ledgerID;
+        private final long seed;
+        private long lastEntryIDCompleted = -1;
+        private long confirmedLAC = -1;
+        private boolean closed = false;
+
+        final TreeSet<Long> writesInProgress = new TreeSet<>();
+        final TreeSet<Long> writesCompleted = new TreeSet<>();
+        int readsInProgress = 0;
+        Consumer<Consumer<Integer>> onLastOp = null;
+        Consumer<Consumer<Integer>> onLastWrite = null;
+
+        EntryIterator iter;
+
+        LedgerInfo(long ledgerID, long seed) {
+            this.ledgerID = ledgerID;
+            this.seed = seed;
+            iter = new EntryIterator();
+        }
+
+        long getLastEntryIDCompleted() {
+            return lastEntryIDCompleted;
+        }
+
+        long getConfirmedLAC() {
+            return confirmedLAC;
+        }
+
+        ArrayList<EntryInfo> getNextEntries(int num) {
+            ArrayList<EntryInfo> ret = new ArrayList<>();
+            for (int i = 0; i < num && iter.hasNext(); ++i) {
+                ret.add(iter.next());
+            }
+            return ret;
+        }
+
+        class EntryIterator implements Iterator<EntryInfo> {
+            Random rand;
+            long currentID;
+            long currentSeed;
+
+            EntryIterator() {
+                seek(-1);
+            }
+
+            void seek(long entryID) {
+                currentID = -1;
+                currentSeed = seed;
+                rand = new Random(seed);
+                while (currentID < entryID) {
+                    advance();
+                }
+            }
+
+            void advance() {
+                currentSeed = rand.nextLong();
+                currentID++;
+            }
+
+            EntryInfo get() {
+                return new EntryInfo(currentID, currentSeed);
+            }
+
+            @Override
+            public boolean hasNext() {
+                return currentID < targetLedgerEntries;
+            }
+
+
+            @Override
+            public EntryInfo next() {
+                advance();
+                return get();
+            }
+        }
+
+        EntryIterator getIterator() {
+            return new EntryIterator();
+        }
+
+        void openWrite(long entryID) {
+            writesInProgress.add(entryID);
+            System.out.format("Open writes, %s%n", writesInProgress.toString());
+        }
+
+        void incReads() {
+            readsInProgress++;
+            System.out.format("Inc reads to %d%n", readsInProgress);
+        }
+
+        /**
+         * The idea here is that we may need to register an operation which needs to run
+         * whenever the final op completes on this Ledger (like deletion).  If there
+         * are none, newOnLastOp should be called synchronously with cb.  Otherwise,
+         * cb should be called synchronously with cb and newOnLastOp should be called
+         * with the cb passed in with the decReads or closeWrite.
+         * In the deletion case, cb would be the callback for the error from
+         * the deletion operation (if it happens).  The reason for all of this is that
+         * the delete case will need to chain an async call to delete into the async callback
+         * chain for whatever the last operation to complete on this Ledger.  newOnLastOp
+         * would invoke that delete.  The cb passed in allows it to pick up and continue
+         * the original chain.
+         * @param cb Callback to get result of newOnLastOp if called now
+         * @param newOnLastOp Callback to be invoked on the last decReads or closeWrite,
+         *                    should be passed the cb passed in with the final closeWrite
+         *                    or decReads
+         */
+        void onLastOpComplete(
+                Consumer<Integer> cb,
+                Consumer<Consumer<Integer>> newOnLastOp) {
+            checkState(onLastOp == null);
+            onLastOp = newOnLastOp;
+            checkOpComplete(cb);
+        }
+
+        /**
+         * Very similar to onLastOpComplete, but gets called on the final call to closeWrite.
+         * @param cb Callback to get result of newOnLastWrite if called now
+         * @param newOnLastWrite Callback to be invoked on the last closeWrite,
+         *                       should be passed the cb passed in with the final closeWrite.
+         */
+        void onLastWriteComplete(
+                Consumer<Integer> cb,
+                Consumer<Consumer<Integer>> newOnLastWrite) {
+            assert (onLastWrite == null);
+            onLastWrite = newOnLastWrite;
+            checkWriteComplete(cb);
+        }
+
+        void closeWrite(long entryID, Consumer<Integer> cb) {
+            writesInProgress.remove(entryID);
+            writesCompleted.add(entryID);
+            long completedTo = writesInProgress.isEmpty() ? Long.MAX_VALUE : writesInProgress.first();
+            while (!writesCompleted.isEmpty() && writesCompleted.first() < completedTo) {
+                lastEntryIDCompleted = writesCompleted.first();
+                writesCompleted.remove(writesCompleted.first());
+            }
+            checkWriteComplete((rc) -> {
+                checkReturn(ledgerID, rc);
+                checkOpComplete(cb);
+            });
+        }
+
+        void updateLAC(long lac) {
+            if (lac > confirmedLAC) {
+                confirmedLAC = lac;
+            }
+        }
+
+        void decReads(Consumer<Integer> cb) {
+            --readsInProgress;
+            checkOpComplete(cb);
+        }
+
+        private void checkWriteComplete(Consumer<Integer> cb) {
+            if (writesInProgress.isEmpty() && onLastWrite != null) {
+                System.out.format("checkWriteComplete: done%n");
+                onLastWrite.accept(cb);
+                onLastWrite = null;
+            } else {
+                System.out.format(
+                        "checkWriteComplete: ledger %d, writesInProgress %s%n",
+                        ledgerID,
+                        writesInProgress.toString());
+                cb.accept(0);
+            }
+        }
+
+        private void checkOpComplete(Consumer<Integer> cb) {
+            if (readsInProgress == 0 && writesInProgress.isEmpty() && onLastOp != null) {
+                System.out.format("checkOpComplete: done%n");
+                onLastOp.accept(cb);
+                onLastOp = null;
+            } else {
+                System.out.format(
+                        "checkOpComplete: ledger %d, writesInProgress %s, readsInProgress %d%n",
+                        ledgerID,
+                        writesInProgress.toString(), readsInProgress);
+                cb.accept(0);
+            }
+        }
+
+        public boolean isClosed() {
+            return closed;
+        }
+        public void setClosed() {
+            closed = true;
+            confirmedLAC = lastEntryIDCompleted;
+        }
+    }
+
+    private final Set<LedgerInfo> openingLedgers = new HashSet<>();
+    private final Set<LedgerInfo> openLedgers = new HashSet<>();
+    private final Set<LedgerInfo> liveLedgers = new HashSet<>();
+    private final Random opRand = new Random();
+
+    private LedgerInfo getRandomLedger(Collection<LedgerInfo> ledgerCollection) {
+        int elem = opRand.nextInt(ledgerCollection.size());
+        Iterator<LedgerInfo> iter = ledgerCollection.iterator();
+        for (int i = 0; i < elem; ++i) {
+            iter.next();
+        }
+        return iter.next();
+    }
+
+    private synchronized boolean startRead() {
+        if (outstandingReadCount > targetConcurrentReads) {
+            System.out.format("Not starting another read, enough in progress%n");
+            /* Caller should exit and wait for outstandingReadCount to fall */
+            return false;
+        }
+        LedgerInfo ledger;
+        if (!openLedgers.isEmpty() && (opRand.nextDouble() > coldToHotRatio)) {
+            ledger = getRandomLedger(openLedgers);
+            System.out.format("Reading from open ledger %d%n", ledger.ledgerID);
+        } else if (!liveLedgers.isEmpty()) {
+            ledger = getRandomLedger(liveLedgers);
+            System.out.format("Reading from cold ledger %d%n", ledger.ledgerID);
+        } else {
+            /* No readable ledgers, either startWrite can make progress, or there are already ledgers
+             * opening.
+             */
+            return false;
+        }
+        long lastEntryCompleted = ledger.getConfirmedLAC();
+        if (lastEntryCompleted <= 0) {
+            System.out.format("No readable entries in ledger %d, let's wait%n", ledger.ledgerID);
+            /* Either startWrite can make progress or there are already a bunch in progress */
+            return false;
+        }
+        long start = Math.abs(opRand.nextLong() % lastEntryCompleted);
+        long end = start + targetReadGroup > lastEntryCompleted ? lastEntryCompleted : start + targetReadGroup;
+        System.out.format("Reading %d -> %d from ledger %d%n", start, end, ledger.ledgerID);
+        LedgerInfo finalLedger = ledger;
+        ledger.incReads();
+        driver.readEntries(ledger.ledgerID, start, end, (rc, results) -> {
+            synchronized (BookkeeperVerifier.this) {
+                if (checkReturn(ledger.ledgerID, rc)) {
+                    return;
+                }
+                System.out.format("Read %d -> %d from ledger %d complete%n", start, end, ledger.ledgerID);
+                long current = start;
+                LedgerInfo.EntryIterator iterator = finalLedger.getIterator();
+                iterator.seek(current - 1);
+                for (byte[] result : results) {
+                    byte[] check = iterator.next().getBuffer();
+                    if (result.length != check.length) {
+                        propagateExceptionToMain(new Exception(String.format(
+                                "Mismatched entry length on entry %d for ledger %d, read returned %d, should be %d",
+                                current, ledger.ledgerID, result.length, check.length)
+                        ));
+                    }
+                        /* Verify contents */
+                    if (!Arrays.equals(check, result)) {
+                        int i = 0;
+                        for (; i < check.length; ++i) {
+                            if (check[i] != result[i]) {
+                                break;
+                            }
+                        }
+                        propagateExceptionToMain(new Exception(String.format(
+                                "Mismatched entry contents on entry %d for ledger %d at offset %d, length %d",
+                                current, ledger.ledgerID, i, check.length)
+                        ));
+                    }
+                    current++;
+                }
+                finalLedger.decReads((rc2) -> {
+                    synchronized (BookkeeperVerifier.this) {
+                        checkReturn(ledger.ledgerID, rc2);
+                        System.out.format("Read %d -> %d from ledger %d releasing read%n", start, end, ledger.ledgerID);
+                        outstandingReadCount--;
+                        BookkeeperVerifier.this.notifyAll();
+                    }
+                });
+            }
+        });
+        ++outstandingReadCount;
+        return true;
+    }
+
+    class WriteCallback implements Consumer<Integer> {
+        private int completed = 0;
+        private final int toWaitFor;
+        private final LedgerInfo ledger;
+        private final long lastEntry;
+        private final long pendingLAC;
+        WriteCallback(LedgerInfo ledger, long lastEntry, long pendingLAC, int toWaitFor) {
+            this.toWaitFor = toWaitFor;
+            this.ledger = ledger;
+            this.lastEntry = lastEntry;
+            this.pendingLAC = pendingLAC;
+        }
+
+        @Override
+        public void accept(Integer rc) {
+            synchronized (BookkeeperVerifier.this) {
+                if (checkReturn(ledger.ledgerID, rc)) {
+                    return;
+                }
+                ++completed;
+                if (toWaitFor == completed) {
+                    System.out.format("Writes ending at %d complete on ledger %d%n", lastEntry, ledger.ledgerID);
+                    ledger.closeWrite(lastEntry, (rc2) -> {
+                        synchronized (BookkeeperVerifier.this) {
+                            checkReturn(ledger.ledgerID, rc2);
+                            System.out.format("Writes ending at %d complete on ledger %d releasing write%n",
+                                    lastEntry, ledger.ledgerID);
+                            --outstandingWriteCount;
+                            BookkeeperVerifier.this.notifyAll();
+                        }
+                    });
+                    ledger.updateLAC(pendingLAC);
+                }
+            }
+        }
+    }
+
+    /**
+     * Attempt to start one more write, return false if too many are in progress.
+     * @return false if unable to start more
+     */
+    private synchronized boolean startWrite() {
+        if (outstandingWriteCount > targetConcurrentWrites) {
+            System.out.format("Write paused, too many outstanding writes%n");
+            /* Caller should release lock and wait for outstandingWriteCount to fall */
+            return false;
+        }
+        if (openLedgers.size() + openingLedgers.size() < targetConcurrentLedgers) {
+            /* Not enough open ledgers, open a new one -- counts as a write */
+            long newID = getNextLedgerID();
+            System.out.format("Creating new ledger %d%n", newID);
+            LedgerInfo ledger = new LedgerInfo(newID, opRand.nextLong());
+            openingLedgers.add(ledger);
+            driver.createLedger(newID, ensembleSize, writeQuorum, ackQuorum, (rc) -> {
+                synchronized (BookkeeperVerifier.this) {
+                    checkReturn(newID, rc);
+                    System.out.format("Created new ledger %d%n", newID);
+                    openingLedgers.remove(ledger);
+                    openLedgers.add(ledger);
+                    --outstandingWriteCount;
+                    BookkeeperVerifier.this.notifyAll();
+                }
+            });
+            ++outstandingWriteCount;
+            return true;
+        } else if (openLedgers.isEmpty()) {
+            System.out.format("Not starting a write, no open ledgers, already opening the limit%n");
+            /* Caller should release lock and wait for openLedgers to be populated */
+            return false;
+        } else {
+            LedgerInfo ledger = getRandomLedger(openLedgers);
+            ArrayList<EntryInfo> toWrite = ledger.getNextEntries(targetWriteGroup);
+            long lastEntry = toWrite.get(toWrite.size() - 1).getEntryID();
+            System.out.format(
+                    "Writing entries %d -> %d to ledger %d%n",
+                    toWrite.get(0).getEntryID(),
+                    lastEntry,
+                    ledger.ledgerID);
+            ledger.openWrite(lastEntry);
+
+            WriteCallback writeCB = new WriteCallback(
+                    ledger, lastEntry, ledger.getLastEntryIDCompleted(), toWrite.size());
+            for (EntryInfo entry: toWrite) {
+                driver.writeEntry(ledger.ledgerID, entry.getEntryID(), entry.getBuffer(), writeCB);
+            }
+            ++outstandingWriteCount;
+
+            if (lastEntry >= targetLedgerEntries) {
+                /* Remove this ledger from the writable list, mark for closing once all open writes complete */
+                System.out.format("Marking ledger %d for close%n", ledger.ledgerID);
+                openLedgers.remove(ledger);
+                liveLedgers.add(ledger);
+                ledger.onLastWriteComplete((rc) -> checkReturn(ledger.ledgerID, rc), (Consumer<Integer> cb) -> {
+                    System.out.format("Closing ledger %d%n", ledger.ledgerID);
+                    driver.closeLedger(ledger.ledgerID, (Integer rc) -> {
+                        synchronized (BookkeeperVerifier.this) {
+                            ledger.setClosed();
+                            System.out.format("Closed ledger %d%n", ledger.ledgerID);
+
+                            if (liveLedgers.size() >= targetLedgers) {
+                                /* We've closed the ledger, but now we have too many closed but readable ledgers,
+                                 * start deleting one. */
+                                LedgerInfo toDelete = getRandomLedger(liveLedgers);
+                                final long ledgerID = toDelete.ledgerID;
+                                System.out.format("Marking ledger %d for deletion%n", ledgerID);
+                                liveLedgers.remove(toDelete);
+                                toDelete.onLastOpComplete(cb, (Consumer<Integer> cb2) -> {
+                                    System.out.format("Deleting ledger %d%n", ledgerID);
+                                    driver.deleteLedger(ledgerID, (rc2) -> {
+                                        synchronized (BookkeeperVerifier.this) {
+                                            System.out.format("Deleted ledger %d%n", ledgerID);
+                                            cb2.accept(rc2);
+                                        }
+                                    });
+                                });
+                            } else {
+                                cb.accept(rc);
+                            }
+                        }
+                    });
+                });
+            }
+
+            Collections.shuffle(toWrite);
+            return true;
+        }
+    }
+
+    /**
+     * This is the method used to invoke the main loop of the IO driver.  run() will loop
+     * starting IO requests until the time runs out on the test and all outstanding requests
+     * complete.  Test execution state is accessed only under the instance lock for 'this'.
+     * There is no fine grained locking, hence run() simply needs to be synchronized and
+     * can wait for IOs to complete atomically with startWrite and startRead returning
+     * false (see those comments).
+     *
+     * @throws Exception
+     */
+    public synchronized void run() throws Exception {
+        long start = System.currentTimeMillis();
+        long testEnd = start + (duration * 1000);
+        long testDrainEnd = testEnd + (drainTimeout * 1000);
+
+        /* Keep IO running until testEnd */
+        while (System.currentTimeMillis() < testEnd) {
+
+            /* see startRead and startWrite, they return false once no more IO can be started */
+            while (startRead() || startWrite()) {}
+            long toWait = testEnd - System.currentTimeMillis();
+
+            /* atomically wait for either IO to complete or the test to end */
+            this.wait(toWait < 0 ? 0 : toWait);
+            printThrowExceptions();
+        }
+
+        /* Wait for all in progress ops to complete, outstanding*Count is updated under the lock */
+        while ((System.currentTimeMillis() < testDrainEnd)
+               && (outstandingReadCount > 0 || outstandingWriteCount > 0)) {
+            System.out.format("reads: %d, writes: %d%n", outstandingReadCount, outstandingWriteCount);
+            System.out.format("openingLedgers:%n");
+            for (LedgerInfo li: openingLedgers) {
+                System.out.format(
+                        "Ledger %d has reads: %d, writes: %d%n",
+                        li.ledgerID,
+                        li.readsInProgress,
+                        li.writesInProgress.size());
+            }
+            System.out.format("openLedgers:%n");
+            for (LedgerInfo li: openLedgers) {
+                System.out.format(
+                        "Ledger %d has reads: %d, writes: %d%n",
+                        li.ledgerID,
+                        li.readsInProgress,
+                        li.writesInProgress.size());
+            }
+            System.out.format("liveLedgers:%n");
+            for (LedgerInfo li: liveLedgers) {
+                System.out.format(
+                        "Ledger %d has reads: %d, writes: %d%n",
+                        li.ledgerID,
+                        li.readsInProgress,
+                        li.writesInProgress.size());
+            }
+            long toWait = testDrainEnd - System.currentTimeMillis();
+            this.wait(toWait < 0 ? 0 : toWait);
+            printThrowExceptions();
+        }
+        if (outstandingReadCount > 0 || outstandingWriteCount > 0) {
+            throw new Exception("Failed to drain ops before timeout%n");
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/BookkeeperVerifierMain.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/BookkeeperVerifierMain.java
new file mode 100644
index 000000000..ede3504bb
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/BookkeeperVerifierMain.java
@@ -0,0 +1,178 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.verifier;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+
+/**
+ * Performs a configurable IO stream against a bookkeeper client while
+ * validating results.
+ */
+public class BookkeeperVerifierMain {
+
+    private static void printHelpAndExit(Options options, String header, int code) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp(
+                "BookkeeperVerifierMain",
+                header,
+                options, "", true);
+        System.exit(code);
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        Options options = new Options();
+        options.addOption(
+                "ledger_path", true, "Hostname or IP of bookie to benchmark");
+        options.addOption(
+                "zookeeper",
+                true,
+                "Zookeeper ensemble, (default \"localhost:2181\")");
+        options.addOption(
+                "ensemble_size", true, "Bookkeeper client ensemble size");
+        options.addOption(
+                "write_quorum", true, "Bookkeeper client write quorum size");
+        options.addOption("ack_quorum", true, "Bookkeeper client ack quorum size");
+        options.addOption("duration", true, "Run duration in seconds");
+        options.addOption("drainTimeout", true, "Seconds to wait for in progress ops to end");
+        options.addOption(
+                "target_concurrent_ledgers",
+                true,
+                "target number ledgers to write to concurrently");
+        options.addOption(
+                "target_concurrent_writes",
+                true,
+                "target number of concurrent writes per ledger");
+        options.addOption(
+                "target_write_group",
+                true,
+                "target number of entries to write at a time");
+        options.addOption(
+                "target_read_group",
+                true,
+                "target number of entries to read at a time");
+        options.addOption("target_ledgers", true, "Target number of ledgers");
+        options.addOption("target_ledger_size", true, "Target size per ledger");
+        options.addOption("target_entry_size", true, "Target size per entry");
+        options.addOption(
+                "target_concurrent_reads", true, "Number of reads to maintain");
+        options.addOption(
+                "cold_to_hot_ratio", true, "Ratio of reads on open ledgers");
+        options.addOption("help", false, "Print this help message");
+
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmd = null;
+        try {
+            cmd = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit(options, "Unable to parse command line", 1);
+        }
+
+        if (cmd.hasOption("help")) {
+            printHelpAndExit(options, "Help:", 0);
+        }
+
+        String ledgerPath = cmd.getOptionValue("ledger_path", "/ledgers");
+        String zkString = cmd.getOptionValue("zookeeper", "localhost:2181");
+        int ensembleSize = 0;
+        int writeQuorum = 0;
+        int ackQuorum = 0;
+        int duration = 0;
+        int drainTimeout = 0;
+        int targetConcurrentLedgers = 0;
+        int targetConcurrentWrites = 0;
+        int targetWriteGroup = 0;
+        int targetReadGroup = 0;
+        int targetLedgers = 0;
+        long targetLedgerSize = 0;
+        int targetEntrySize = 0;
+        int targetConcurrentReads = 0;
+        double coldToHotRatio = 0;
+
+        try {
+            ensembleSize = Integer.parseInt(cmd.getOptionValue("ensemble_size", "3"));
+            writeQuorum = Integer.parseInt(cmd.getOptionValue("write_quorum", "3"));
+            ackQuorum = Integer.parseInt(cmd.getOptionValue("ack_quorum", "2"));
+            duration = Integer.parseInt(cmd.getOptionValue("duration", "600"));
+            drainTimeout = Integer.parseInt(cmd.getOptionValue("drain_timeout", "10"));
+            targetConcurrentLedgers =
+                    Integer.parseInt(cmd.getOptionValue("target_concurrent_ledgers", "4"));
+            targetConcurrentWrites =
+                    Integer.parseInt(cmd.getOptionValue("target_concurrent_writes", "12"));
+            targetWriteGroup =
+                    Integer.parseInt(cmd.getOptionValue("target_write_group", "4"));
+            targetReadGroup =
+                    Integer.parseInt(cmd.getOptionValue("target_read_group", "4"));
+            targetLedgers = Integer.parseInt(cmd.getOptionValue("target_ledgers", "32"));
+            targetLedgerSize = Long.parseLong(cmd.getOptionValue(
+                    "target_ledger_size",
+                    "33554432"));
+            targetEntrySize = Integer.parseInt(cmd.getOptionValue(
+                    "target_entry_size",
+                    "16384"));
+            targetConcurrentReads = Integer.parseInt(cmd.getOptionValue(
+                    "target_concurrent_reads",
+                    "16"));
+            coldToHotRatio = Double.parseDouble(
+                    cmd.getOptionValue("cold_to_hot_ratio", "0.5"));
+        } catch (NumberFormatException e) {
+            printHelpAndExit(options, "Invalid argument", 0);
+        }
+
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setZkServers(zkString);
+        conf.setZkLedgersRootPath(ledgerPath);
+        BookKeeper bkclient = new BookKeeper(conf);
+
+        BookkeeperVerifier verifier = new BookkeeperVerifier(
+                new DirectBookkeeperDriver(bkclient),
+                ensembleSize,
+                writeQuorum,
+                ackQuorum,
+                duration,
+                drainTimeout,
+                targetConcurrentLedgers,
+                targetConcurrentWrites,
+                targetWriteGroup,
+                targetReadGroup,
+                targetLedgers,
+                targetLedgerSize,
+                targetEntrySize,
+                targetConcurrentReads,
+                coldToHotRatio);
+        try {
+            verifier.run();
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.exit(1);
+        } finally {
+            bkclient.close();
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/DirectBookkeeperDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/DirectBookkeeperDriver.java
new file mode 100644
index 000000000..9611082cc
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/DirectBookkeeperDriver.java
@@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.verifier;
+
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+/**
+ * Driver for a normal Bookkeeper cluster.
+ */
+class DirectBookkeeperDriver implements BookkeeperVerifier.BookkeeperDriver {
+    private final ConcurrentHashMap<Long, LedgerHandle> openHandles = new ConcurrentHashMap<>();
+    private BookKeeper client;
+
+    DirectBookkeeperDriver(BookKeeper client) {
+        this.client = client;
+    }
+
+    @Override
+    public void createLedger(long ledgerID, int enSize, int writeQSize, int ackQSize, Consumer<Integer> cb) {
+        client.asyncCreateLedgerAdv(
+                ledgerID,
+                enSize,
+                writeQSize,
+                ackQSize,
+                BookKeeper.DigestType.CRC32,
+                new byte[0],
+                (rc, lh, ctx) -> {
+                    openHandles.put(ledgerID, lh);
+                    cb.accept(rc);
+                },
+                null,
+                null);
+    }
+
+    @Override
+    public void closeLedger(long ledgerID, Consumer<Integer> cb) {
+        LedgerHandle handle = openHandles.remove(ledgerID);
+        handle.asyncClose(
+                (rc, lh, ctx) -> cb.accept(rc),
+                null);
+    }
+
+    @Override
+    public void deleteLedger(long ledgerID, Consumer<Integer> cb) {
+        client.asyncDeleteLedger(ledgerID, (rc, ctx) -> {
+            cb.accept(rc);
+        }, null);
+    }
+
+    @Override
+    public void writeEntry(long ledgerID, long entryID, byte[] data, Consumer<Integer> cb) {
+        LedgerHandle handle;
+        handle = openHandles.get(ledgerID);
+        if (handle == null) {
+            cb.accept(BKException.Code.WriteException);
+            return;
+        }
+        handle.asyncAddEntry(entryID, data, (rc, lh, entryId, ctx) -> {
+            cb.accept(rc);
+        }, null);
+    }
+
+    @Override
+    public void readEntries(
+            long ledgerID, long firstEntryID, long lastEntryID, BiConsumer<Integer, ArrayList<byte[]>> cb) {
+        client.asyncOpenLedgerNoRecovery(ledgerID, BookKeeper.DigestType.CRC32, new byte[0], (rc, lh, ctx) -> {
+            if (rc != 0) {
+                cb.accept(rc, null);
+                return;
+            }
+            System.out.format("Got handle for read %d -> %d on ledger %d%n", firstEntryID, lastEntryID, ledgerID);
+            lh.asyncReadEntries(firstEntryID, lastEntryID, (rc1, lh1, seq, ctx1) -> {
+                System.out.format("Read cb %d -> %d on ledger %d%n", firstEntryID, lastEntryID, ledgerID);
+                ArrayList<byte[]> results = new ArrayList<>();
+                if (rc1 == 0) {
+                    while (seq.hasMoreElements()) {
+                        results.add(seq.nextElement().getEntry());
+                    }
+                    System.out.format("About to close handle for read %d -> %d on ledger %d%n",
+                            firstEntryID, lastEntryID, ledgerID);
+                }
+                lh.asyncClose((rc2, lh2, ctx2) -> {
+                    System.out.format("Closed handle for read %d -> %d on ledger %d result %d%n",
+                            firstEntryID, lastEntryID, ledgerID, rc2);
+                    cb.accept(rc1 == 0 ? rc2 : rc1, results);
+                }, null);
+            }, null);
+        }, null);
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/package-info.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/package-info.java
new file mode 100644
index 000000000..c76d72d80
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/verifier/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/**
+ * Simple self-verifying workload generator.
+ */
+package org.apache.bookkeeper.verifier;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/verifier/BookkeeperVerifierTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/verifier/BookkeeperVerifierTest.java
new file mode 100644
index 000000000..d282bfdc0
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/verifier/BookkeeperVerifierTest.java
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.verifier;
+
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Test for BookKeeperVerifier.
+ */
+public class BookkeeperVerifierTest extends BookKeeperClusterTestCase {
+    public BookkeeperVerifierTest() {
+        super(3);
+    }
+
+    /**
+     * Simple test to verify that the verifier works against a local cluster.
+     */
+    @Test(timeout = 30000)
+    public void testBasic() throws Exception {
+        DirectBookkeeperDriver driver = new DirectBookkeeperDriver(bkc);
+        BookkeeperVerifier verifier = new BookkeeperVerifier(
+                driver,
+                3,
+                3,
+                2,
+                10,
+                5,
+                16,
+                4,
+                2,
+                2,
+                32,
+                16 << 10,
+                4 << 10,
+                4,
+                0.5
+        );
+        verifier.run();
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services