You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2013/10/12 05:42:22 UTC

svn commit: r1531494 [1/2] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ bookke...

Author: sijie
Date: Sat Oct 12 03:42:21 2013
New Revision: 1531494

URL: http://svn.apache.org/r1531494
Log:
BOOKKEEPER-657: Journal Improvement (Robin Dhamankar via sijie)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DaemonThreadFactory.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Oct 12 03:42:21 2013
@@ -166,6 +166,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-658: ledger cache refactor (Robin Dhamankar via sijie)
 
+      BOOKKEEPER-657: Journal Improvement (Robin Dhamankar via sijie)
+
     NEW FEATURE:
 
       BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Sat Oct 12 03:42:21 2013
@@ -21,27 +21,26 @@
 
 package org.apache.bookkeeper.bookie;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.io.FilenameFilter;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.meta.LedgerManagerFactory;
-import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.GarbageCollectorThread.SafeEntryAdder;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
@@ -49,28 +48,29 @@ import org.apache.bookkeeper.bookie.Ledg
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
 import org.apache.bookkeeper.jmx.BKMBeanRegistry;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.util.net.DNS;
 import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
 import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Charsets.UTF_8;
 import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -78,7 +78,7 @@ import com.google.common.annotations.Vis
  *
  */
 
-public class Bookie extends Thread {
+public class Bookie extends BookieThread {
 
     static Logger LOG = LoggerFactory.getLogger(Bookie.class);
 

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieThread.java?rev=1531494&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieThread.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieThread.java Sat Oct 12 03:42:21 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+* Wrapper that wraps bookie threads
+* Any common handling that we require for all bookie threads
+* should be implemented here
+*/
+public class BookieThread extends Thread {
+
+    private static class BookieUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+        static Logger logger = LoggerFactory.getLogger(BookieUncaughtExceptionHandler.class);
+
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error("Uncaught exception in thread " + t.getName(), e);
+            Runtime.getRuntime().exit(1);
+        }
+
+    }
+
+    public BookieThread (String name) {
+        super(name);
+        setUncaughtExceptionHandler(new BookieUncaughtExceptionHandler());
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java Sat Oct 12 03:42:21 2013
@@ -100,21 +100,38 @@ public class BufferedChannel
         return bc.size();
     }
 
-    public void flush(boolean sync) throws IOException {
+    /**
+     * Write any data in the buffer to the file. If shouldForceWrite is set to true, force a sync operation so that
+     * data is persisted to the disk.
+     * @param shouldForceWrite
+     * @throws IOException if the write or sync operation fails.
+     */
+    public void flush(boolean shouldForceWrite) throws IOException {
         synchronized(this) {
-            if (writeBuffer == null) {
-                return;
-            }
-            writeBuffer.flip();
-            bc.write(writeBuffer);
-            writeBuffer.clear();
-            writeBufferStartPosition = bc.position();
+            flushInternal();
         }
-        if (sync) {
+        if (shouldForceWrite) {
             forceWrite(false);
         }
     }
 
+    /**
+     * Write any data in the buffer to the file and advance the writeBufferStartPosition
+     * Callers are expected to synchronize appropriately
+     * @throws IOException if the write fails.
+     */
+    private void flushInternal() throws IOException {
+        if (writeBuffer == null) {
+            return;
+        }
+        writeBuffer.flip();
+        do {
+            bc.write(writeBuffer);
+        } while (writeBuffer.hasRemaining());
+        writeBuffer.clear();
+        writeBufferStartPosition = bc.position();
+    }
+
     public long forceWrite(boolean forceMetadata) throws IOException {
         // This is the point up to which we had flushed to the file system page cache
         // before issuing this force write hence is guaranteed to be made durable by

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Sat Oct 12 03:42:21 2013
@@ -30,19 +30,24 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.DaemonThreadFactory;
 import org.apache.bookkeeper.util.IOUtils;
+import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Provide journal related management.
  */
-class Journal extends Thread implements CheckpointSource {
+class Journal extends BookieThread implements CheckpointSource {
 
     static Logger LOG = LoggerFactory.getLogger(Journal.class);
 
@@ -115,13 +120,19 @@ class Journal extends Thread implements 
         public int hashCode() {
             return mark.hashCode();
         }
+
+        @Override
+        public String toString() {
+            return mark.toString();
+        }
     }
 
     /**
      * Last Log Mark
      */
     class LastLogMark {
-        private LogMark curMark;
+        private final LogMark curMark;
+
         LastLogMark(long logId, long logPosition) {
             this.curMark = new LogMark(logId, logPosition);
         }
@@ -200,6 +211,11 @@ class Journal extends Thread implements 
                 }
             }
         }
+
+        @Override
+        public String toString() {
+            return curMark.toString();
+        }
     }
 
     /**
@@ -245,52 +261,240 @@ class Journal extends Thread implements 
      * Journal Entry to Record
      */
     private static class QueueEntry {
+        ByteBuffer entry;
+        long ledgerId;
+        long entryId;
+        WriteCallback cb;
+        Object ctx;
+        long enqueueTime;
+
         QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
-                   WriteCallback cb, Object ctx) {
+                   WriteCallback cb, Object ctx, long enqueueTime) {
             this.entry = entry.duplicate();
             this.cb = cb;
             this.ctx = ctx;
             this.ledgerId = ledgerId;
             this.entryId = entryId;
+            this.enqueueTime = enqueueTime;
         }
 
-        ByteBuffer entry;
+        public void callback() {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
+            }
+            cb.writeComplete(0, ledgerId, entryId, null, ctx);
+        }
+    }
 
