You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2013/10/12 05:42:22 UTC
svn commit: r1531494 [1/2] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ bookke...
Author: sijie
Date: Sat Oct 12 03:42:21 2013
New Revision: 1531494
URL: http://svn.apache.org/r1531494
Log:
BOOKKEEPER-657: Journal Improvement (Robin Dhamankar via sijie)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieThread.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DaemonThreadFactory.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Oct 12 03:42:21 2013
@@ -166,6 +166,8 @@ Trunk (unreleased changes)
BOOKKEEPER-658: ledger cache refactor (Robin Dhamankar via sijie)
+ BOOKKEEPER-657: Journal Improvement (Robin Dhamankar via sijie)
+
NEW FEATURE:
BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Sat Oct 12 03:42:21 2013
@@ -21,27 +21,26 @@
package org.apache.bookkeeper.bookie;
+import static com.google.common.base.Charsets.UTF_8;
+
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.IOException;
import java.io.FilenameFilter;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Map;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.meta.LedgerManagerFactory;
-import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.GarbageCollectorThread.SafeEntryAdder;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
@@ -49,28 +48,29 @@ import org.apache.bookkeeper.bookie.Ledg
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.jmx.BKMBeanRegistry;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.util.net.DNS;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import static com.google.common.base.Charsets.UTF_8;
import com.google.common.annotations.VisibleForTesting;
/**
@@ -78,7 +78,7 @@ import com.google.common.annotations.Vis
*
*/
-public class Bookie extends Thread {
+public class Bookie extends BookieThread {
static Logger LOG = LoggerFactory.getLogger(Bookie.class);
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieThread.java?rev=1531494&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieThread.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieThread.java Sat Oct 12 03:42:21 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+* Wrapper that wraps bookie threads
+* Any common handling that we require for all bookie threads
+* should be implemented here
+*/
+public class BookieThread extends Thread {
+
+ private static class BookieUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+ static Logger logger = LoggerFactory.getLogger(BookieUncaughtExceptionHandler.class);
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ logger.error("Uncaught exception in thread " + t.getName(), e);
+ Runtime.getRuntime().exit(1);
+ }
+
+ }
+
+ public BookieThread (String name) {
+ super(name);
+ setUncaughtExceptionHandler(new BookieUncaughtExceptionHandler());
+ }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java Sat Oct 12 03:42:21 2013
@@ -100,21 +100,38 @@ public class BufferedChannel
return bc.size();
}
- public void flush(boolean sync) throws IOException {
+ /**
+ * Write any data in the buffer to the file. If shouldForceWrite is set to true, force a sync operation so that
+ * data is persisted to the disk.
+ * @param shouldForceWrite whether to force a sync to disk after the buffered data is written
+ * @throws IOException if the write or sync operation fails.
+ */
+ public void flush(boolean shouldForceWrite) throws IOException {
synchronized(this) {
- if (writeBuffer == null) {
- return;
- }
- writeBuffer.flip();
- bc.write(writeBuffer);
- writeBuffer.clear();
- writeBufferStartPosition = bc.position();
+ flushInternal();
}
- if (sync) {
+ if (shouldForceWrite) {
forceWrite(false);
}
}
+ /**
+ * Write any data in the buffer to the file and advance writeBufferStartPosition.
+ * Callers are expected to synchronize appropriately.
+ * @throws IOException if the write fails.
+ */
+ private void flushInternal() throws IOException {
+ if (writeBuffer == null) {
+ return;
+ }
+ writeBuffer.flip();
+ do {
+ bc.write(writeBuffer);
+ } while (writeBuffer.hasRemaining());
+ writeBuffer.clear();
+ writeBufferStartPosition = bc.position();
+ }
+
public long forceWrite(boolean forceMetadata) throws IOException {
// This is the point up to which we had flushed to the file system page cache
// before issuing this force write hence is guaranteed to be made durable by
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Sat Oct 12 03:42:21 2013
@@ -30,19 +30,24 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.DaemonThreadFactory;
import org.apache.bookkeeper.util.IOUtils;
+import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provide journal related management.
*/
-class Journal extends Thread implements CheckpointSource {
+class Journal extends BookieThread implements CheckpointSource {
static Logger LOG = LoggerFactory.getLogger(Journal.class);
@@ -115,13 +120,19 @@ class Journal extends Thread implements
public int hashCode() {
return mark.hashCode();
}
+
+ @Override
+ public String toString() {
+ return mark.toString();
+ }
}
/**
* Last Log Mark
*/
class LastLogMark {
- private LogMark curMark;
+ private final LogMark curMark;
+
LastLogMark(long logId, long logPosition) {
this.curMark = new LogMark(logId, logPosition);
}
@@ -200,6 +211,11 @@ class Journal extends Thread implements
}
}
}
+
+ @Override
+ public String toString() {
+ return curMark.toString();
+ }
}
/**
@@ -245,52 +261,240 @@ class Journal extends Thread implements
* Journal Entry to Record
*/
private static class QueueEntry {
+ ByteBuffer entry;
+ long ledgerId;
+ long entryId;
+ WriteCallback cb;
+ Object ctx;
+ long enqueueTime;
+
QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
- WriteCallback cb, Object ctx) {
+ WriteCallback cb, Object ctx, long enqueueTime) {
this.entry = entry.duplicate();
this.cb = cb;
this.ctx = ctx;
this.ledgerId = ledgerId;
this.entryId = entryId;
+ this.enqueueTime = enqueueTime;
}
- ByteBuffer entry;
+ public void callback() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
+ }
+ cb.writeComplete(0, ledgerId, entryId, null, ctx);
+ }
+ }
- long ledgerId;
+ private class ForceWriteRequest implements Runnable {
+ private final JournalChannel logFile;
+ private final LinkedList<QueueEntry> forceWriteWaiters;
+ private boolean shouldClose;
+ private final boolean isMarker;
+ private final long lastFlushedPosition;
+ private final long logId;
- long entryId;
+ private ForceWriteRequest(JournalChannel logFile,
+ long logId,
+ long lastFlushedPosition,
+ LinkedList<QueueEntry> forceWriteWaiters,
+ boolean shouldClose,
+ boolean isMarker) {
+ this.forceWriteWaiters = forceWriteWaiters;
+ this.logFile = logFile;
+ this.logId = logId;
+ this.lastFlushedPosition = lastFlushedPosition;
+ this.shouldClose = shouldClose;
+ this.isMarker = isMarker;
+ }
- WriteCallback cb;
+ public int process(boolean shouldForceWrite) throws IOException {
+ if (isMarker) {
+ return 0;
+ }
- Object ctx;
+ try {
+ if (shouldForceWrite) {
+ this.logFile.forceWrite(false);
+ }
+ lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition);
+
+ // Notify the waiters that the force write succeeded
+ cbThreadPool.submit(this);
+
+ return this.forceWriteWaiters.size();
+ }
+ finally {
+ closeFileIfNecessary();
+ }
+ }
+
+ @Override
+ public void run() {
+ for (QueueEntry e : this.forceWriteWaiters) {
+ e.callback(); // Process cbs inline
+ }
+ }
+
+ public void closeFileIfNecessary() {
+ // Close if shouldClose is set
+ if (shouldClose) {
+ // We should guard against exceptions so it's
+ // safe to call in catch blocks
+ try {
+ logFile.close();
+ // Call close only once
+ shouldClose = false;
+ }
+ catch (IOException ioe) {
+ LOG.error("I/O exception while closing file", ioe);
+ }
+ }
+ }
+ }
+
+ /**
+ * ForceWriteThread is a background thread which makes the journal durable periodically
+ *
+ */
+ private class ForceWriteThread extends BookieThread {
+ volatile boolean running = true;
+ // Thread to interrupt when this thread leaves its run loop, so that
+ // the parent does not leave write requests hanging
+ Thread threadToNotifyOnEx;
+ // should we group force writes
+ private final boolean enableGroupForceWrites;
+ // make flush interval as a parameter
+ public ForceWriteThread(Thread threadToNotifyOnEx, boolean enableGroupForceWrites) {
+ super("ForceWriteThread");
+ this.threadToNotifyOnEx = threadToNotifyOnEx;
+ this.enableGroupForceWrites = enableGroupForceWrites;
+ }
+ @Override
+ public void run() {
+ LOG.info("ForceWrite Thread started");
+ boolean shouldForceWrite = true;
+ int numReqInLastForceWrite = 0;
+ while(running) {
+ ForceWriteRequest req = null;
+ try {
+ req = forceWriteRequests.take();
+
+ // Force write the file and then notify the write completions
+ //
+ if (!req.isMarker) {
+ if (shouldForceWrite) {
+ // if we are going to force write, any request that is already in the
+ // queue will benefit from this force write - post a marker prior to issuing
+ // the flush so until this marker is encountered we can skip the force write
+ if (enableGroupForceWrites) {
+ forceWriteRequests.put(new ForceWriteRequest(req.logFile, 0, 0, null, false, true));
+ }
+
+ // If we are about to issue a write, record the number of requests in
+ // the last force write and then reset the counter so we can accumulate
+ // requests in the write we are about to issue
+ if (numReqInLastForceWrite > 0) {
+ numReqInLastForceWrite = 0;
+ }
+ }
+ numReqInLastForceWrite += req.process(shouldForceWrite);
+ }
+
+ if (enableGroupForceWrites &&
+ // if it's a marker we should switch back to flushing
+ !req.isMarker &&
+ // This indicates that this is the last request in a given file
+ // so subsequent requests will go to a different file so we should
+ // flush on the next request
+ !req.shouldClose) {
+ shouldForceWrite = false;
+ }
+ else {
+ shouldForceWrite = true;
+ }
+ } catch (IOException ioe) {
+ LOG.error("I/O exception in ForceWrite thread", ioe);
+ running = false;
+ } catch (InterruptedException e) {
+ LOG.error("ForceWrite thread interrupted", e);
+ // close is idempotent
+ if (null != req) {
+ req.closeFileIfNecessary();
+ }
+ running = false;
+ }
+ }
+ // Regardless of what caused us to exit, we should notify
+ // the parent thread as it should either exit or be in the process
+ // of exiting else we will have write requests hang
+ threadToNotifyOnEx.interrupt();
+ }
+ // shutdown the force write thread
+ void shutdown() throws InterruptedException {
+ running = false;
+ this.interrupt();
+ this.join();
+ }
}
final static long MB = 1024 * 1024L;
+ final static int KB = 1024;
// max journal file size
final long maxJournalSize;
+ // pre-allocation size for the journal files
+ final long journalPreAllocSize;
+ // write buffer size for the journal files
+ final int journalWriteBufferSize;
// number journal files kept before marked journal
final int maxBackupJournals;
final File journalDirectory;
final ServerConfiguration conf;
+ ForceWriteThread forceWriteThread;
+ // should we group force writes
+ private final boolean enableGroupForceWrites;
+ // Time after which we will stop grouping and issue the flush
+ private final long maxGroupWaitInMSec;
+ // Threshold after which we flush any buffered journal writes
+ private final long bufferedWritesThreshold;
+ // should we flush if the queue is empty
+ private final boolean flushWhenQueueEmpty;
// should we hint the filesystem to remove pages from cache after force write
private final boolean removePagesFromCache;
- private LastLogMark lastLogMark = new LastLogMark(0, 0);
+ private final LastLogMark lastLogMark = new LastLogMark(0, 0);
+
+ /**
+ * The thread pool used to handle callback.
+ */
+ private final ExecutorService cbThreadPool;
// journal entry queue to commit
LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
+ LinkedBlockingQueue<ForceWriteRequest> forceWriteRequests = new LinkedBlockingQueue<ForceWriteRequest>();
volatile boolean running = true;
- private LedgerDirsManager ledgerDirsManager;
+ private final LedgerDirsManager ledgerDirsManager;
public Journal(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager) {
super("BookieJournal-" + conf.getBookiePort());
this.ledgerDirsManager = ledgerDirsManager;
this.conf = conf;
this.journalDirectory = Bookie.getCurrentDirectory(conf.getJournalDir());
- this.maxJournalSize = conf.getMaxJournalSize() * MB;
+ this.maxJournalSize = conf.getMaxJournalSizeMB() * MB;
+ this.journalPreAllocSize = conf.getJournalPreAllocSizeMB() * MB;
+ this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB;
this.maxBackupJournals = conf.getMaxBackupJournals();
+ this.enableGroupForceWrites = conf.getJournalAdaptiveGroupWrites();
+ this.forceWriteThread = new ForceWriteThread(this, enableGroupForceWrites);
+ this.maxGroupWaitInMSec = conf.getJournalMaxGroupWaitMSec();
+ this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold();
+ this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumAddWorkerThreads(), new DaemonThreadFactory());
+
+ // Unless there is a cap on the max wait (which requires group force writes)
+ // we cannot skip flushing for queue empty
+ this.flushWhenQueueEmpty = !enableGroupForceWrites || conf.getJournalFlushWhenQueueEmpty();
this.removePagesFromCache = conf.getJournalRemovePagesFromCache();
// read last log mark
@@ -362,9 +566,9 @@ class Journal extends Thread implements
throws IOException {
JournalChannel recLog;
if (journalPos <= 0) {
- recLog = new JournalChannel(journalDirectory, journalId);
+ recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize);
} else {
- recLog = new JournalChannel(journalDirectory, journalId, journalPos);
+ recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, journalPos);
}
int journalVersion = recLog.getFormatVersion();
try {
@@ -448,7 +652,7 @@ class Journal extends Thread implements
long ledgerId = entry.getLong();
long entryId = entry.getLong();
entry.rewind();
- queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx));
+ queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano()));
}
/**
@@ -480,6 +684,7 @@ class Journal extends Thread implements
LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
ByteBuffer lenBuff = ByteBuffer.allocate(4);
JournalChannel logFile = null;
+ forceWriteThread.start();
try {
List<Long> journalIds = listJournalIds(journalDirectory, null);
// Should not use MathUtils.now(), which use System.nanoTime() and
@@ -494,7 +699,11 @@ class Journal extends Thread implements
// new journal file to write
if (null == logFile) {
logId = logId + 1;
- logFile = new JournalChannel(journalDirectory, logId, removePagesFromCache);
+ logFile = new JournalChannel(journalDirectory,
+ logId,
+ journalPreAllocSize,
+ journalWriteBufferSize,
+ removePagesFromCache);
bc = logFile.getBufferedChannel();
lastFlushPosition = 0;
@@ -504,25 +713,43 @@ class Journal extends Thread implements
if (toFlush.isEmpty()) {
qe = queue.take();
} else {
- qe = queue.poll();
- if (qe == null || bc.position() > lastFlushPosition + 512*1024) {
- //logFile.force(false);
+ long pollWaitTime = maxGroupWaitInMSec - MathUtils.elapsedMSec(toFlush.getFirst().enqueueTime);
+ if (flushWhenQueueEmpty || pollWaitTime < 0) {
+ pollWaitTime = 0;
+ }
+ qe = queue.poll(pollWaitTime, TimeUnit.MILLISECONDS);
+ boolean shouldFlush = false;
+ // We should issue a forceWrite if any of the three conditions below holds good
+ // 1. If the oldest pending entry has been pending for longer than the max wait time
+ if (enableGroupForceWrites && (MathUtils.elapsedMSec(toFlush.getFirst().enqueueTime) > maxGroupWaitInMSec)) {
+ shouldFlush = true;
+ } else if ((bc.position() > lastFlushPosition + bufferedWritesThreshold)) {
+ // 2. If we have buffered more than the bufferedWritesThreshold
+ shouldFlush = true;
+ } else if (qe == null) {
+ // We should get here only if flushWhenQueueEmpty is true, else we would wait
+ // for the timeout that would put us past the maxWait threshold
+ // 3. If the queue is empty i.e. no benefit of grouping. This happens when we have one
+ // publish at a time - common case in tests.
+ shouldFlush = true;
+ }
+
+ // toFlush is non null and not empty so should be safe to access getFirst
+ if (shouldFlush) {
bc.flush(false);
- // This separation of flush and force is useful when adaptive group
- // force write is used where the flush thread does not block while
- // the force is issued by a separate thread
- logFile.forceWrite(false);
lastFlushPosition = bc.position();
- lastLogMark.setCurLogMark(logId, lastFlushPosition);
- for (QueueEntry e : toFlush) {
- e.cb.writeComplete(BookieException.Code.OK,
- e.ledgerId, e.entryId, null, e.ctx);
+
+ // Trace the lifetime of entries through persistence
+ if (LOG.isDebugEnabled()) {
+ for (QueueEntry e : toFlush) {
+ LOG.debug("Written and queuing for flush Ledger:" + e.ledgerId + " Entry:" + e.entryId);
+ }
}
- toFlush.clear();
+ forceWriteRequests.put(new ForceWriteRequest(logFile, logId, lastFlushPosition, toFlush, (lastFlushPosition > maxJournalSize), false));
+ toFlush = new LinkedList<QueueEntry>();
// check whether journal file is over file limit
if (bc.position() > maxJournalSize) {
- logFile.close();
logFile = null;
continue;
}
@@ -548,8 +775,10 @@ class Journal extends Thread implements
bc.write(lenBuff);
bc.write(qe.entry);
+ // NOTE: preAlloc depends on the fact that we don't change file size while this is
+ // called or useful parts of the file will be zeroed out - in other words
+ // it depends on single threaded flushes to the JournalChannel
logFile.preAllocIfNeeded();
-
toFlush.add(qe);
qe = null;
}
@@ -560,6 +789,11 @@ class Journal extends Thread implements
} catch (InterruptedException ie) {
LOG.warn("Journal exits when shutting down", ie);
} finally {
+ // There could be packets queued for forceWrite on this logFile
+ // That is fine as this exception is going to anyway take down
+ // the bookie. If we execute this as a part of graceful shutdown,
+ // close will flush the file system cache making any previous
+ // cached writes durable so this is fine as well.
IOUtils.close(LOG, logFile);
}
LOG.info("Journal exited loop!");
@@ -574,6 +808,13 @@ class Journal extends Thread implements
return;
}
LOG.info("Shutting down Journal");
+ forceWriteThread.shutdown();
+ cbThreadPool.shutdown();
+ if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing");
+ }
+ cbThreadPool.shutdownNow();
+
running = false;
this.interrupt();
this.join();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java Sat Oct 12 03:42:21 2013
@@ -33,6 +33,7 @@ import org.apache.bookkeeper.util.Native
import static com.google.common.base.Charsets.UTF_8;
+import org.apache.bookkeeper.util.NativeIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,25 +58,35 @@ class JournalChannel implements Closeabl
int MIN_COMPAT_JOURNAL_FORMAT_VERSION = 1;
int CURRENT_JOURNAL_FORMAT_VERSION = 4;
- public final static long preAllocSize = 4*1024*1024;
- public final static ByteBuffer zeros = ByteBuffer.allocate(512);
+ private long preAllocSize;
private boolean fRemoveFromPageCache;
+ public final static ByteBuffer zeros = ByteBuffer.allocate(512);
+
// The position of the file channel's last force write.
private long lastForceWritePosition = 0;
+ // Mostly used by tests
JournalChannel(File journalDirectory, long logId) throws IOException {
- this(journalDirectory, logId, START_OF_FILE, false);
+ this(journalDirectory, logId, 4*1024*1024, 65536, START_OF_FILE);
}
- JournalChannel(File journalDirectory, long logId, long position) throws IOException {
- this(journalDirectory, logId, position, false);
+ JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize) throws IOException {
+ this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE);
}
- JournalChannel(File journalDirectory, long logId, boolean fRemoveFromPageCache) throws IOException {
- this(journalDirectory, logId, START_OF_FILE, fRemoveFromPageCache);
+ JournalChannel(File journalDirectory, long logId,
+ long preAllocSize, int writeBufferSize, long position) throws IOException {
+ this(journalDirectory, logId, preAllocSize, writeBufferSize, position, false);
}
- JournalChannel(File journalDirectory, long logId, long position, boolean fRemoveFromPageCache) throws IOException {
+ JournalChannel(File journalDirectory, long logId,
+ long preAllocSize, int writeBufferSize, boolean fRemoveFromPageCache) throws IOException {
+ this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE, fRemoveFromPageCache);
+ }
+
+ JournalChannel(File journalDirectory, long logId,
+ long preAllocSize, int writeBufferSize, long position, boolean fRemoveFromPageCache) throws IOException {
+ this.preAllocSize = preAllocSize;
this.fRemoveFromPageCache = fRemoveFromPageCache;
File fn = new File(journalDirectory, Long.toHexString(logId) + ".txn");
@@ -97,8 +108,7 @@ class JournalChannel implements Closeabl
bb.flip();
fc.write(bb);
- bc = new BufferedChannel(fc, 65536);
-
+ bc = new BufferedChannel(fc, writeBufferSize);
forceWrite(true);
nextPrealloc = preAllocSize;
fc.write(zeros, nextPrealloc);
@@ -184,6 +194,9 @@ class JournalChannel implements Closeabl
}
public void forceWrite(boolean forceMetadata) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Journal ForceWrite");
+ }
long newForceWritePosition = bc.forceWrite(forceMetadata);
if (newForceWritePosition > lastForceWritePosition) {
if (fRemoveFromPageCache) {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java Sat Oct 12 03:42:21 2013
@@ -21,25 +21,22 @@
package org.apache.bookkeeper.bookie;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
-
-import com.google.common.annotations.VisibleForTesting;
-
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
-
-import java.io.IOException;
-
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* SyncThread is a background thread which help checkpointing ledger storage
* when a checkpoint is requested. After a ledger storage is checkpointed,
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Sat Oct 12 03:42:21 2013
@@ -24,6 +24,8 @@ import com.google.common.annotations.Bet
import org.apache.commons.lang.StringUtils;
+import com.google.common.annotations.Beta;
+
/**
* Configuration manages server-side settings
*/
@@ -48,7 +50,13 @@ public class ServerConfiguration extends
// Journal Parameters
protected final static String MAX_JOURNAL_SIZE = "journalMaxSizeMB";
protected final static String MAX_BACKUP_JOURNALS = "journalMaxBackups";
+ protected final static String JOURNAL_ADAPTIVE_GROUP_WRITES = "journalAdaptiveGroupWrites";
+ protected final static String JOURNAL_MAX_GROUP_WAIT_MSEC = "journalMaxGroupWaitMSec";
+ protected final static String JOURNAL_BUFFERED_WRITES_THRESHOLD = "journalBufferedWritesThreshold";
+ protected final static String JOURNAL_FLUSH_WHEN_QUEUE_EMPTY = "journalFlushWhenQueueEmpty";
protected final static String JOURNAL_REMOVE_FROM_PAGE_CACHE = "journalRemoveFromPageCache";
+ protected final static String JOURNAL_PRE_ALLOC_SIZE = "journalPreAllocSizeMB";
+ protected final static String JOURNAL_WRITE_BUFFER_SIZE = "journalWriteBufferSizeKB";
// Bookie Parameters
protected final static String BOOKIE_PORT = "bookiePort";
protected final static String LISTENING_INTERFACE = "listeningInterface";
@@ -72,6 +80,12 @@ public class ServerConfiguration extends
protected final static String AUDITOR_PERIODIC_CHECK_INTERVAL = "auditorPeriodicCheckInterval";
protected final static String AUTO_RECOVERY_DAEMON_ENABLED = "autoRecoveryDaemonEnabled";
+ // Worker Thread parameters.
+ protected final static String NUM_ADD_WORKER_THREADS = "numAddWorkerThreads";
+
+ protected final static String READ_BUFFER_SIZE = "readBufferSizeBytes";
+ protected final static String WRITE_BUFFER_SIZE = "writeBufferSizeBytes";
+
/**
* Construct a default configuration object
*/
@@ -231,7 +245,7 @@ public class ServerConfiguration extends
*
* @return max journal file size
*/
- public long getMaxJournalSize() {
+ public long getMaxJournalSizeMB() {
return this.getLong(MAX_JOURNAL_SIZE, 2 * 1024);
}
@@ -242,12 +256,30 @@ public class ServerConfiguration extends
* new max journal file size
* @return server configuration
*/
- public ServerConfiguration setMaxJournalSize(long maxJournalSize) {
+ public ServerConfiguration setMaxJournalSizeMB(long maxJournalSize) {
this.setProperty(MAX_JOURNAL_SIZE, Long.toString(maxJournalSize));
return this;
}
/**
+ * How much space should we pre-allocate at a time in the journal
+ *
+ * @return journal pre-allocation size in MB
+ */
+ public int getJournalPreAllocSizeMB() {
+ return this.getInt(JOURNAL_PRE_ALLOC_SIZE, 16);
+ }
+
+ /**
+ * Size of the write buffers used for the journal
+ *
+ * @return journal write buffer size in KB
+ */
+ public int getJournalWriteBufferSizeKB() {
+ return this.getInt(JOURNAL_WRITE_BUFFER_SIZE, 64);
+ }
+
+ /**
* Max number of older journal files kept
*
* @return max number of older journal files to kept
@@ -648,6 +680,125 @@ public class ServerConfiguration extends
}
/**
+ * Get the number of threads that should handle write requests.
+ * @return number of add worker threads
+ */
+ public int getNumAddWorkerThreads() {
+ return getInt(NUM_ADD_WORKER_THREADS, 1);
+ }
+
+ /**
+ * Get the number of bytes we should use as capacity for the {@link
+ * org.apache.bookkeeper.bookie.BufferedReadChannel}.
+ * Default is 512 bytes.
+ * @return read buffer size
+ */
+ public int getReadBufferBytes() {
+ return getInt(READ_BUFFER_SIZE, 512);
+ }
+
+ /**
+ * Set the number of bytes we should use as capacity for the {@link
+ * org.apache.bookkeeper.bookie.BufferedReadChannel}
+ *
+ * @param readBufferSize
+ * Read Buffer Size
+ * @return server configuration
+ */
+ public ServerConfiguration setReadBufferBytes(int readBufferSize) {
+ setProperty(READ_BUFFER_SIZE, readBufferSize);
+ return this;
+ }
+
+ /**
+ * Get the number of bytes used as capacity for the write buffer. Default is
+ * 64KB.
+ * NOTE: Make sure this value is greater than the maximum message size.
+ * @return write buffer size in bytes
+ */
+ public int getWriteBufferBytes() {
+ return getInt(WRITE_BUFFER_SIZE, 65536);
+ }
+
+ /**
+ * Set the number of bytes used as capacity for the write buffer.
+ *
+ * @param writeBufferBytes
+ * Write Buffer Bytes
+ * @return server configuration
+ */
+ public ServerConfiguration setWriteBufferBytes(int writeBufferBytes) {
+ setProperty(WRITE_BUFFER_SIZE, writeBufferBytes);
+ return this;
+ }
+
+
+ /**
+ * Should we group journal force writes
+ *
+ * @return group journal force writes
+ */
+ public boolean getJournalAdaptiveGroupWrites() {
+ return getBoolean(JOURNAL_ADAPTIVE_GROUP_WRITES, true);
+ }
+
+ /**
+ * Enable/disable group journal force writes
+ *
+ * @param enabled flag to enable/disable group journal force writes
+ */
+ public ServerConfiguration setJournalAdaptiveGroupWrites(boolean enabled) {
+ setProperty(JOURNAL_ADAPTIVE_GROUP_WRITES, enabled);
+ return this;
+ }
+
+ /**
+ * Maximum latency to impose on a journal write to achieve grouping
+ *
+ * @return max wait for grouping, in milliseconds
+ */
+ public long getJournalMaxGroupWaitMSec() {
+ return getLong(JOURNAL_MAX_GROUP_WAIT_MSEC, 200);
+ }
+
+ /**
+ * Maximum amount of buffered journal writes (presumably bytes) to accumulate when grouping
+ *
+ * @return buffered writes threshold
+ */
+ public long getJournalBufferedWritesThreshold() {
+ return getLong(JOURNAL_BUFFERED_WRITES_THRESHOLD, 512 * 1024);
+ }
+
+
+ /**
+ * Set if we should flush the journal when queue is empty
+ */
+ public ServerConfiguration setJournalFlushWhenQueueEmpty(boolean enabled) {
+ setProperty(JOURNAL_FLUSH_WHEN_QUEUE_EMPTY, enabled);
+ return this;
+ }
+
+ /**
+ * Should we flush the journal when queue is empty
+ *
+ * @return flush when queue is empty
+ */
+ public boolean getJournalFlushWhenQueueEmpty() {
+ return getBoolean(JOURNAL_FLUSH_WHEN_QUEUE_EMPTY, false);
+ }
+
+ /**
+ * Should we remove pages from page cache after force write
+ *
+ * @return remove pages from cache
+ */
+ @Beta
+ public boolean getJournalRemovePagesFromCache() {
+ return getBoolean(JOURNAL_REMOVE_FROM_PAGE_CACHE, false);
+ }
+
+ /**
* Set whether the bookie is able to go into read-only mode.
* If this is set to false, the bookie will shutdown on encountering
* an error condition.
@@ -757,25 +908,4 @@ public class ServerConfiguration extends
return getBoolean(AUTO_RECOVERY_DAEMON_ENABLED, false);
}
- /**
- * Should we remove pages from page cache after force write
- *
- * @return remove pages from cache
- */
- @Beta
- public boolean getJournalRemovePagesFromCache() {
- return getBoolean(JOURNAL_REMOVE_FROM_PAGE_CACHE, false);
- }
-
- /**
- * Sets that whether should we remove pages from page cache after force write.
- *
- * @param enabled
- * - true if we need to remove pages from page cache. otherwise, false
- * @return ServerConfiguration
- */
- public ServerConfiguration setJournalRemovePagesFromCache(boolean enabled) {
- setProperty(JOURNAL_REMOVE_FROM_PAGE_CACHE, enabled);
- return this;
- }
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DaemonThreadFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DaemonThreadFactory.java?rev=1531494&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DaemonThreadFactory.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DaemonThreadFactory.java Sat Oct 12 03:42:21 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.Executors;
+
+public class DaemonThreadFactory implements ThreadFactory {
+ private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
+ private int priority = Thread.NORM_PRIORITY;
+ public DaemonThreadFactory() {
+ }
+ public DaemonThreadFactory(int priority) {
+ assert priority >= Thread.MIN_PRIORITY && priority <= Thread.MAX_PRIORITY;
+ this.priority = priority;
+ }
+ public Thread newThread(Runnable r) {
+ Thread thread = defaultThreadFactory.newThread(r);
+ thread.setDaemon(true);
+ thread.setPriority(priority);
+ return thread;
+ }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java Sat Oct 12 03:42:21 2013
@@ -48,4 +48,29 @@ public class MathUtils {
return System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
}
+ /**
+ * Current time from some arbitrary time base in the past, counting in
+ * nanoseconds, and not affected by settimeofday or similar system clock
+ * changes. This is appropriate to use when computing how much longer to
+ * wait for an interval to expire.
+ *
+ * NOTE: only use it for measuring.
+ * http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
+ *
+ * @return current time in nanoseconds.
+ */
+ public static long nowInNano() {
+ return System.nanoTime();
+ }
+
+ /**
+ * Milliseconds elapsed since the specified start time. The input is a nanoTime value;
+ * the conversion to milliseconds happens only when computing the elapsed time.
+ *
+ * @param startNanoTime the start of the interval that we are measuring
+ * @return elapsed time in milliseconds.
+ */
+ public static long elapsedMSec (long startNanoTime) {
+ return (System.nanoTime() - startNanoTime)/ NANOSECONDS_PER_MILLISECOND;
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java Sat Oct 12 03:42:21 2013
@@ -35,6 +35,7 @@ import junit.framework.Assert;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.client.BookKeeperAdmin;
@@ -132,7 +133,7 @@ public class BookieInitializationTest {
tmpDir.delete();
tmpDir.mkdir();
- final ServerConfiguration conf = new ServerConfiguration()
+ final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(null).setJournalDirName(tmpDir.getPath())
.setLedgerDirNames(new String[] { tmpDir.getPath() });
@@ -198,7 +199,7 @@ public class BookieInitializationTest {
tmpDir.delete();
tmpDir.mkdir();
- ServerConfiguration conf = new ServerConfiguration().setZkServers(null)
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration().setZkServers(null)
.setJournalDirName(tmpDir.getPath()).setLedgerDirNames(
new String[] { tmpDir.getPath() });
@@ -253,7 +254,7 @@ public class BookieInitializationTest {
tmpDir.delete();
tmpDir.mkdir();
- ServerConfiguration conf = new ServerConfiguration();
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
int port = 12555;
conf.setZkServers(null).setBookiePort(port).setJournalDirName(
tmpDir.getPath()).setLedgerDirNames(
@@ -285,7 +286,7 @@ public class BookieInitializationTest {
tmpDir.delete();
tmpDir.mkdir();
- final ServerConfiguration conf = new ServerConfiguration()
+ final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setZkTimeout(5000).setJournalDirName(tmpDir.getPath())
.setLedgerDirNames(new String[] { tmpDir.getPath() });
@@ -310,7 +311,7 @@ public class BookieInitializationTest {
tmpDir.mkdir();
final String ZK_ROOT = "/ledgers2";
- final ServerConfiguration conf = new ServerConfiguration()
+ final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setZkTimeout(5000).setJournalDirName(tmpDir.getPath())
.setLedgerDirNames(new String[] { tmpDir.getPath() });
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java Sat Oct 12 03:42:21 2013
@@ -36,6 +36,7 @@ import java.util.Random;
import org.apache.bookkeeper.client.ClientUtil;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -204,7 +205,7 @@ public class BookieJournalTest {
writePreV2Journal(Bookie.getCurrentDirectory(journalDir), 100);
writeIndexFileForLedger(Bookie.getCurrentDirectory(ledgerDir), 1, "testPasswd".getBytes());
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(null)
.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -242,7 +243,7 @@ public class BookieJournalTest {
writeJunkJournal(Bookie.getCurrentDirectory(journalDir));
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(null)
.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -280,7 +281,7 @@ public class BookieJournalTest {
writePreV2Journal(Bookie.getCurrentDirectory(journalDir), 0);
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(null)
.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -306,7 +307,7 @@ public class BookieJournalTest {
writePostV2Journal(Bookie.getCurrentDirectory(journalDir), 0);
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(null)
.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -336,7 +337,7 @@ public class BookieJournalTest {
writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(null)
.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -380,7 +381,7 @@ public class BookieJournalTest {
writeIndexFileForLedger(Bookie.getCurrentDirectory(ledgerDir),
1, "testPasswd".getBytes());
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(null)
.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -428,7 +429,7 @@ public class BookieJournalTest {
writeIndexFileForLedger(Bookie.getCurrentDirectory(ledgerDir),
1, "testPasswd".getBytes());
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(null)
.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -494,7 +495,7 @@ public class BookieJournalTest {
writePartialIndexFileForLedger(Bookie.getCurrentDirectory(ledgerDir),
1, "testPasswd".getBytes(), truncateMasterKey);
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(null)
.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -556,7 +557,7 @@ public class BookieJournalTest {
writePartialIndexFileForLedger(Bookie.getCurrentDirectory(ledgerDir), 1, masterKey,
truncateMasterKey);
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(null)
.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() });
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java Sat Oct 12 03:42:21 2013
@@ -21,23 +21,27 @@
package org.apache.bookkeeper.bookie;
-import org.apache.commons.io.FileUtils;
+import static org.apache.bookkeeper.bookie.UpgradeTest.newV1JournalDirectory;
+import static org.apache.bookkeeper.bookie.UpgradeTest.newV1LedgerDirectory;
+import static org.apache.bookkeeper.bookie.UpgradeTest.newV2JournalDirectory;
+import static org.apache.bookkeeper.bookie.UpgradeTest.newV2LedgerDirectory;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.test.PortManager;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.ZooKeeper;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.junit.Test;
import org.junit.After;
import org.junit.Before;
-import static org.junit.Assert.*;
-
-import static org.apache.bookkeeper.bookie.UpgradeTest.*;
+import org.junit.Test;
public class CookieTest {
ZooKeeperUtil zkutil;
@@ -75,7 +79,7 @@ public class CookieTest {
*/
@Test(timeout=60000)
public void testCleanStart() throws Exception {
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setJournalDirName(newDirectory(false))
.setLedgerDirNames(new String[] { newDirectory(false) })
@@ -94,7 +98,7 @@ public class CookieTest {
*/
@Test(timeout=60000)
public void testBadJournalCookie() throws Exception {
- ServerConfiguration conf1 = new ServerConfiguration()
+ ServerConfiguration conf1 = TestBKConfiguration.newServerConfiguration()
.setJournalDirName(newDirectory())
.setLedgerDirNames(new String[] { newDirectory() })
.setBookiePort(bookiePort);
@@ -103,7 +107,7 @@ public class CookieTest {
String journalDir = newDirectory();
String ledgerDir = newDirectory();
- ServerConfiguration conf2 = new ServerConfiguration()
+ ServerConfiguration conf2 = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setJournalDirName(journalDir)
.setLedgerDirNames(new String[] { ledgerDir })
@@ -130,7 +134,7 @@ public class CookieTest {
String[] ledgerDirs = new String[] {
newDirectory(), newDirectory(), newDirectory() };
String journalDir = newDirectory();
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setJournalDirName(journalDir)
.setLedgerDirNames(ledgerDirs)
@@ -171,7 +175,7 @@ public class CookieTest {
public void testDirectoryAdded() throws Exception {
String ledgerDir0 = newDirectory();
String journalDir = newDirectory();
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setJournalDirName(journalDir)
.setLedgerDirNames(new String[] { ledgerDir0 })
@@ -203,7 +207,7 @@ public class CookieTest {
public void testDirectoryCleared() throws Exception {
String ledgerDir0 = newDirectory();
String journalDir = newDirectory();
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setJournalDirName(journalDir)
.setLedgerDirNames(new String[] { ledgerDir0 , newDirectory() })
@@ -228,7 +232,7 @@ public class CookieTest {
*/
@Test(timeout=60000)
public void testBookiePortChanged() throws Exception {
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setJournalDirName(newDirectory())
.setLedgerDirNames(new String[] { newDirectory() , newDirectory() })
@@ -254,7 +258,7 @@ public class CookieTest {
*/
@Test(timeout=60000)
public void testNewBookieStartingWithAnotherBookiesPort() throws Exception {
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setJournalDirName(newDirectory())
.setLedgerDirNames(new String[] { newDirectory() , newDirectory() })
@@ -263,7 +267,7 @@ public class CookieTest {
b.start();
b.shutdown();
- conf = new ServerConfiguration()
+ conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setJournalDirName(newDirectory())
.setLedgerDirNames(new String[] { newDirectory() , newDirectory() })
@@ -288,7 +292,7 @@ public class CookieTest {
// Format the BK Metadata and generate INSTANCEID
BookKeeperAdmin.format(adminConf, false, true);
- ServerConfiguration bookieConf = new ServerConfiguration()
+ ServerConfiguration bookieConf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setJournalDirName(newDirectory(false))
.setLedgerDirNames(new String[] { newDirectory(false) })
@@ -319,7 +323,7 @@ public class CookieTest {
*/
@Test(timeout=60000)
public void testV2data() throws Exception {
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setJournalDirName(newV2JournalDirectory())
.setLedgerDirNames(new String[] { newV2LedgerDirectory() })
@@ -339,7 +343,7 @@ public class CookieTest {
*/
@Test(timeout=60000)
public void testV1data() throws Exception {
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setJournalDirName(newV1JournalDirectory())
.setLedgerDirNames(new String[] { newV1LedgerDirectory() })
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java Sat Oct 12 03:42:21 2013
@@ -19,14 +19,14 @@
package org.apache.bookkeeper.bookie;
import java.io.File;
-import java.io.IOException;
-import org.apache.bookkeeper.conf.ServerConfiguration;
+import junit.framework.Assert;
-import org.junit.Test;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.junit.After;
import org.junit.Before;
-import junit.framework.Assert;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,7 +77,7 @@ public class CreateNewLogTest {
*/
@Test(timeout=60000)
public void testCreateNewLog() throws Exception {
- ServerConfiguration conf = new ServerConfiguration();
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
// Creating a new configuration with a number of
// ledger directories.
@@ -99,4 +99,4 @@ public class CreateNewLogTest {
Assert.assertTrue("Wrong log id", el.getCurrentLogId() > 1);
}
-}
\ No newline at end of file
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java Sat Oct 12 03:42:21 2013
@@ -31,11 +31,11 @@ import junit.framework.TestCase;
import org.apache.bookkeeper.bookie.GarbageCollectorThread.EntryLogMetadata;
import org.apache.bookkeeper.bookie.GarbageCollectorThread.ExtractionScanner;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +55,7 @@ public class EntryLogTest extends TestCa
Bookie.checkDirectoryStructure(curDir);
int gcWaitTime = 1000;
- ServerConfiguration conf = new ServerConfiguration();
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setGcWaitTime(gcWaitTime);
conf.setLedgerDirNames(new String[] {tmpDir.toString()});
Bookie bookie = new Bookie(conf);
@@ -106,7 +106,7 @@ public class EntryLogTest extends TestCa
File curDir = Bookie.getCurrentDirectory(tmpDir);
Bookie.checkDirectoryStructure(curDir);
- ServerConfiguration conf = new ServerConfiguration();
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setLedgerDirNames(new String[] {tmpDir.toString()});
Bookie bookie = new Bookie(conf);
// create some entries
@@ -167,7 +167,7 @@ public class EntryLogTest extends TestCa
throws Exception {
File tmpDir = File.createTempFile("bkTest", ".dir");
tmpDir.delete();
- ServerConfiguration conf = new ServerConfiguration();
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setLedgerDirNames(new String[] { tmpDir.toString() });
EntryLogger entryLogger = null;
try {
@@ -192,7 +192,7 @@ public class EntryLogTest extends TestCa
ledgerDir1.delete();
File ledgerDir2 = File.createTempFile("bkTest", ".dir");
ledgerDir2.delete();
- ServerConfiguration conf = new ServerConfiguration();
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(),
ledgerDir2.getAbsolutePath() });
Bookie bookie = new Bookie(conf);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java Sat Oct 12 03:42:21 2013
@@ -27,8 +27,11 @@ import java.nio.ByteBuffer;
import junit.framework.TestCase;
+import junit.framework.TestCase;
+
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.SnapshotMap;
@@ -70,7 +73,7 @@ public class LedgerCacheTest extends Tes
// create current dir
new File(ledgerDir, BookKeeperConstants.CURRENT_DIR).mkdir();
- conf = new ServerConfiguration();
+ conf = TestBKConfiguration.newServerConfiguration();
conf.setZkServers(null);
conf.setJournalDirName(txnDir.getPath());
conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
@@ -256,7 +259,7 @@ public class LedgerCacheTest extends Tes
ledgerDir1.delete();
File ledgerDir2 = File.createTempFile("bkTest", ".dir");
ledgerDir2.delete();
- ServerConfiguration conf = new ServerConfiguration();
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(), ledgerDir2.getAbsolutePath() });
Bookie bookie = new Bookie(conf);
@@ -312,7 +315,7 @@ public class LedgerCacheTest extends Tes
ledgerDir.mkdir();
Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(null)
.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
@@ -326,7 +329,7 @@ public class LedgerCacheTest extends Tes
b.addEntry(packet, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
}
- conf = new ServerConfiguration()
+ conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(null)
.setJournalDirName(journalDir.getPath())
.setLedgerDirNames(new String[] { ledgerDir.getPath() });
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java Sat Oct 12 03:42:21 2013
@@ -26,16 +26,16 @@ import junit.framework.TestCase;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.junit.Before;
import org.junit.Test;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestLedgerDirsManager extends TestCase {
static Logger LOG = LoggerFactory.getLogger(TestLedgerDirsManager.class);
- ServerConfiguration conf = new ServerConfiguration();
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
File curDir;
LedgerDirsManager dirsManager;
@@ -47,7 +47,7 @@ public class TestLedgerDirsManager exten
curDir = Bookie.getCurrentDirectory(tmpDir);
Bookie.checkDirectoryStructure(curDir);
- ServerConfiguration conf = new ServerConfiguration();
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setLedgerDirNames(new String[] {tmpDir.toString()});
dirsManager = new LedgerDirsManager(conf);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java Sat Oct 12 03:42:21 2013
@@ -37,6 +37,7 @@ import java.util.Arrays;
import org.apache.bookkeeper.client.ClientUtil;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.test.ZooKeeperUtil;
import org.apache.zookeeper.ZooKeeper;
@@ -159,7 +160,7 @@ public class UpgradeTest {
}
private static void testUpgradeProceedure(String zkServers, String journalDir, String ledgerDir) throws Exception {
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkServers)
.setJournalDirName(journalDir)
.setLedgerDirNames(new String[] { ledgerDir })
@@ -216,7 +217,7 @@ public class UpgradeTest {
String ledgerDir = newV2LedgerDirectory();
testUpgradeProceedure(zkutil.getZooKeeperConnectString(), journalDir, ledgerDir);
// Upgrade again
- ServerConfiguration conf = new ServerConfiguration()
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
.setZkServers(zkutil.getZooKeeperConnectString())
.setJournalDirName(journalDir)
.setLedgerDirNames(new String[] { ledgerDir })
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java Sat Oct 12 03:42:21 2013
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
@@ -58,8 +59,8 @@ import org.apache.commons.lang.StringUti
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -92,7 +93,7 @@ public class TestLedgerUnderreplicationM
zkUtil = new ZooKeeperUtil();
zkUtil.startServer();
- conf = new ServerConfiguration().setZkServers(zkUtil.getZooKeeperConnectString());
+ conf = TestBKConfiguration.newServerConfiguration().setZkServers(zkUtil.getZooKeeperConnectString());
executor = Executors.newCachedThreadPool();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Sat Oct 12 03:42:21 2013
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.Book
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.metastore.InMemoryMetaStore;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.AutoRecoveryMain;
@@ -71,7 +72,7 @@ public abstract class BookKeeperClusterT
protected int numBookies;
protected BookKeeperTestClient bkc;
- protected ServerConfiguration baseConf = new ServerConfiguration();
+ protected ServerConfiguration baseConf = TestBKConfiguration.newServerConfiguration();
protected ClientConfiguration baseClientConf = new ClientConfiguration();
private Map<BookieServer, AutoRecoveryMain> autoRecoveryProcesses = new HashMap<BookieServer, AutoRecoveryMain>();
@@ -363,13 +364,11 @@ public abstract class BookKeeperClusterT
bs.clear();
Thread.sleep(1000);
// restart them to ensure we can't
- int j = 0;
for (ServerConfiguration conf : bsConfs) {
if (null != newConf) {
conf.loadConf(newConf);
}
bs.add(startBookie(conf));
- j++;
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java Sat Oct 12 03:42:21 2013
@@ -27,25 +27,26 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.Executors;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.junit.Test;
+import junit.framework.TestCase;
+
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import junit.framework.TestCase;
-
public class BookieClientTest extends TestCase {
static Logger LOG = LoggerFactory.getLogger(BookieClientTest.class);
BookieServer bs;
@@ -53,7 +54,7 @@ public class BookieClientTest extends Te
public int port = 13645;
public ClientSocketChannelFactory channelFactory;
public OrderedSafeExecutor executor;
- ServerConfiguration conf = new ServerConfiguration();
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
@Override
public void setUp() throws Exception {
@@ -64,7 +65,7 @@ public class BookieClientTest extends Te
// Since this test does not rely on the BookKeeper client needing to
// know via ZooKeeper which Bookies are available, okay, so pass in null
// for the zkServers input parameter when constructing the BookieServer.
- ServerConfiguration conf = new ServerConfiguration();
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setZkServers(null).setBookiePort(port)
.setJournalDirName(tmpDir.getPath())
.setLedgerDirNames(new String[] { tmpDir.getPath() });
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java?rev=1531494&r1=1531493&r2=1531494&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java Sat Oct 12 03:42:21 2013
@@ -22,25 +22,22 @@ package org.apache.bookkeeper.test;
*/
import java.io.File;
import java.util.Enumeration;
-
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.junit.Assert;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class tests that bookie rolling journals
@@ -59,7 +56,7 @@ public class BookieJournalRollingTest ex
@Override
public void setUp() throws Exception {
// Set up the configuration properties needed.
- baseConf.setMaxJournalSize(1);
+ baseConf.setMaxJournalSizeMB(1);
baseConf.setMaxBackupJournals(1);
super.setUp();
}
@@ -216,7 +213,7 @@ public class BookieJournalRollingTest ex
}
// set flush interval to a large value
- ServerConfiguration newConf = new ServerConfiguration();
+ ServerConfiguration newConf = TestBKConfiguration.newServerConfiguration();
newConf.setFlushInterval(999999999);
// restart bookies
restartBookies(newConf);
@@ -254,7 +251,7 @@ public class BookieJournalRollingTest ex
Thread.sleep(3 * baseConf.getFlushInterval());
// restart bookies with flush interval set to a large value
- ServerConfiguration newConf = new ServerConfiguration();
+ ServerConfiguration newConf = TestBKConfiguration.newServerConfiguration();
newConf.setFlushInterval(999999999);
// restart bookies
restartBookies(newConf);