-        long ledgerId;
+    private class ForceWriteRequest implements Runnable {
+        private final JournalChannel logFile;
+        private final LinkedList<QueueEntry> forceWriteWaiters;
+        private boolean shouldClose;
+        private final boolean isMarker;
+        private final long lastFlushedPosition;
+        private final long logId;
 
-        long entryId;
+        private ForceWriteRequest(JournalChannel logFile,
+                          long logId,
+                          long lastFlushedPosition,
+                          LinkedList<QueueEntry> forceWriteWaiters,
+                          boolean shouldClose,
+                          boolean isMarker) {
+            this.forceWriteWaiters = forceWriteWaiters;
+            this.logFile = logFile;
+            this.logId = logId;
+            this.lastFlushedPosition = lastFlushedPosition;
+            this.shouldClose = shouldClose;
+            this.isMarker = isMarker;
+        }
 
-        WriteCallback cb;
+        public int process(boolean shouldForceWrite) throws IOException {
+            if (isMarker) {
+                return 0;
+            }
 
-        Object ctx;
+            try {
+                if (shouldForceWrite) {
+                    this.logFile.forceWrite(false);
+                }
+                lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition);
+
+                // Notify the waiters that the force write succeeded
+                cbThreadPool.submit(this);
+
+                return this.forceWriteWaiters.size();
+            }
+            finally {
+                closeFileIfNecessary();
+            }
+        }
+
+        @Override
+        public void run() {
+            for (QueueEntry e : this.forceWriteWaiters) {
+                e.callback();    // Process cbs inline
+            }
+        }
+
+        public void closeFileIfNecessary() {
+            // Close if shouldClose is set
+            if (shouldClose) {
+                // We should guard against exceptions so it's
+                // safe to call in catch blocks
+                try {
+                    logFile.close();
+                    // Call close only once
+                    shouldClose = false;
+                }
+                catch (IOException ioe) {
+                    LOG.error("I/O exception while closing file", ioe);
+                }
+            }
+        }
+    }
+
+    /**
+     * ForceWriteThread is a background thread which makes the journal durable periodically
+     *
+     */
+    private class ForceWriteThread extends BookieThread {
+        volatile boolean running = true;
+        // This holds the queue entries that should be notified after a
+        // successful force write
+        Thread threadToNotifyOnEx;
+        // should we group force writes
+        private final boolean enableGroupForceWrites;
+        // make flush interval as a parameter
+        public ForceWriteThread(Thread threadToNotifyOnEx, boolean enableGroupForceWrites) {
+            super("ForceWriteThread");
+            this.threadToNotifyOnEx = threadToNotifyOnEx;
+            this.enableGroupForceWrites = enableGroupForceWrites;
+        }
+        @Override
+        public void run() {
+            LOG.info("ForceWrite Thread started");
+            boolean shouldForceWrite = true;
+            int numReqInLastForceWrite = 0;
+            while(running) {
+                ForceWriteRequest req = null;
+                try {
+                    req = forceWriteRequests.take();
+
+                    // Force write the file and then notify the write completions
+                    //
+                    if (!req.isMarker) {
+                        if (shouldForceWrite) {
+                            // if we are going to force write, any request that is already in the
+                            // queue will benefit from this force write - post a marker prior to issuing
+                            // the flush so until this marker is encountered we can skip the force write
+                            if (enableGroupForceWrites) {
+                                forceWriteRequests.put(new ForceWriteRequest(req.logFile, 0, 0, null, false, true));
+                            }
+
+                            // If we are about to issue a write, record the number of requests in
+                            // the last force write and then reset the counter so we can accumulate
+                            // requests in the write we are about to issue
+                            if (numReqInLastForceWrite > 0) {
+                                numReqInLastForceWrite = 0;
+                            }
+                        }
+                        numReqInLastForceWrite += req.process(shouldForceWrite);
+                    }
+
+                    if (enableGroupForceWrites &&
+                        // if its a marker we should switch back to flushing
+                        !req.isMarker &&
+                        // This indicates that this is the last request in a given file
+                        // so subsequent requests will go to a different file so we should
+                        // flush on the next request
+                        !req.shouldClose) {
+                        shouldForceWrite = false;
+                    }
+                    else {
+                        shouldForceWrite = true;
+                    }
+                } catch (IOException ioe) {
+                    LOG.error("I/O exception in ForceWrite thread", ioe);
+                    running = false;
+                } catch (InterruptedException e) {
+                    LOG.error("ForceWrite thread interrupted", e);
+                    // close is idempotent
+                    if (null != req) {
+                        req.closeFileIfNecessary();
+                    }
+                    running = false;
+                }
+            }
+            // Regardless of what caused us to exit, we should notify the
+            // parent thread as it should either exit or be in the process
+            // of exiting else we will have write requests hang
+            threadToNotifyOnEx.interrupt();
+        }
+        // shutdown force write thread
+        void shutdown() throws InterruptedException {
+            running = false;
+            this.interrupt();
+            this.join();
+        }
     }
 
     final static long MB = 1024 * 1024L;
+    final static int KB = 1024;
     // max journal file size
     final long maxJournalSize;
+    // pre-allocation size for the journal files
+    final long journalPreAllocSize;
+    // write buffer size for the journal files
+    final int journalWriteBufferSize;
     // number journal files kept before marked journal
     final int maxBackupJournals;
 
     final File journalDirectory;
     final ServerConfiguration conf;
+    ForceWriteThread forceWriteThread;
+    // should we group force writes
+    private final boolean enableGroupForceWrites;
+    // Time after which we will stop grouping and issue the flush
+    private final long maxGroupWaitInMSec;
+    // Threshold after which we flush any buffered journal writes
+    private final long bufferedWritesThreshold;
+    // should we flush if the queue is empty
+    private final boolean flushWhenQueueEmpty;
     // should we hint the filesystem to remove pages from cache after force write
     private final boolean removePagesFromCache;
 
-    private LastLogMark lastLogMark = new LastLogMark(0, 0);
+    private final LastLogMark lastLogMark = new LastLogMark(0, 0);
+
+    /**
+     * The thread pool used to handle callback.
+     */
+    private final ExecutorService cbThreadPool;
 
     // journal entry queue to commit
     LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
+    LinkedBlockingQueue<ForceWriteRequest> forceWriteRequests = new LinkedBlockingQueue<ForceWriteRequest>();
 
     volatile boolean running = true;
-    private LedgerDirsManager ledgerDirsManager;
+    private final LedgerDirsManager ledgerDirsManager;
 
     public Journal(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager) {
         super("BookieJournal-" + conf.getBookiePort());
         this.ledgerDirsManager = ledgerDirsManager;
         this.conf = conf;
         this.journalDirectory = Bookie.getCurrentDirectory(conf.getJournalDir());
-        this.maxJournalSize = conf.getMaxJournalSize() * MB;
+        this.maxJournalSize = conf.getMaxJournalSizeMB() * MB;
+        this.journalPreAllocSize = conf.getJournalPreAllocSizeMB() * MB;
+        this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB;
         this.maxBackupJournals = conf.getMaxBackupJournals();
+        this.enableGroupForceWrites = conf.getJournalAdaptiveGroupWrites();
+        this.forceWriteThread = new ForceWriteThread(this, enableGroupForceWrites);
+        this.maxGroupWaitInMSec = conf.getJournalMaxGroupWaitMSec();
+        this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold();
+        this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumAddWorkerThreads(), new DaemonThreadFactory());
+
+        // Unless there is a cap on the max wait (which requires group force writes)
+        // we cannot skip flushing for queue empty
+        this.flushWhenQueueEmpty = !enableGroupForceWrites || conf.getJournalFlushWhenQueueEmpty();
 
         this.removePagesFromCache = conf.getJournalRemovePagesFromCache();
         // read last log mark
@@ -362,9 +566,9 @@ class Journal extends Thread implements 
         throws IOException {
         JournalChannel recLog;
         if (journalPos <= 0) {
-            recLog = new JournalChannel(journalDirectory, journalId);
+            recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize);
         } else {
-            recLog = new JournalChannel(journalDirectory, journalId, journalPos);
+            recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, journalPos);
         }
         int journalVersion = recLog.getFormatVersion();
         try {
@@ -448,7 +652,7 @@ class Journal extends Thread implements 
         long ledgerId = entry.getLong();
         long entryId = entry.getLong();
         entry.rewind();
-        queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx));
+        queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano()));
     }
 
     /**
@@ -480,6 +684,7 @@ class Journal extends Thread implements 
         LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
         ByteBuffer lenBuff = ByteBuffer.allocate(4);
         JournalChannel logFile = null;
+        forceWriteThread.start();
         try {
             List<Long> journalIds = listJournalIds(journalDirectory, null);
             // Should not use MathUtils.now(), which use System.nanoTime() and
@@ -494,7 +699,11 @@ class Journal extends Thread implements 
                 // new journal file to write
                 if (null == logFile) {
                     logId = logId + 1;
-                    logFile = new JournalChannel(journalDirectory, logId, removePagesFromCache);
+                    logFile = new JournalChannel(journalDirectory,
+                                        logId,
+                                        journalPreAllocSize,
+                                        journalWriteBufferSize,
+                                        removePagesFromCache);
                     bc = logFile.getBufferedChannel();
 
                     lastFlushPosition = 0;
@@ -504,25 +713,43 @@ class Journal extends Thread implements 
                     if (toFlush.isEmpty()) {
                         qe = queue.take();
                     } else {
-                        qe = queue.poll();
-                        if (qe == null || bc.position() > lastFlushPosition + 512*1024) {
-                            //logFile.force(false);
+                        long pollWaitTime = maxGroupWaitInMSec - MathUtils.elapsedMSec(toFlush.getFirst().enqueueTime);
+                        if (flushWhenQueueEmpty || pollWaitTime < 0) {
+                            pollWaitTime = 0;
+                        }
+                        qe = queue.poll(pollWaitTime, TimeUnit.MILLISECONDS);
+                        boolean shouldFlush = false;
+                        // We should issue a forceWrite if any of the three conditions below holds good
+                        // 1. If the oldest pending entry has been pending for longer than the max wait time
+                        if (enableGroupForceWrites && (MathUtils.elapsedMSec(toFlush.getFirst().enqueueTime) > maxGroupWaitInMSec)) {
+                            shouldFlush = true;
+                        } else if ((bc.position() > lastFlushPosition + bufferedWritesThreshold)) {
+                            // 2. If we have buffered more than the buffWriteThreshold
+                            shouldFlush = true;
+                        } else if (qe == null) {
+                            // We should get here only if flushWhenQueueEmpty is true else we would wait
+                            // for timeout that would put us past the maxWait threshold
+                            // 3. If the queue is empty i.e. no benefit of grouping. This happens when we have one
+                            // publish at a time - common case in tests.
+                            shouldFlush = true;
+                        }
+
+                        // toFlush is non null and not empty so should be safe to access getFirst
+                        if (shouldFlush) {
                             bc.flush(false);
-                            // This separation of flush and force is useful when adaptive group
-                            // force write is used where the flush thread does not block while
-                            // the force is issued by a separate thread
-                            logFile.forceWrite(false);
                             lastFlushPosition = bc.position();
-                            lastLogMark.setCurLogMark(logId, lastFlushPosition);
-                            for (QueueEntry e : toFlush) {
-                                e.cb.writeComplete(BookieException.Code.OK,
-                                                   e.ledgerId, e.entryId, null, e.ctx);
+
+                            // Trace the lifetime of entries through persistence
+                            if (LOG.isDebugEnabled()) {
+                                for (QueueEntry e : toFlush) {
+                                    LOG.debug("Written and queuing for flush Ledger:" + e.ledgerId + " Entry:" + e.entryId);
+                                }
                             }
-                            toFlush.clear();
 
+                            forceWriteRequests.put(new ForceWriteRequest(logFile, logId, lastFlushPosition, toFlush, (lastFlushPosition > maxJournalSize), false));
+                            toFlush = new LinkedList<QueueEntry>();
                             // check whether journal file is over file limit
                             if (bc.position() > maxJournalSize) {
-                                logFile.close();
                                 logFile = null;
                                 continue;
                             }
@@ -548,8 +775,10 @@ class Journal extends Thread implements 
                 bc.write(lenBuff);
                 bc.write(qe.entry);
 
+                // NOTE: preAlloc depends on the fact that we don't change file size while this is
+                // called or useful parts of the file will be zeroed out - in other words
+                // it depends on single threaded flushes to the JournalChannel
                 logFile.preAllocIfNeeded();
-
                 toFlush.add(qe);
                 qe = null;
             }
@@ -560,6 +789,11 @@ class Journal extends Thread implements 
         } catch (InterruptedException ie) {
             LOG.warn("Journal exits when shutting down", ie);
         } finally {
+            // There could be packets queued for forceWrite on this logFile
+            // That is fine as this exception is going to take down the
+            // bookie anyway. If we execute this as a part of graceful shutdown,
+            // close will flush the file system cache making any previous
+            // cached writes durable so this is fine as well.
             IOUtils.close(LOG, logFile);
         }
         LOG.info("Journal exited loop!");
@@ -574,6 +808,13 @@ class Journal extends Thread implements 
                 return;
             }
             LOG.info("Shutting down Journal");
+            forceWriteThread.shutdown();
+            cbThreadPool.shutdown();
+            if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
+                LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing");
+            }
+            cbThreadPool.shutdownNow();
+
             running = false;
             this.interrupt();
             this.join();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java Sat Oct 12 03:42:21 2013
@@ -33,6 +33,7 @@ import org.apache.bookkeeper.util.Native
 
 import static com.google.common.base.Charsets.UTF_8;
 
+import org.apache.bookkeeper.util.NativeIO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,25 +58,35 @@ class JournalChannel implements Closeabl
     int MIN_COMPAT_JOURNAL_FORMAT_VERSION = 1;
     int CURRENT_JOURNAL_FORMAT_VERSION = 4;
 
-    public final static long preAllocSize = 4*1024*1024;
-    public final static ByteBuffer zeros = ByteBuffer.allocate(512);
+    private long preAllocSize;
     private boolean fRemoveFromPageCache;
+    public final static ByteBuffer zeros = ByteBuffer.allocate(512);
+
     // The position of the file channel's last force write.
     private long lastForceWritePosition = 0;
 
+    // Mostly used by tests
     JournalChannel(File journalDirectory, long logId) throws IOException {
-        this(journalDirectory, logId, START_OF_FILE, false);
+        this(journalDirectory, logId, 4*1024*1024, 65536, START_OF_FILE);
     }
 
-    JournalChannel(File journalDirectory, long logId, long position) throws IOException {
-        this(journalDirectory, logId, position, false);
+    JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize) throws IOException {
+        this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE);
     }
 
-    JournalChannel(File journalDirectory, long logId, boolean fRemoveFromPageCache) throws IOException {
-        this(journalDirectory, logId, START_OF_FILE, fRemoveFromPageCache);
+    JournalChannel(File journalDirectory, long logId,
+                   long preAllocSize, int writeBufferSize, long position) throws IOException {
+         this(journalDirectory, logId, preAllocSize, writeBufferSize, position, false);
     }
 
-    JournalChannel(File journalDirectory, long logId, long position, boolean fRemoveFromPageCache) throws IOException {
+    JournalChannel(File journalDirectory, long logId,
+                   long preAllocSize, int writeBufferSize, boolean fRemoveFromPageCache) throws IOException {
+        this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE, fRemoveFromPageCache);
+    }
+
+    JournalChannel(File journalDirectory, long logId,
+                   long preAllocSize, int writeBufferSize, long position, boolean fRemoveFromPageCache) throws IOException {
+        this.preAllocSize = preAllocSize;
         this.fRemoveFromPageCache = fRemoveFromPageCache;
         File fn = new File(journalDirectory, Long.toHexString(logId) + ".txn");
 
@@ -97,8 +108,7 @@ class JournalChannel implements Closeabl
             bb.flip();
             fc.write(bb);
 
-            bc = new BufferedChannel(fc, 65536);
-
+            bc = new BufferedChannel(fc, writeBufferSize);
             forceWrite(true);
             nextPrealloc = preAllocSize;
             fc.write(zeros, nextPrealloc);
@@ -184,6 +194,9 @@ class JournalChannel implements Closeabl
     }
 
     public void forceWrite(boolean forceMetadata) throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Journal ForceWrite");
+        }
         long newForceWritePosition = bc.forceWrite(forceMetadata);
         if (newForceWritePosition > lastForceWritePosition) {
             if (fRemoveFromPageCache) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java Sat Oct 12 03:42:21 2013
@@ -21,25 +21,22 @@
 
 package org.apache.bookkeeper.bookie;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Executors;
 
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
-
-import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
-
-import java.io.IOException;
-
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * SyncThread is a background thread which help checkpointing ledger storage
  * when a checkpoint is requested. After a ledger storage is checkpointed,

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Sat Oct 12 03:42:21 2013
@@ -24,6 +24,8 @@ import com.google.common.annotations.Bet
 
 import org.apache.commons.lang.StringUtils;
 
+import com.google.common.annotations.Beta;
+
 /**
  * Configuration manages server-side settings
  */
@@ -48,7 +50,13 @@ public class ServerConfiguration extends
     // Journal Parameters
     protected final static String MAX_JOURNAL_SIZE = "journalMaxSizeMB";
     protected final static String MAX_BACKUP_JOURNALS = "journalMaxBackups";
+    protected final static String JOURNAL_ADAPTIVE_GROUP_WRITES = "journalAdaptiveGroupWrites";
+    protected final static String JOURNAL_MAX_GROUP_WAIT_MSEC = "journalMaxGroupWaitMSec";
+    protected final static String JOURNAL_BUFFERED_WRITES_THRESHOLD = "journalBufferedWritesThreshold";
+    protected final static String JOURNAL_FLUSH_WHEN_QUEUE_EMPTY = "journalFlushWhenQueueEmpty";
     protected final static String JOURNAL_REMOVE_FROM_PAGE_CACHE = "journalRemoveFromPageCache";
+    protected final static String JOURNAL_PRE_ALLOC_SIZE = "journalPreAllocSizeMB";
+    protected final static String JOURNAL_WRITE_BUFFER_SIZE = "journalWriteBufferSizeKB";
     // Bookie Parameters
     protected final static String BOOKIE_PORT = "bookiePort";
     protected final static String LISTENING_INTERFACE = "listeningInterface";
@@ -72,6 +80,12 @@ public class ServerConfiguration extends
     protected final static String AUDITOR_PERIODIC_CHECK_INTERVAL = "auditorPeriodicCheckInterval";
     protected final static String AUTO_RECOVERY_DAEMON_ENABLED = "autoRecoveryDaemonEnabled";
 
+    // Worker Thread parameters.
+    protected final static String NUM_ADD_WORKER_THREADS = "numAddWorkerThreads";
+
+    protected final static String READ_BUFFER_SIZE = "readBufferSizeBytes";
+    protected final static String WRITE_BUFFER_SIZE = "writeBufferSizeBytes";
+
     /**
      * Construct a default configuration object
      */
@@ -231,7 +245,7 @@ public class ServerConfiguration extends
      *
      * @return max journal file size
      */
-    public long getMaxJournalSize() {
+    public long getMaxJournalSizeMB() {
         return this.getLong(MAX_JOURNAL_SIZE, 2 * 1024);
     }
 
@@ -242,12 +256,30 @@ public class ServerConfiguration extends
      *          new max journal file size
      * @return server configuration
      */
-    public ServerConfiguration setMaxJournalSize(long maxJournalSize) {
+    public ServerConfiguration setMaxJournalSizeMB(long maxJournalSize) {
         this.setProperty(MAX_JOURNAL_SIZE, Long.toString(maxJournalSize));
         return this;
     }
 
     /**
+     * How much space should we pre-allocate at a time in the journal
+     *
+     * @return journal pre-allocation size in MB
+     */
+    public int getJournalPreAllocSizeMB() {
+        return this.getInt(JOURNAL_PRE_ALLOC_SIZE, 16);
+    }
+
+    /**
+     * Size of the write buffers used for the journal
+     *
+     * @return journal write buffer size in KB
+     */
+    public int getJournalWriteBufferSizeKB() {
+        return this.getInt(JOURNAL_WRITE_BUFFER_SIZE, 64);
+    }
+
+    /**
      * Max number of older journal files kept
      *
      * @return max number of older journal files to kept
@@ -648,6 +680,125 @@ public class ServerConfiguration extends
     }
 
     /**
+     * Get the number of threads that should handle write requests.
+     * @return number of threads handling write (add) requests
+     */
+    public int getNumAddWorkerThreads() {
+        return getInt(NUM_ADD_WORKER_THREADS, 1);
+    }
+
+    /**
+     * Get the number of bytes we should use as capacity for the {@link
+     * org.apache.bookkeeper.bookie.BufferedReadChannel}
+     * Default is 512 bytes
+     * @return read buffer size
+     */
+    public int getReadBufferBytes() {
+        return getInt(READ_BUFFER_SIZE, 512);
+    }
+
+    /**
+     * Set the number of bytes we should use as capacity for the {@link
+     * org.apache.bookkeeper.bookie.BufferedReadChannel}
+     *
+     * @param readBufferSize
+     *          Read Buffer Size
+     * @return server configuration
+     */
+    public ServerConfiguration setReadBufferBytes(int readBufferSize) {
+        setProperty(READ_BUFFER_SIZE, readBufferSize);
+        return this;
+    }
+
+    /**
+     * Get the number of bytes used as capacity for the write buffer. Default is
+     * 64KB.
+     * NOTE: Make sure this value is greater than the maximum message size.
+     * @return write buffer size in bytes
+     */
+    public int getWriteBufferBytes() {
+        return getInt(WRITE_BUFFER_SIZE, 65536);
+    }
+
+    /**
+     * Set the number of bytes used as capacity for the write buffer.
+     *
+     * @param writeBufferBytes
+     *          Write Buffer Bytes
+     * @return server configuration
+     */
+    public ServerConfiguration setWriteBufferBytes(int writeBufferBytes) {
+        setProperty(WRITE_BUFFER_SIZE, writeBufferBytes);
+        return this;
+    }
+
+
+    /**
+     * Should we group journal force writes
+     *
+     * @return group journal force writes
+     */
+    public boolean getJournalAdaptiveGroupWrites() {
+        return getBoolean(JOURNAL_ADAPTIVE_GROUP_WRITES, true);
+    }
+
+    /**
+     * Enable/disable group journal force writes
+     *
+     * @param enabled flag to enable/disable group journal force writes
+     */
+    public ServerConfiguration setJournalAdaptiveGroupWrites(boolean enabled) {
+        setProperty(JOURNAL_ADAPTIVE_GROUP_WRITES, enabled);
+        return this;
+    }
+
+    /**
+     * Maximum latency to impose on a journal write to achieve grouping
+     *
+     * @return max wait for grouping
+     */
+    public long getJournalMaxGroupWaitMSec() {
+        return getLong(JOURNAL_MAX_GROUP_WAIT_MSEC, 200);
+    }
+
+    /**
+     * Maximum amount of buffered journal writes to accumulate to achieve grouping
+     *
+     * @return buffered writes threshold
+     */
+    public long getJournalBufferedWritesThreshold() {
+        return getLong(JOURNAL_BUFFERED_WRITES_THRESHOLD, 512 * 1024);
+    }
+
+
+    /**
+     * Set if we should flush the journal when queue is empty
+     */
+    public ServerConfiguration setJournalFlushWhenQueueEmpty(boolean enabled) {
+        setProperty(JOURNAL_FLUSH_WHEN_QUEUE_EMPTY, enabled);
+        return this;
+    }
+
+    /**
+     * Should we flush the journal when queue is empty
+     *
+     * @return flush when queue is empty
+     */
+    public boolean getJournalFlushWhenQueueEmpty() {
+        return getBoolean(JOURNAL_FLUSH_WHEN_QUEUE_EMPTY, false);
+    }
+
+    /**
+     * Should we remove pages from page cache after force write
+     *
+     * @return remove pages from cache
+     */
+    @Beta
+    public boolean getJournalRemovePagesFromCache() {
+        return getBoolean(JOURNAL_REMOVE_FROM_PAGE_CACHE, false);
+    }
+
+    /**
      * Set whether the bookie is able to go into read-only mode.
      * If this is set to false, the bookie will shutdown on encountering
      * an error condition.
@@ -757,25 +908,4 @@ public class ServerConfiguration extends
         return getBoolean(AUTO_RECOVERY_DAEMON_ENABLED, false);
     }
 
-    /**
-     * Should we remove pages from page cache after force write
-     *
-     * @return remove pages from cache
-     */
-    @Beta
-    public boolean getJournalRemovePagesFromCache() {
-        return getBoolean(JOURNAL_REMOVE_FROM_PAGE_CACHE, false);
-    }
-
-    /**
-     * Sets that whether should we remove pages from page cache after force write.
-     *
-     * @param enabled
-     *            - true if we need to remove pages from page cache. otherwise, false
-     * @return ServerConfiguration
-     */
-    public ServerConfiguration setJournalRemovePagesFromCache(boolean enabled) {
-        setProperty(JOURNAL_REMOVE_FROM_PAGE_CACHE, enabled);
-        return this;
-    }
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DaemonThreadFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DaemonThreadFactory.java?rev=1531494&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DaemonThreadFactory.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DaemonThreadFactory.java Sat Oct 12 03:42:21 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.Executors;
+
+public class DaemonThreadFactory implements ThreadFactory {
+    private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
+    private int priority = Thread.NORM_PRIORITY;
+    public DaemonThreadFactory() {
+    }
+    public DaemonThreadFactory(int priority) {
+        assert priority >= Thread.MIN_PRIORITY && priority <= Thread.MAX_PRIORITY;
+        this.priority = priority;
+    }
+    public Thread newThread(Runnable r) {
+        Thread thread = defaultThreadFactory.newThread(r);
+        thread.setDaemon(true);
+        thread.setPriority(priority);
+        return thread;
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java Sat Oct 12 03:42:21 2013
@@ -48,4 +48,29 @@ public class MathUtils {
         return System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
     }
 
+    /**
+     * Current time from some arbitrary time base in the past, counting in
+     * nanoseconds, and not affected by settimeofday or similar system clock
+     * changes. This is appropriate to use when computing how much longer to
+     * wait for an interval to expire.
+     *
+     * NOTE: only use it for measuring.
+     * http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
+     *
+     * @return current time in nanoseconds.
+     */
+    public static long nowInNano() {
+        return System.nanoTime();
+    }
+
+    /**
+     * Milliseconds elapsed since the time specified. The input is a nanoTime
+     * value; the only conversion happens when computing the elapsed time.
+     *
+     * @param startNanoTime the start of the interval that we are measuring
+     * @return elapsed time in milliseconds.
+     */
+    public static long elapsedMSec (long startNanoTime) {
+       return (System.nanoTime() - startNanoTime)/ NANOSECONDS_PER_MILLISECOND;
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java Sat Oct 12 03:42:21 2013
@@ -35,6 +35,7 @@ import junit.framework.Assert;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
@@ -132,7 +133,7 @@ public class BookieInitializationTest {
         tmpDir.delete();
         tmpDir.mkdir();
 
-        final ServerConfiguration conf = new ServerConfiguration()
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
                 .setZkServers(null).setJournalDirName(tmpDir.getPath())
                 .setLedgerDirNames(new String[] { tmpDir.getPath() });
 
@@ -198,7 +199,7 @@ public class BookieInitializationTest {
         tmpDir.delete();
         tmpDir.mkdir();
 
-        ServerConfiguration conf = new ServerConfiguration().setZkServers(null)
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration().setZkServers(null)
                 .setJournalDirName(tmpDir.getPath()).setLedgerDirNames(
                         new String[] { tmpDir.getPath() });
 
@@ -253,7 +254,7 @@ public class BookieInitializationTest {
         tmpDir.delete();
         tmpDir.mkdir();
 
-        ServerConfiguration conf = new ServerConfiguration();
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         int port = 12555;
         conf.setZkServers(null).setBookiePort(port).setJournalDirName(
                 tmpDir.getPath()).setLedgerDirNames(
@@ -285,7 +286,7 @@ public class BookieInitializationTest {
         tmpDir.delete();
         tmpDir.mkdir();
 
-        final ServerConfiguration conf = new ServerConfiguration()
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
                 .setZkServers(zkutil.getZooKeeperConnectString())
                 .setZkTimeout(5000).setJournalDirName(tmpDir.getPath())
                 .setLedgerDirNames(new String[] { tmpDir.getPath() });
@@ -310,7 +311,7 @@ public class BookieInitializationTest {
         tmpDir.mkdir();
         final String ZK_ROOT = "/ledgers2";
 
-        final ServerConfiguration conf = new ServerConfiguration()
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkutil.getZooKeeperConnectString())
             .setZkTimeout(5000).setJournalDirName(tmpDir.getPath())
             .setLedgerDirNames(new String[] { tmpDir.getPath() });

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java Sat Oct 12 03:42:21 2013
@@ -36,6 +36,7 @@ import java.util.Random;
 import org.apache.bookkeeper.client.ClientUtil;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -204,7 +205,7 @@ public class BookieJournalTest {
         writePreV2Journal(Bookie.getCurrentDirectory(journalDir), 100);
         writeIndexFileForLedger(Bookie.getCurrentDirectory(ledgerDir), 1, "testPasswd".getBytes());
 
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(null)
             .setJournalDirName(journalDir.getPath())
             .setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -242,7 +243,7 @@ public class BookieJournalTest {
 
         writeJunkJournal(Bookie.getCurrentDirectory(journalDir));
 
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(null)
             .setJournalDirName(journalDir.getPath())
             .setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -280,7 +281,7 @@ public class BookieJournalTest {
 
         writePreV2Journal(Bookie.getCurrentDirectory(journalDir), 0);
 
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(null)
             .setJournalDirName(journalDir.getPath())
             .setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -306,7 +307,7 @@ public class BookieJournalTest {
 
         writePostV2Journal(Bookie.getCurrentDirectory(journalDir), 0);
 
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(null)
             .setJournalDirName(journalDir.getPath())
             .setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -336,7 +337,7 @@ public class BookieJournalTest {
 
         writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
 
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(null)
             .setJournalDirName(journalDir.getPath())
             .setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -380,7 +381,7 @@ public class BookieJournalTest {
         writeIndexFileForLedger(Bookie.getCurrentDirectory(ledgerDir),
                                 1, "testPasswd".getBytes());
 
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(null)
             .setJournalDirName(journalDir.getPath())
             .setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -428,7 +429,7 @@ public class BookieJournalTest {
         writeIndexFileForLedger(Bookie.getCurrentDirectory(ledgerDir),
                                 1, "testPasswd".getBytes());
 
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(null)
             .setJournalDirName(journalDir.getPath())
             .setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -494,7 +495,7 @@ public class BookieJournalTest {
         writePartialIndexFileForLedger(Bookie.getCurrentDirectory(ledgerDir),
                                        1, "testPasswd".getBytes(), truncateMasterKey);
 
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(null)
             .setJournalDirName(journalDir.getPath())
             .setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -556,7 +557,7 @@ public class BookieJournalTest {
         writePartialIndexFileForLedger(Bookie.getCurrentDirectory(ledgerDir), 1, masterKey,
                                        truncateMasterKey);
 
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(null)
             .setJournalDirName(journalDir.getPath())
             .setLedgerDirNames(new String[] { ledgerDir.getPath() });

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java Sat Oct 12 03:42:21 2013
@@ -21,23 +21,27 @@
 
 package org.apache.bookkeeper.bookie;
 
-import org.apache.commons.io.FileUtils;
+import static org.apache.bookkeeper.bookie.UpgradeTest.newV1JournalDirectory;
+import static org.apache.bookkeeper.bookie.UpgradeTest.newV1LedgerDirectory;
+import static org.apache.bookkeeper.bookie.UpgradeTest.newV2JournalDirectory;
+import static org.apache.bookkeeper.bookie.UpgradeTest.newV2LedgerDirectory;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.test.PortManager;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.commons.io.FileUtils;
 import org.apache.zookeeper.ZooKeeper;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.junit.Test;
 import org.junit.After;
 import org.junit.Before;
-import static org.junit.Assert.*;
-
-import static org.apache.bookkeeper.bookie.UpgradeTest.*;
+import org.junit.Test;
 
 public class CookieTest {
     ZooKeeperUtil zkutil;
@@ -75,7 +79,7 @@ public class CookieTest {
      */
     @Test(timeout=60000)
     public void testCleanStart() throws Exception {
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkutil.getZooKeeperConnectString())
             .setJournalDirName(newDirectory(false))
             .setLedgerDirNames(new String[] { newDirectory(false) })
@@ -94,7 +98,7 @@ public class CookieTest {
      */
     @Test(timeout=60000)
     public void testBadJournalCookie() throws Exception {
-        ServerConfiguration conf1 = new ServerConfiguration()
+        ServerConfiguration conf1 = TestBKConfiguration.newServerConfiguration()
             .setJournalDirName(newDirectory())
             .setLedgerDirNames(new String[] { newDirectory() })
             .setBookiePort(bookiePort);
@@ -103,7 +107,7 @@ public class CookieTest {
 
         String journalDir = newDirectory();
         String ledgerDir = newDirectory();
-        ServerConfiguration conf2 = new ServerConfiguration()
+        ServerConfiguration conf2 = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkutil.getZooKeeperConnectString())
             .setJournalDirName(journalDir)
             .setLedgerDirNames(new String[] { ledgerDir })
@@ -130,7 +134,7 @@ public class CookieTest {
         String[] ledgerDirs = new String[] {
             newDirectory(), newDirectory(), newDirectory() };
         String journalDir = newDirectory();
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkutil.getZooKeeperConnectString())
             .setJournalDirName(journalDir)
             .setLedgerDirNames(ledgerDirs)
@@ -171,7 +175,7 @@ public class CookieTest {
     public void testDirectoryAdded() throws Exception {
         String ledgerDir0 = newDirectory();
         String journalDir = newDirectory();
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkutil.getZooKeeperConnectString())
             .setJournalDirName(journalDir)
             .setLedgerDirNames(new String[] { ledgerDir0 })
@@ -203,7 +207,7 @@ public class CookieTest {
     public void testDirectoryCleared() throws Exception {
         String ledgerDir0 = newDirectory();
         String journalDir = newDirectory();
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkutil.getZooKeeperConnectString())
             .setJournalDirName(journalDir)
             .setLedgerDirNames(new String[] { ledgerDir0 , newDirectory() })
@@ -228,7 +232,7 @@ public class CookieTest {
      */
     @Test(timeout=60000)
     public void testBookiePortChanged() throws Exception {
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkutil.getZooKeeperConnectString())
             .setJournalDirName(newDirectory())
             .setLedgerDirNames(new String[] { newDirectory() , newDirectory() })
@@ -254,7 +258,7 @@ public class CookieTest {
      */
     @Test(timeout=60000)
     public void testNewBookieStartingWithAnotherBookiesPort() throws Exception {
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkutil.getZooKeeperConnectString())
             .setJournalDirName(newDirectory())
             .setLedgerDirNames(new String[] { newDirectory() , newDirectory() })
@@ -263,7 +267,7 @@ public class CookieTest {
         b.start();
         b.shutdown();
 
-        conf = new ServerConfiguration()
+        conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkutil.getZooKeeperConnectString())
             .setJournalDirName(newDirectory())
             .setLedgerDirNames(new String[] { newDirectory() , newDirectory() })
@@ -288,7 +292,7 @@ public class CookieTest {
         // Format the BK Metadata and generate INSTANCEID
         BookKeeperAdmin.format(adminConf, false, true);
 
-        ServerConfiguration bookieConf = new ServerConfiguration()
+        ServerConfiguration bookieConf = TestBKConfiguration.newServerConfiguration()
                 .setZkServers(zkutil.getZooKeeperConnectString())
                 .setJournalDirName(newDirectory(false))
                 .setLedgerDirNames(new String[] { newDirectory(false) })
@@ -319,7 +323,7 @@ public class CookieTest {
      */
     @Test(timeout=60000)
     public void testV2data() throws Exception {
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkutil.getZooKeeperConnectString())
             .setJournalDirName(newV2JournalDirectory())
             .setLedgerDirNames(new String[] { newV2LedgerDirectory() })
@@ -339,7 +343,7 @@ public class CookieTest {
      */
     @Test(timeout=60000)
     public void testV1data() throws Exception {
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkutil.getZooKeeperConnectString())
             .setJournalDirName(newV1JournalDirectory())
             .setLedgerDirNames(new String[] { newV1LedgerDirectory() })

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java Sat Oct 12 03:42:21 2013
@@ -19,14 +19,14 @@
 package org.apache.bookkeeper.bookie;
 
 import java.io.File;
-import java.io.IOException;
 
-import org.apache.bookkeeper.conf.ServerConfiguration;
+import junit.framework.Assert;
 
-import org.junit.Test;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.junit.After;
 import org.junit.Before;
-import junit.framework.Assert;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,7 +77,7 @@ public class CreateNewLogTest {
      */
     @Test(timeout=60000)
     public void testCreateNewLog() throws Exception {
-        ServerConfiguration conf = new ServerConfiguration();
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
                      
         // Creating a new configuration with a number of 
         // ledger directories.
@@ -99,4 +99,4 @@ public class CreateNewLogTest {
         Assert.assertTrue("Wrong log id", el.getCurrentLogId() > 1);
     }
 
-}
\ No newline at end of file
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java Sat Oct 12 03:42:21 2013
@@ -31,11 +31,11 @@ import junit.framework.TestCase;
 import org.apache.bookkeeper.bookie.GarbageCollectorThread.EntryLogMetadata;
 import org.apache.bookkeeper.bookie.GarbageCollectorThread.ExtractionScanner;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +55,7 @@ public class EntryLogTest extends TestCa
         Bookie.checkDirectoryStructure(curDir);
 
         int gcWaitTime = 1000;
-        ServerConfiguration conf = new ServerConfiguration();
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setGcWaitTime(gcWaitTime);
         conf.setLedgerDirNames(new String[] {tmpDir.toString()});
         Bookie bookie = new Bookie(conf);
@@ -106,7 +106,7 @@ public class EntryLogTest extends TestCa
         File curDir = Bookie.getCurrentDirectory(tmpDir);
         Bookie.checkDirectoryStructure(curDir);
 
-        ServerConfiguration conf = new ServerConfiguration();
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setLedgerDirNames(new String[] {tmpDir.toString()});
         Bookie bookie = new Bookie(conf);
         // create some entries
@@ -167,7 +167,7 @@ public class EntryLogTest extends TestCa
             throws Exception {
         File tmpDir = File.createTempFile("bkTest", ".dir");
         tmpDir.delete();
-        ServerConfiguration conf = new ServerConfiguration();
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setLedgerDirNames(new String[] { tmpDir.toString() });
         EntryLogger entryLogger = null;
         try {
@@ -192,7 +192,7 @@ public class EntryLogTest extends TestCa
         ledgerDir1.delete();
         File ledgerDir2 = File.createTempFile("bkTest", ".dir");
         ledgerDir2.delete();
-        ServerConfiguration conf = new ServerConfiguration();
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(),
                 ledgerDir2.getAbsolutePath() });
         Bookie bookie = new Bookie(conf);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java Sat Oct 12 03:42:21 2013
@@ -27,8 +27,11 @@ import java.nio.ByteBuffer;
 
 import junit.framework.TestCase;
 
+import junit.framework.TestCase;
+
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.SnapshotMap;
@@ -70,7 +73,7 @@ public class LedgerCacheTest extends Tes
         // create current dir
         new File(ledgerDir, BookKeeperConstants.CURRENT_DIR).mkdir();
 
-        conf = new ServerConfiguration();
+        conf = TestBKConfiguration.newServerConfiguration();
         conf.setZkServers(null);
         conf.setJournalDirName(txnDir.getPath());
         conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -256,7 +259,7 @@ public class LedgerCacheTest extends Tes
         ledgerDir1.delete();
         File ledgerDir2 = File.createTempFile("bkTest", ".dir");
         ledgerDir2.delete();
-        ServerConfiguration conf = new ServerConfiguration();
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(), ledgerDir2.getAbsolutePath() });
 
         Bookie bookie = new Bookie(conf);
@@ -312,7 +315,7 @@ public class LedgerCacheTest extends Tes
         ledgerDir.mkdir();
         Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
 
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(null)
             .setJournalDirName(journalDir.getPath())
             .setLedgerDirNames(new String[] { ledgerDir.getPath() })
@@ -326,7 +329,7 @@ public class LedgerCacheTest extends Tes
             b.addEntry(packet, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
         }
 
-        conf = new ServerConfiguration()
+        conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(null)
             .setJournalDirName(journalDir.getPath())
             .setLedgerDirNames(new String[] { ledgerDir.getPath() });

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java Sat Oct 12 03:42:21 2013
@@ -26,16 +26,16 @@ import junit.framework.TestCase;
 
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TestLedgerDirsManager extends TestCase {
     static Logger LOG = LoggerFactory.getLogger(TestLedgerDirsManager.class);
 
-    ServerConfiguration conf = new ServerConfiguration();
+    ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
     File curDir;
     LedgerDirsManager dirsManager;
 
@@ -47,7 +47,7 @@ public class TestLedgerDirsManager exten
         curDir = Bookie.getCurrentDirectory(tmpDir);
         Bookie.checkDirectoryStructure(curDir);
 
-        ServerConfiguration conf = new ServerConfiguration();
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setLedgerDirNames(new String[] {tmpDir.toString()});
 
         dirsManager = new LedgerDirsManager(conf);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java Sat Oct 12 03:42:21 2013
@@ -37,6 +37,7 @@ import java.util.Arrays;
 import org.apache.bookkeeper.client.ClientUtil;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.bookkeeper.test.ZooKeeperUtil;
 import org.apache.zookeeper.ZooKeeper;
@@ -159,7 +160,7 @@ public class UpgradeTest {
     }
 
     private static void testUpgradeProceedure(String zkServers, String journalDir, String ledgerDir) throws Exception {
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkServers)
             .setJournalDirName(journalDir)
             .setLedgerDirNames(new String[] { ledgerDir })
@@ -216,7 +217,7 @@ public class UpgradeTest {
         String ledgerDir = newV2LedgerDirectory();
         testUpgradeProceedure(zkutil.getZooKeeperConnectString(), journalDir, ledgerDir);
         // Upgrade again
-        ServerConfiguration conf = new ServerConfiguration()
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkutil.getZooKeeperConnectString())
             .setJournalDirName(journalDir)
             .setLedgerDirNames(new String[] { ledgerDir })

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java Sat Oct 12 03:42:21 2013
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
@@ -58,8 +59,8 @@ import org.apache.commons.lang.StringUti
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -92,7 +93,7 @@ public class TestLedgerUnderreplicationM
         zkUtil = new ZooKeeperUtil();
         zkUtil.startServer();
 
-        conf = new ServerConfiguration().setZkServers(zkUtil.getZooKeeperConnectString());
+        conf = TestBKConfiguration.newServerConfiguration().setZkServers(zkUtil.getZooKeeperConnectString());
 
         executor = Executors.newCachedThreadPool();
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Sat Oct 12 03:42:21 2013
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.Book
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.metastore.InMemoryMetaStore;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
@@ -71,7 +72,7 @@ public abstract class BookKeeperClusterT
     protected int numBookies;
     protected BookKeeperTestClient bkc;
 
-    protected ServerConfiguration baseConf = new ServerConfiguration();
+    protected ServerConfiguration baseConf = TestBKConfiguration.newServerConfiguration();
     protected ClientConfiguration baseClientConf = new ClientConfiguration();
 
     private Map<BookieServer, AutoRecoveryMain> autoRecoveryProcesses = new HashMap<BookieServer, AutoRecoveryMain>();
@@ -363,13 +364,11 @@ public abstract class BookKeeperClusterT
         bs.clear();
         Thread.sleep(1000);
         // restart them to ensure we can't
-        int j = 0;
         for (ServerConfiguration conf : bsConfs) {
             if (null != newConf) {
                 conf.loadConf(newConf);
             }
             bs.add(startBookie(conf));
-            j++;
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java Sat Oct 12 03:42:21 2013
@@ -27,25 +27,26 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.Executors;
 
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.junit.Test;
+import junit.framework.TestCase;
+
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import junit.framework.TestCase;
-
 public class BookieClientTest extends TestCase {
     static Logger LOG = LoggerFactory.getLogger(BookieClientTest.class);
     BookieServer bs;
@@ -53,7 +54,7 @@ public class BookieClientTest extends Te
     public int port = 13645;
     public ClientSocketChannelFactory channelFactory;
     public OrderedSafeExecutor executor;
-    ServerConfiguration conf = new ServerConfiguration();
+    ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
 
     @Override
     public void setUp() throws Exception {
@@ -64,7 +65,7 @@ public class BookieClientTest extends Te
         // Since this test does not rely on the BookKeeper client needing to
         // know via ZooKeeper which Bookies are available, okay, so pass in null
         // for the zkServers input parameter when constructing the BookieServer.
-        ServerConfiguration conf = new ServerConfiguration();
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setZkServers(null).setBookiePort(port)
             .setJournalDirName(tmpDir.getPath())
             .setLedgerDirNames(new String[] { tmpDir.getPath() });

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java Sat Oct 12 03:42:21 2013
@@ -22,25 +22,22 @@ package org.apache.bookkeeper.test;
  */
 import java.io.File;
 import java.util.Enumeration;
-
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.junit.Assert;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class tests that bookie rolling journals
@@ -59,7 +56,7 @@ public class BookieJournalRollingTest ex
     @Override
     public void setUp() throws Exception {
         // Set up the configuration properties needed.
-        baseConf.setMaxJournalSize(1);
+        baseConf.setMaxJournalSizeMB(1);
         baseConf.setMaxBackupJournals(1);
         super.setUp();
     }
@@ -216,7 +213,7 @@ public class BookieJournalRollingTest ex
         }
 
         // set flush interval to a large value
-        ServerConfiguration newConf = new ServerConfiguration();
+        ServerConfiguration newConf = TestBKConfiguration.newServerConfiguration();
         newConf.setFlushInterval(999999999);
         // restart bookies
         restartBookies(newConf);
@@ -254,7 +251,7 @@ public class BookieJournalRollingTest ex
         Thread.sleep(3 * baseConf.getFlushInterval());
 
         // restart bookies with flush interval set to a large value
-        ServerConfiguration newConf = new ServerConfiguration();
+        ServerConfiguration newConf = TestBKConfiguration.newServerConfiguration();
         newConf.setFlushInterval(999999999);
         // restart bookies
         restartBookies(newConf);