You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/18 16:40:31 UTC
svn commit: r1399680 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/main/java/org/apache/bookk...
Author: ivank
Date: Thu Oct 18 14:40:30 2012
New Revision: 1399680
URL: http://svn.apache.org/viewvc?rev=1399680&view=rev
Log:
BOOKKEEPER-345: Detect IOExceptions on entrylogger and bookie should consider next ledger dir(if any) (Vinay via ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestDiskChecker.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Oct 18 14:40:30 2012
@@ -158,6 +158,8 @@ Trunk (unreleased changes)
BOOKKEEPER-315: Ledger entries should be replicated sequentially instead of parallel. (umamahesh via ivank)
+ BOOKKEEPER-345: Detect IOExceptions on entrylogger and bookie should consider next ledger dir(if any) (Vinay via ivank)
+
hedwig-server:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf Thu Oct 18 14:40:30 2012
@@ -159,3 +159,21 @@ zkTimeout=10000
# the limitation of number of index pages.
# pageLimit=-1
+#If all ledger directories configured are full, then support only read
+#requests for clients.
+#If "readOnlyModeEnabled=true", then when all ledger disks are full the bookie will switch
+#to read-only mode and serve only read requests. Otherwise the bookie will shut down.
+#By default this will be disabled.
+#readOnlyModeEnabled=false
+
+#For each ledger dir, the maximum fraction of disk space which can be used.
+#Default is 0.95, i.e. at most 95% of the disk can be used, after which nothing will
+#be written to that partition. If all ledger dir partitions are full, then the bookie
+#will turn to read-only mode if 'readOnlyModeEnabled=true' is set; otherwise it will
+#shut down.
+#Valid values are between 0 and 1 (exclusive).
+#diskUsageThreshold=0.95
+
+#Disk check interval in milliseconds, i.e. how often the ledger dirs usage is checked.
+#Default is 10000
+#diskCheckInterval=10000
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Thu Oct 18 14:40:30 2012
@@ -42,6 +42,8 @@ import org.apache.bookkeeper.meta.Active
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.jmx.BKMBeanRegistry;
@@ -53,6 +55,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
@@ -66,11 +69,11 @@ import org.apache.zookeeper.Watcher.Even
public class Bookie extends Thread {
public static final String INSTANCEID = "INSTANCEID";
+ public static final String READONLY = "readonly";
static Logger LOG = LoggerFactory.getLogger(Bookie.class);
final File journalDirectory;
- final File ledgerDirectories[];
final ServerConfiguration conf;
final SyncThread syncThread;
@@ -87,6 +90,8 @@ public class Bookie extends Thread {
private final String bookieRegistrationPath;
static final String CURRENT_DIR = "current";
+ private LedgerDirsManager ledgerDirsManager;
+
// ZooKeeper client instance for the Bookie
ZooKeeper zk;
private volatile boolean isZkExpired = true;
@@ -104,6 +109,10 @@ public class Bookie extends Thread {
Map<Long, byte[]> masterKeyCache = Collections.synchronizedMap(new HashMap<Long, byte[]>());
+ final private String zkBookieRegPath;
+
+ final private AtomicBoolean readOnly = new AtomicBoolean(false);
+
public static class NoLedgerException extends IOException {
private static final long serialVersionUID = 1L;
private long ledgerId;
@@ -208,6 +217,10 @@ public class Bookie extends Thread {
boolean flushFailed = false;
try {
ledgerStorage.flush();
+ } catch (NoWritableLedgerDirException e) {
+ flushFailed = true;
+ flushing.set(false);
+ transitionToReadOnlyMode();
} catch (IOException e) {
LOG.error("Exception flushing Ledger", e);
flushFailed = true;
@@ -216,8 +229,13 @@ public class Bookie extends Thread {
// if flush failed, we should not roll last mark, otherwise we would
// have some ledgers are not flushed and their journal entries were lost
if (!flushFailed) {
- journal.rollLog();
- journal.gcJournals();
+ try {
+ journal.rollLog();
+ journal.gcJournals();
+ } catch (NoWritableLedgerDirException e) {
+ flushing.set(false);
+ transitionToReadOnlyMode();
+ }
}
// clear flushing flag
@@ -272,7 +290,7 @@ public class Bookie extends Thread {
private void checkEnvironment(ZooKeeper zk) throws BookieException, IOException {
if (zk == null) { // exists only for testing, just make sure directories are correct
checkDirectoryStructure(journalDirectory);
- for (File dir : ledgerDirectories) {
+ for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
checkDirectoryStructure(dir);
}
return;
@@ -300,7 +318,7 @@ public class Bookie extends Thread {
} catch (FileNotFoundException fnf) {
missedCookieDirs.add(journalDirectory);
}
- for (File dir : ledgerDirectories) {
+ for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
checkDirectoryStructure(dir);
try {
Cookie c = Cookie.readFromDirectory(dir);
@@ -319,7 +337,7 @@ public class Bookie extends Thread {
if (missedCookieDirs.size() > 0) {
LOG.debug("Directories missing cookie file are {}", missedCookieDirs);
masterCookie.writeToDirectory(journalDirectory);
- for (File dir : ledgerDirectories) {
+ for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
masterCookie.writeToDirectory(dir);
}
}
@@ -353,6 +371,10 @@ public class Bookie extends Thread {
return instanceId;
}
+ public LedgerDirsManager getLedgerDirsManager() {
+ return ledgerDirsManager;
+ }
+
public static File getCurrentDirectory(File dir) {
return new File(dir, CURRENT_DIR);
}
@@ -372,7 +394,7 @@ public class Bookie extends Thread {
this.bookieRegistrationPath = conf.getZkAvailableBookiesPath() + "/";
this.conf = conf;
this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
- this.ledgerDirectories = getCurrentDirectories(conf.getLedgerDirs());
+ this.ledgerDirsManager = new LedgerDirsManager(conf);
// instantiate zookeeper client to initialize ledger manager
this.zk = instantiateZookeeperClient(conf);
@@ -382,10 +404,19 @@ public class Bookie extends Thread {
activeLedgerManager = activeLedgerManagerFactory.newActiveLedgerManager();
syncThread = new SyncThread(conf);
- ledgerStorage = new InterleavedLedgerStorage(conf, activeLedgerManager);
+ ledgerStorage = new InterleavedLedgerStorage(conf, activeLedgerManager,
+ ledgerDirsManager);
handles = new HandleFactoryImpl(ledgerStorage);
// instantiate the journal
- journal = new Journal(conf);
+ journal = new Journal(conf, ledgerDirsManager);
+
+ // ZK ephemeral node for this Bookie.
+ zkBookieRegPath = this.bookieRegistrationPath + getMyId();
+ }
+
+ private String getMyId() throws UnknownHostException {
+ return InetAddress.getLocalHost().getHostAddress() + ":"
+ + conf.getBookiePort();
}
void readJournal() throws IOException, BookieException {
@@ -443,6 +474,10 @@ public class Bookie extends Thread {
// start bookie thread
super.start();
+ ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+ //Start DiskChecker thread
+ ledgerDirsManager.start();
+
ledgerStorage.start();
syncThread.start();
@@ -458,6 +493,38 @@ public class Bookie extends Thread {
}
}
+ /*
+ * Get the DiskFailure listener for the bookie
+ */
+ private LedgerDirsListener getLedgerDirsListener() {
+
+ return new LedgerDirsListener() {
+
+ @Override
+ public void diskFull(File disk) {
+ // Nothing needs to be handled here.
+ }
+
+ @Override
+ public void diskFailed(File disk) {
+ // Shutdown the bookie on disk failure.
+ triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
+ }
+
+ @Override
+ public void allDisksFull() {
+ // Transition to readOnly mode on all disks full
+ transitionToReadOnlyMode();
+ }
+
+ @Override
+ public void fatalError() {
+ LOG.error("Fatal error reported by ledgerDirsManager");
+ triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
+ }
+ };
+ }
+
/**
* Register jmx with parent
*
@@ -525,9 +592,6 @@ public class Bookie extends Thread {
// zookeeper instance is null, means not register itself to zk
return;
}
- // ZK ephemeral node for this Bookie.
- String zkBookieRegPath = this.bookieRegistrationPath
- + InetAddress.getLocalHost().getHostAddress() + ":" + port;
final CountDownLatch prevNodeLatch = new CountDownLatch(1);
try{
Watcher zkPrevRegNodewatcher = new Watcher() {
@@ -573,6 +637,63 @@ public class Bookie extends Thread {
}
}
+ /*
+ * Transition the bookie to readOnly mode
+ */
+ void transitionToReadOnlyMode() {
+ if (!readOnly.compareAndSet(false, true)) {
+ return;
+ }
+ if (!conf.isReadOnlyModeEnabled()) {
+ LOG.warn("ReadOnly mode is not enabled. "
+ + "Can be enabled by configuring "
+ + "'readOnlyModeEnabled=true' in configuration."
+ + "Shutting down bookie");
+ triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
+ return;
+ }
+ LOG.info("Transitioning Bookie to ReadOnly mode,"
+ + " and will serve only read requests from clients!");
+ try {
+ if (null == zk
+ .exists(this.bookieRegistrationPath + READONLY, false)) {
+ try {
+ zk.create(this.bookieRegistrationPath + READONLY,
+ new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } catch (NodeExistsException e) {
+ // this node is just now created by someone.
+ }
+ }
+ // Clear the current registered node
+ zk.delete(zkBookieRegPath, -1);
+ // Create the readonly node
+ zk.create(this.bookieRegistrationPath + READONLY + "/" + getMyId(),
+ new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ } catch (IOException e) {
+ LOG.error("Error in transition to ReadOnly Mode."
+ + " Shutting down", e);
+ triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
+ return;
+ } catch (KeeperException e) {
+ LOG.error("Error in transition to ReadOnly Mode."
+ + " Shutting down", e);
+ triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
+ return;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted Exception while transitioning to ReadOnly Mode.");
+ return;
+ }
+ }
+
+ /*
+ * Check whether the Bookie is in read-only mode
+ */
+ public boolean isReadOnly() {
+ return readOnly.get();
+ }
+
/**
* Create a new zookeeper client to zk cluster.
*
@@ -643,7 +764,25 @@ public class Bookie extends Thread {
// following add operations to it would hang unit client timeout
// so we should let bookie server exists
LOG.error("Journal manager quits unexpectedly.");
- shutdown(ExitCode.BOOKIE_EXCEPTION);
+ triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
+ }
+ }
+
+ // Triggering the Bookie shutdown in its own thread,
+ // because shutdown can be called from sync thread which would be
+ // interrupted by shutdown call.
+ void triggerBookieShutdown(final int exitCode) {
+ Thread shutdownThread = new Thread() {
+ public void run() {
+ Bookie.this.shutdown(exitCode);
+ }
+ };
+ shutdownThread.start();
+ try {
+ shutdownThread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.debug("InterruptedException while waiting for shutdown. Not a problem!!");
}
}
@@ -665,6 +804,10 @@ public class Bookie extends Thread {
// Shutdown the ZK client
if(zk != null) zk.close();
+
+ //Shutdown disk checker
+ ledgerDirsManager.shutdown();
+
// Shutdown journal
journal.shutdown();
this.join();
@@ -744,9 +887,14 @@ public class Bookie extends Thread {
*/
public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
- LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
- synchronized (handle) {
- addEntryInternal(handle, entry, cb, ctx);
+ try {
+ LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
+ synchronized (handle) {
+ addEntryInternal(handle, entry, cb, ctx);
+ }
+ } catch (NoWritableLedgerDirException e) {
+ transitionToReadOnlyMode();
+ throw new IOException(e);
}
}
@@ -756,13 +904,18 @@ public class Bookie extends Thread {
*/
public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
- LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
- synchronized (handle) {
- if (handle.isFenced()) {
- throw BookieException.create(BookieException.Code.LedgerFencedException);
- }
-
- addEntryInternal(handle, entry, cb, ctx);
+ try {
+ LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
+ synchronized (handle) {
+ if (handle.isFenced()) {
+ throw BookieException
+ .create(BookieException.Code.LedgerFencedException);
+ }
+ addEntryInternal(handle, entry, cb, ctx);
+ }
+ } catch (NoWritableLedgerDirException e) {
+ transitionToReadOnlyMode();
+ throw new IOException(e);
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java Thu Oct 18 14:40:30 2012
@@ -640,7 +640,7 @@ public class BookieShell implements Tool
private synchronized Journal getJournal() throws IOException {
if (null == journal) {
- journal = new Journal(bkConf);
+ journal = new Journal(bkConf, new LedgerDirsManager(bkConf));
}
return journal;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Thu Oct 18 14:40:30 2012
@@ -34,16 +34,17 @@ import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.IOUtils;
@@ -56,7 +57,10 @@ import org.apache.bookkeeper.util.IOUtil
*/
public class EntryLogger {
private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
- private File dirs[];
+
+ volatile File currentDir;
+ private LedgerDirsManager ledgerDirsManager;
+ private AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
private long logId;
/**
@@ -105,8 +109,9 @@ public class EntryLogger {
* Create an EntryLogger that stores it's log files in the given
* directories
*/
- public EntryLogger(ServerConfiguration conf) throws IOException {
- this.dirs = Bookie.getCurrentDirectories(conf.getLedgerDirs());
+ public EntryLogger(ServerConfiguration conf,
+ LedgerDirsManager ledgerDirsManager) throws IOException {
+ this.ledgerDirsManager = ledgerDirsManager;
// log size limit
this.logSizeLimit = conf.getEntryLogSizeLimit();
@@ -118,7 +123,7 @@ public class EntryLogger {
LOGFILE_HEADER.put("BKLO".getBytes());
// Find the largest logId
logId = -1;
- for(File dir: dirs) {
+ for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
if (!dir.exists()) {
throw new FileNotFoundException(
"Entry log directory does not exist");
@@ -142,16 +147,44 @@ public class EntryLogger {
}
protected void initialize() throws IOException {
+ // Register listener for disk full notifications.
+ ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
// create a new log to write
createNewLog();
}
+ private LedgerDirsListener getLedgerDirsListener() {
+ return new LedgerDirsListener() {
+ @Override
+ public void diskFull(File disk) {
+ // If the current entry log disk is full, then create new entry
+ // log.
+ if (currentDir != null && currentDir.equals(disk)) {
+ shouldCreateNewEntryLog.set(true);
+ }
+ }
+
+ @Override
+ public void diskFailed(File disk) {
+ // Nothing to handle here. Will be handled in Bookie
+ }
+
+ @Override
+ public void allDisksFull() {
+ // Nothing to handle here. Will be handled in Bookie
+ }
+
+ @Override
+ public void fatalError() {
+ // Nothing to handle here. Will be handled in Bookie
+ }
+ };
+ }
+
/**
* Creates a new log file
*/
void createNewLog() throws IOException {
- List<File> list = Arrays.asList(dirs);
- Collections.shuffle(list);
if (logChannel != null) {
logChannel.flush(true);
}
@@ -160,21 +193,23 @@ public class EntryLogger {
File newLogFile = null;
do {
String logFileName = Long.toHexString(++logId) + ".log";
- for (File dir : list) {
- newLogFile = new File(dir, logFileName);
- if (newLogFile.exists()) {
- LOG.warn("Found existed entry log " + newLogFile
- + " when trying to create it as a new log.");
- newLogFile = null;
- break;
- }
+ File dir = ledgerDirsManager.pickRandomWritableDir();
+ newLogFile = new File(dir, logFileName);
+ currentDir = dir;
+ if (newLogFile.exists()) {
+ LOG.warn("Found existed entry log " + newLogFile
+ + " when trying to create it as a new log.");
+ newLogFile = null;
+ continue;
}
} while (newLogFile == null);
logChannel = new BufferedChannel(new RandomAccessFile(newLogFile, "rw").getChannel(), 64*1024);
logChannel.write((ByteBuffer) LOGFILE_HEADER.clear());
channels.put(logId, logChannel);
- for(File f: dirs) {
+
+ List<File> listOfDirs = ledgerDirsManager.getWritableLedgerDirs();
+ for (File f : listOfDirs) {
setLastLogId(f, logId);
}
}
@@ -290,8 +325,16 @@ public class EntryLogger {
}
}
synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
- if (logChannel.position() + entry.remaining() + 4 > logSizeLimit) {
+ // Create new log if logSizeLimit reached or current disk is full
+ boolean createNewLog = shouldCreateNewEntryLog.get();
+ if (createNewLog
+ || (logChannel.position() + entry.remaining() + 4 > logSizeLimit)) {
createNewLog();
+
+ // Reset the flag
+ if (createNewLog) {
+ shouldCreateNewEntryLog.set(false);
+ }
}
ByteBuffer buff = ByteBuffer.allocate(4);
buff.putInt(entry.remaining());
@@ -374,7 +417,7 @@ public class EntryLogger {
* Whether the log file exists or not.
*/
boolean logExists(long logId) {
- for (File d : dirs) {
+ for (File d : ledgerDirsManager.getAllLedgerDirs()) {
File f = new File(d, Long.toHexString(logId) + ".log");
if (f.exists()) {
return true;
@@ -384,7 +427,7 @@ public class EntryLogger {
}
private File findFile(long logId) throws FileNotFoundException {
- for(File d: dirs) {
+ for (File d : ledgerDirsManager.getAllLedgerDirs()) {
File f = new File(d, Long.toHexString(logId)+".log");
if (f.exists()) {
return f;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Thu Oct 18 14:40:30 2012
@@ -40,8 +40,8 @@ import org.slf4j.LoggerFactory;
class InterleavedLedgerStorage implements LedgerStorage {
final static Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class);
- private EntryLogger entryLogger;
- private LedgerCache ledgerCache;
+ EntryLogger entryLogger;
+ LedgerCache ledgerCache;
// This is the thread that garbage collects the entry logs that do not
// contain any active ledgers in them; and compacts the entry logs that
// has lower remaining percentage to reclaim disk space.
@@ -50,9 +50,10 @@ class InterleavedLedgerStorage implement
// this indicates that a write has happened since the last flush
private volatile boolean somethingWritten = false;
- InterleavedLedgerStorage(ServerConfiguration conf, ActiveLedgerManager activeLedgerManager)
- throws IOException {
- entryLogger = new EntryLogger(conf);
+ InterleavedLedgerStorage(ServerConfiguration conf,
+ ActiveLedgerManager activeLedgerManager,
+ LedgerDirsManager ledgerDirsManager) throws IOException {
+ entryLogger = new EntryLogger(conf, ledgerDirsManager);
ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager);
gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
activeLedgerManager, new EntryLogCompactionScanner());
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Thu Oct 18 14:40:30 2012
@@ -32,6 +32,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.util.IOUtils;
@@ -111,7 +112,7 @@ class Journal extends Thread {
return txnLogPosition;
}
- synchronized void rollLog() {
+ synchronized void rollLog() throws NoWritableLedgerDirException {
byte buff[] = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(buff);
// we should record <logId, logPosition> marked in markLog
@@ -120,7 +121,9 @@ class Journal extends Thread {
bb.putLong(lastMark.getTxnLogId());
bb.putLong(lastMark.getTxnLogPosition());
LOG.debug("RollLog to persist last marked log : {}", lastMark);
- for (File dir : ledgerDirectories) {
+ List<File> writableLedgerDirs = ledgerDirsManager
+ .getWritableLedgerDirs();
+ for (File dir : writableLedgerDirs) {
File file = new File(dir, "lastMark");
FileOutputStream fos = null;
try {
@@ -148,7 +151,7 @@ class Journal extends Thread {
synchronized void readLog() {
byte buff[] = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(buff);
- for(File dir: ledgerDirectories) {
+ for(File dir: ledgerDirsManager.getAllLedgerDirs()) {
File file = new File(dir, "lastMark");
try {
FileInputStream fis = new FileInputStream(file);
@@ -250,7 +253,6 @@ class Journal extends Thread {
final int maxBackupJournals;
final File journalDirectory;
- final File ledgerDirectories[];
final ServerConfiguration conf;
private LastLogMark lastLogMark = new LastLogMark(0, 0);
@@ -259,12 +261,13 @@ class Journal extends Thread {
LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
volatile boolean running = true;
+ private LedgerDirsManager ledgerDirsManager;
- public Journal(ServerConfiguration conf) {
+ public Journal(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager) {
super("BookieJournal-" + conf.getBookiePort());
+ this.ledgerDirsManager = ledgerDirsManager;
this.conf = conf;
this.journalDirectory = Bookie.getCurrentDirectory(conf.getJournalDir());
- this.ledgerDirectories = Bookie.getCurrentDirectories(conf.getLedgerDirs());
this.maxJournalSize = conf.getMaxJournalSize() * MB;
this.maxBackupJournals = conf.getMaxBackupJournals();
@@ -314,7 +317,7 @@ class Journal extends Thread {
* </p>
* @see #markLog()
*/
- public void rollLog() {
+ public void rollLog() throws NoWritableLedgerDirException {
lastLogMark.rollLog();
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java?rev=1399680&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java Thu Oct 18 14:40:30 2012
@@ -0,0 +1,240 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.bookkeeper.util.DiskChecker.DiskErrorException;
+import org.apache.bookkeeper.util.DiskChecker.DiskOutOfSpaceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages ledger directories used by the bookie.
+ */
+public class LedgerDirsManager {
+ private static Logger LOG = LoggerFactory
+ .getLogger(LedgerDirsManager.class);
+
+ private volatile List<File> filledDirs;
+ private final List<File> ledgerDirectories;
+ private volatile List<File> writableLedgerDirectories;
+ private DiskChecker diskChecker;
+ private List<LedgerDirsListener> listeners;
+ private LedgerDirsMonitor monitor;
+ private final Random rand = new Random();
+
+ public LedgerDirsManager(ServerConfiguration conf) {
+ this.ledgerDirectories = Arrays.asList(Bookie
+ .getCurrentDirectories(conf.getLedgerDirs()));
+ this.writableLedgerDirectories = new ArrayList<File>(ledgerDirectories);
+ this.filledDirs = new ArrayList<File>();
+ listeners = new ArrayList<LedgerDirsManager.LedgerDirsListener>();
+ diskChecker = new DiskChecker(conf.getDiskUsageThreshold());
+ monitor = new LedgerDirsMonitor(conf.getDiskCheckInterval());
+ }
+
+ /**
+ * Get all ledger dirs configured
+ */
+ public List<File> getAllLedgerDirs() {
+ return ledgerDirectories;
+ }
+
+ /**
+ * Get only writable ledger dirs.
+ */
+ public List<File> getWritableLedgerDirs()
+ throws NoWritableLedgerDirException {
+ if (writableLedgerDirectories.isEmpty()) {
+ String errMsg = "All ledger directories are non writable";
+ NoWritableLedgerDirException e = new NoWritableLedgerDirException(
+ errMsg);
+ LOG.error(errMsg, e);
+ throw e;
+ }
+ return writableLedgerDirectories;
+ }
+
+ /**
+ * Check whether the given dir is in the filled dirs list (usage above threshold)
+ */
+ public boolean isDirFull(File dir) {
+ return filledDirs.contains(dir);
+ }
+
+ /**
+ * Add the dir to filled dirs list
+ */
+ // VisibleForTesting
+ public void addToFilledDirs(File dir) {
+ if (!filledDirs.contains(dir)) {
+ LOG.warn(dir + " is out of space."
+ + " Adding it to filled dirs list");
+ // Update filled dirs list
+ List<File> updatedFilledDirs = new ArrayList<File>(filledDirs);
+ updatedFilledDirs.add(dir);
+ filledDirs = updatedFilledDirs;
+ // Update the writable ledgers list
+ List<File> newDirs = new ArrayList<File>(writableLedgerDirectories);
+ newDirs.removeAll(filledDirs);
+ writableLedgerDirectories = newDirs;
+ // Notify listeners about disk full
+ for (LedgerDirsListener listener : listeners) {
+ listener.diskFull(dir);
+ }
+ }
+ }
+
+ /**
+ * Returns one of the ledger dir from writable dirs list randomly.
+ */
+ File pickRandomWritableDir() throws NoWritableLedgerDirException {
+ List<File> writableDirs = getWritableLedgerDirs();
+ return writableDirs.get(rand.nextInt(writableDirs.size()));
+ }
+
+ public void addLedgerDirsListener(LedgerDirsListener listener) {
+ if (listener != null) {
+ listeners.add(listener);
+ }
+ }
+
+ // start the daemon for disk monitoring
+ public void start() {
+ monitor.setDaemon(true);
+ monitor.start();
+ }
+
+ // shutdown disk monitoring daemon
+ public void shutdown() {
+ monitor.interrupt();
+ try {
+ monitor.join();
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+
+ /**
+ * Thread to monitor the disk space periodically.
+ */
+ private class LedgerDirsMonitor extends Thread {
+ int interval;
+
+ public LedgerDirsMonitor(int interval) {
+ this.interval = interval;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ List<File> writableDirs;
+ try {
+ writableDirs = getWritableLedgerDirs();
+ } catch (NoWritableLedgerDirException e) {
+ for (LedgerDirsListener listener : listeners) {
+ listener.allDisksFull();
+ }
+ break;
+ }
+ // Check all writable dirs disk space usage.
+ for (File dir : writableDirs) {
+ try {
+ diskChecker.checkDir(dir);
+ } catch (DiskErrorException e) {
+ // Notify disk failure to all listeners
+ for (LedgerDirsListener listener : listeners) {
+ listener.diskFailed(dir);
+ }
+ } catch (DiskOutOfSpaceException e) {
+ // Notify disk full to all listeners
+ addToFilledDirs(dir);
+ }
+ }
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ LOG.info("LedgerDirsMonitor thread is interrupted");
+ break;
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error Occured while checking disks", e);
+ // Notify fatal error to all listeners
+ for (LedgerDirsListener listener : listeners) {
+ listener.fatalError();
+ }
+ }
+ }
+ }
+
+ /**
+ * Indicates All configured ledger directories are full.
+ */
+ public static class NoWritableLedgerDirException extends IOException {
+ private static final long serialVersionUID = -8696901285061448421L;
+
+ public NoWritableLedgerDirException(String errMsg) {
+ super(errMsg);
+ }
+ }
+
+ /**
+ * Listener for the disk check events will be notified from the
+ * {@link LedgerDirsManager} whenever disk full/failure detected.
+ */
+ public static interface LedgerDirsListener {
+ /**
+ * This will be notified on disk failure/disk error
+ *
+ * @param disk
+ * Failed disk
+ */
+ void diskFailed(File disk);
+
+ /**
+ * This will be notified on disk detected as full
+ *
+ * @param disk
+ * Filled disk
+ */
+ void diskFull(File disk);
+
+ /**
+ * This will be notified whenever all disks are detected as full.
+ */
+ void allDisksFull();
+
+ /**
+ * This will notify the fatal errors.
+ */
+ void fatalError();
+ }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java Thu Oct 18 14:40:30 2012
@@ -32,7 +32,7 @@ import org.apache.bookkeeper.conf.Server
public class ReadOnlyEntryLogger extends EntryLogger {
public ReadOnlyEntryLogger(ServerConfiguration conf) throws IOException {
- super(conf);
+ super(conf, new LedgerDirsManager(conf));
}
@Override
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Thu Oct 18 14:40:30 2012
@@ -58,6 +58,10 @@ public class ServerConfiguration extends
// Statistics Parameters
protected final static String ENABLE_STATISTICS = "enableStatistics";
protected final static String OPEN_LEDGER_REREPLICATION_GRACE_PERIOD = "openLedgerRereplicationGracePeriod";
+ //ReadOnly mode support on all disk full
+ protected final static String READ_ONLY_MODE_ENABLED = "readOnlyModeEnabled";
+ protected final static String DISK_USAGE_THRESHOLD = "diskUsageThreshold";
+ protected final static String DISK_CHECK_INTERVAL = "diskCheckInterval";
/**
* Construct a default configuration object
@@ -573,4 +577,50 @@ public class ServerConfiguration extends
public long getOpenLedgerRereplicationGracePeriod() {
return getLong(OPEN_LEDGER_REREPLICATION_GRACE_PERIOD, 30000);
}
+
+ /**
+ * Set the ReadOnlyModeEnabled status
+ */
+ public ServerConfiguration setReadOnlyModeEnabled(boolean enabled) {
+ setProperty(READ_ONLY_MODE_ENABLED, enabled);
+ return this;
+ }
+
+ /**
+ * Get ReadOnlyModeEnabled status
+ */
+ public boolean isReadOnlyModeEnabled() {
+ return getBoolean(READ_ONLY_MODE_ENABLED, false);
+ }
+
+ /**
+ * Set the disk usage threshold, as a fraction of total space (> 0 and < 1),
+ * above which a disk will be considered as full during diskcheck.
+ */
+ public ServerConfiguration setDiskUsageThreshold(float threshold) {
+ setProperty(DISK_USAGE_THRESHOLD, threshold);
+ return this;
+ }
+
+ /**
+ * Returns the disk usage threshold (fraction of total space). By default 0.95
+ */
+ public float getDiskUsageThreshold() {
+ return getFloat(DISK_USAGE_THRESHOLD, 0.95f);
+ }
+
+ /**
+ * Set the disk checker interval to monitor ledger disk space
+ */
+ public ServerConfiguration setDiskCheckInterval(int interval) {
+ setProperty(DISK_CHECK_INTERVAL, interval);
+ return this;
+ }
+
+ /**
+ * Get the disk checker interval
+ */
+ public int getDiskCheckInterval() {
+ return getInt(DISK_CHECK_INTERVAL, 10 * 1000);
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java Thu Oct 18 14:40:30 2012
@@ -165,6 +165,10 @@ public interface BookieProtocol {
*/
public static final int EFENCED = 104;
+ /**
+ * The server is running in read-only mode
+ */
+ public static final int EREADONLY = 105;
public static final short FLAG_NONE = 0x0;
public static final short FLAG_DO_FENCING = 0x0001;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Thu Oct 18 14:40:30 2012
@@ -104,6 +104,11 @@ public class BookieServer implements NIO
}
}
+ //VisibleForTesting
+ public Bookie getBookie() {
+ return bookie;
+ }
+
public synchronized void shutdown() {
if (!running) {
return;
@@ -361,6 +366,15 @@ public class BookieServer implements NIO
switch (h.getOpCode()) {
case BookieProtocol.ADDENTRY:
statType = BKStats.STATS_ADD;
+
+ if (bookie.isReadOnly()) {
+ LOG.warn("BookieServer is running as readonly mode,"
+ + " so rejecting the request from the client!");
+ src.sendResponse(buildResponse(BookieProtocol.EREADONLY,
+ h.getVersion(), h.getOpCode(), ledgerId, entryId));
+ break;
+ }
+
try {
TimedCnxn tsrc = new TimedCnxn(src, startTime);
if ((flags & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java?rev=1399680&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java Thu Oct 18 14:40:30 2012
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.util;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Class that provides utility functions for checking disk problems
+ */
+public class DiskChecker {
+ private float diskUsageThreshold;
+
+ public static class DiskErrorException extends IOException {
+ private static final long serialVersionUID = 9091606022449761729L;
+
+ public DiskErrorException(String msg) {
+ super(msg);
+ }
+ }
+
+ public static class DiskOutOfSpaceException extends IOException {
+ private static final long serialVersionUID = 160898797915906860L;
+
+ public DiskOutOfSpaceException(String msg) {
+ super(msg);
+ }
+ }
+
+ public DiskChecker(float threshold) {
+ validateThreshold(threshold);
+ this.diskUsageThreshold = threshold;
+ }
+
+ /**
+ * The semantics of mkdirsWithExistsCheck method is different from the
+ * mkdirs method provided in the Sun's java.io.File class in the following
+ * way: While creating the non-existent parent directories, this method
+ * checks for the existence of those directories if the mkdir fails at any
+ * point (since that directory might have just been created by some other
+ * process). If both mkdir() and the exists() check fails for any seemingly
+ * non-existent directory, then we signal an error; Sun's mkdir would signal
+ * an error (return false) if a directory it is attempting to create already
+ * exists or the mkdir fails.
+ *
+ * @param dir
+ * @return true on success, false on failure
+ */
+ private static boolean mkdirsWithExistsCheck(File dir) {
+ if (dir.mkdir() || dir.exists()) {
+ return true;
+ }
+ File canonDir = null;
+ try {
+ canonDir = dir.getCanonicalFile();
+ } catch (IOException e) {
+ return false;
+ }
+ String parent = canonDir.getParent();
+ return (parent != null)
+ && (mkdirsWithExistsCheck(new File(parent)) && (canonDir
+ .mkdir() || canonDir.exists()));
+ }
+
+ /**
+ * Checks the disk space available.
+ *
+ * @param dir
+ * Directory to check for the disk space
+ * @throws DiskOutOfSpaceException
+ * Throws {@link DiskOutOfSpaceException} if the used fraction of
+ * the disk is greater than the threshold.
+ */
+ // VisibleForTesting
+ void checkDiskFull(File dir) throws DiskOutOfSpaceException {
+ if (null == dir) {
+ return;
+ }
+ if (dir.exists()) {
+ long usableSpace = dir.getUsableSpace();
+ long totalSpace = dir.getTotalSpace();
+ float free = (float) usableSpace / (float) totalSpace;
+ float used = 1f - free;
+ if (used > diskUsageThreshold) {
+ throw new DiskOutOfSpaceException("Space left on device "
+ + usableSpace + " < threshhold " + diskUsageThreshold);
+ }
+ } else {
+ checkDiskFull(dir.getParentFile());
+ }
+ }
+
+ /**
+ * Check disk usage, create the directory if it doesn't exist and verify it is a readable, writable directory.
+ *
+ * @param dir
+ * Directory to check for the disk error/full.
+ * @throws DiskErrorException
+ * If disk having errors
+ * @throws DiskOutOfSpaceException
+ * If disk usage is above the threshold
+ */
+ public void checkDir(File dir) throws DiskErrorException,
+ DiskOutOfSpaceException {
+ checkDiskFull(dir);
+ if (!mkdirsWithExistsCheck(dir))
+ throw new DiskErrorException("can not create directory: "
+ + dir.toString());
+
+ if (!dir.isDirectory())
+ throw new DiskErrorException("not a directory: " + dir.toString());
+
+ if (!dir.canRead())
+ throw new DiskErrorException("directory is not readable: "
+ + dir.toString());
+
+ if (!dir.canWrite())
+ throw new DiskErrorException("directory is not writable: "
+ + dir.toString());
+ }
+
+ /**
+ * Returns the disk space threshold.
+ *
+ * @return
+ */
+ // VisibleForTesting
+ float getDiskSpaceThreshold() {
+ return diskUsageThreshold;
+ }
+
+ /**
+ * Set the disk space threshold
+ *
+ * @param diskSpaceThreshold
+ */
+ // VisibleForTesting
+ void setDiskSpaceThreshold(float diskSpaceThreshold) {
+ validateThreshold(diskSpaceThreshold);
+ this.diskUsageThreshold = diskSpaceThreshold;
+ }
+
+ private void validateThreshold(float diskSpaceThreshold) {
+ if (diskSpaceThreshold <= 0 || diskSpaceThreshold >= 1) {
+ throw new IllegalArgumentException("Disk space threashold "
+ + diskSpaceThreshold
+ + " is not valid. Should be > 0 and < 1 ");
+ }
+ }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java?rev=1399680&r1=1399679&r2=1399680&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java Thu Oct 18 14:40:30 2012
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.bookie.Garb
import org.apache.bookkeeper.bookie.GarbageCollectorThread.ExtractionScanner;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -57,8 +58,9 @@ public class EntryLogTest extends TestCa
ServerConfiguration conf = new ServerConfiguration();
conf.setGcWaitTime(gcWaitTime);
conf.setLedgerDirNames(new String[] {tmpDir.toString()});
+ Bookie bookie = new Bookie(conf);
// create some entries
- EntryLogger logger = new EntryLogger(conf);
+ EntryLogger logger = ((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger;
logger.addEntry(1, generateEntry(1, 1));
logger.addEntry(3, generateEntry(3, 1));
logger.addEntry(2, generateEntry(2, 1));
@@ -69,7 +71,7 @@ public class EntryLogTest extends TestCa
raf.setLength(raf.length()-10);
raf.close();
// now see which ledgers are in the log
- logger = new EntryLogger(conf);
+ logger = new EntryLogger(conf, bookie.getLedgerDirsManager());
EntryLogMetadata meta = new EntryLogMetadata(0L);
ExtractionScanner scanner = new ExtractionScanner(meta);
@@ -87,10 +89,11 @@ public class EntryLogTest extends TestCa
}
private ByteBuffer generateEntry(long ledger, long entry) {
- ByteBuffer bb = ByteBuffer.wrap(new byte[64]);
+ byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
+ ByteBuffer bb = ByteBuffer.wrap(new byte[8 + 8 + data.length]);
bb.putLong(ledger);
bb.putLong(entry);
- bb.put(("ledger-" + ledger + "-" + entry).getBytes());
+ bb.put(data);
bb.flip();
return bb;
}
@@ -105,6 +108,7 @@ public class EntryLogTest extends TestCa
ServerConfiguration conf = new ServerConfiguration();
conf.setLedgerDirNames(new String[] {tmpDir.toString()});
+ Bookie bookie = new Bookie(conf);
// create some entries
int numLogs = 3;
int numEntries = 10;
@@ -112,7 +116,8 @@ public class EntryLogTest extends TestCa
for (int i=0; i<numLogs; i++) {
positions[i] = new long[numEntries];
- EntryLogger logger = new EntryLogger(conf);
+ EntryLogger logger = new EntryLogger(conf,
+ bookie.getLedgerDirsManager());
for (int j=0; j<numEntries; j++) {
positions[i][j] = logger.addEntry(i, generateEntry(i, j));
}
@@ -126,14 +131,16 @@ public class EntryLogTest extends TestCa
for (int i=numLogs; i<2*numLogs; i++) {
positions[i] = new long[numEntries];
- EntryLogger logger = new EntryLogger(conf);
+ EntryLogger logger = new EntryLogger(conf,
+ bookie.getLedgerDirsManager());
for (int j=0; j<numEntries; j++) {
positions[i][j] = logger.addEntry(i, generateEntry(i, j));
}
logger.flush();
}
- EntryLogger newLogger = new EntryLogger(conf);
+ EntryLogger newLogger = new EntryLogger(conf,
+ bookie.getLedgerDirsManager());
for (int i=0; i<(2*numLogs+1); i++) {
File logFile = new File(curDir, Long.toHexString(i) + ".log");
assertTrue(logFile.exists());
@@ -164,7 +171,7 @@ public class EntryLogTest extends TestCa
conf.setLedgerDirNames(new String[] { tmpDir.toString() });
EntryLogger entryLogger = null;
try {
- entryLogger = new EntryLogger(conf);
+ entryLogger = new EntryLogger(conf, new LedgerDirsManager(conf));
fail("Expecting FileNotFoundException");
} catch (FileNotFoundException e) {
assertEquals("Entry log directory does not exist", e
@@ -176,6 +183,42 @@ public class EntryLogTest extends TestCa
}
}
+ /**
+ * Test to verify the DiskFull during addEntry
+ */
+ @Test
+ public void testAddEntryFailureOnDiskFull() throws Exception {
+ File ledgerDir1 = File.createTempFile("bkTest", ".dir");
+ ledgerDir1.delete();
+ File ledgerDir2 = File.createTempFile("bkTest", ".dir");
+ ledgerDir2.delete();
+ ServerConfiguration conf = new ServerConfiguration();
+ conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(),
+ ledgerDir2.getAbsolutePath() });
+ Bookie bookie = new Bookie(conf);
+ EntryLogger entryLogger = new EntryLogger(conf,
+ bookie.getLedgerDirsManager());
+ InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+ ledgerStorage.entryLogger = entryLogger;
+ // Create ledgers
+ ledgerStorage.setMasterKey(1, "key".getBytes());
+ ledgerStorage.setMasterKey(2, "key".getBytes());
+ ledgerStorage.setMasterKey(3, "key".getBytes());
+ // Add entries
+ ledgerStorage.addEntry(generateEntry(1, 1));
+ ledgerStorage.addEntry(generateEntry(2, 1));
+ // Add entry with disk full failure simulation
+ bookie.getLedgerDirsManager().addToFilledDirs(entryLogger.currentDir);
+ ledgerStorage.addEntry(generateEntry(3, 1));
+ // Verify written entries
+ Assert.assertArrayEquals(generateEntry(1, 1).array(), ledgerStorage
+ .getEntry(1, 1).array());
+ Assert.assertArrayEquals(generateEntry(2, 1).array(), ledgerStorage
+ .getEntry(2, 1).array());
+ Assert.assertArrayEquals(generateEntry(3, 1).array(), ledgerStorage
+ .getEntry(3, 1).array());
+ }
+
@After
public void tearDown() throws Exception {
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java?rev=1399680&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java Thu Oct 18 14:40:30 2012
@@ -0,0 +1,166 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.test;
+
+import java.io.File;
+import java.util.Enumeration;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * Test to verify the readonly feature of bookies
+ */
+public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
+
+ public ReadOnlyBookieTest() {
+ super(2);
+ }
+
+ /**
+ * Check readonly bookie
+ */
+ public void testBookieShouldServeAsReadOnly() throws Exception {
+ killBookie(0);
+ baseConf.setReadOnlyModeEnabled(true);
+ startNewBookie();
+ LedgerHandle ledger = bkc.createLedger(2, 2, DigestType.MAC,
+ "".getBytes());
+
+ // Check new bookie with readonly mode enabled.
+ File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+ assertEquals("Only one ledger dir should be present", 1,
+ ledgerDirs.length);
+ Bookie bookie = bs.get(1).getBookie();
+ LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
+
+ for (int i = 0; i < 10; i++) {
+ ledger.addEntry("data".getBytes());
+ }
+
+ // Now add the current ledger dir to filled dirs list
+ ledgerDirsManager.addToFilledDirs(new File(ledgerDirs[0], "current"));
+
+ try {
+ ledger.addEntry("data".getBytes());
+ } catch (BKException.BKNotEnoughBookiesException e) {
+ // Expected
+ }
+
+ assertTrue("Bookie should be running and converted to readonly mode",
+ bookie.isRunning() && bookie.isReadOnly());
+
+ // Now kill the other bookie and read entries from the readonly bookie
+ killBookie(0);
+
+ Enumeration<LedgerEntry> readEntries = ledger.readEntries(0, 9);
+ while (readEntries.hasMoreElements()) {
+ LedgerEntry entry = readEntries.nextElement();
+ assertEquals("Entry should contain correct data", "data",
+ new String(entry.getEntry()));
+ }
+ }
+
+ /**
+ * check readOnlyModeEnabled=false
+ */
+ public void testBookieShutdownIfReadOnlyModeNotEnabled() throws Exception {
+ File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+ assertEquals("Only one ledger dir should be present", 1,
+ ledgerDirs.length);
+ Bookie bookie = bs.get(1).getBookie();
+ LedgerHandle ledger = bkc.createLedger(2, 2, DigestType.MAC,
+ "".getBytes());
+ LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
+
+ for (int i = 0; i < 10; i++) {
+ ledger.addEntry("data".getBytes());
+ }
+
+ // Now add the current ledger dir to filled dirs list
+ ledgerDirsManager.addToFilledDirs(new File(ledgerDirs[0], "current"));
+
+ try {
+ ledger.addEntry("data".getBytes());
+ } catch (BKException.BKNotEnoughBookiesException e) {
+ // Expected
+ }
+
+ assertFalse("Bookie should shutdown if readOnlyMode not enabled",
+ bookie.isAlive());
+ }
+
+ /**
+ * Check multiple ledger dirs
+ */
+ public void testBookieContinueWritingIfMultipleLedgersPresent()
+ throws Exception {
+ startNewBookieWithMultipleLedgerDirs(2);
+
+ File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+ assertEquals("Only one ledger dir should be present", 2,
+ ledgerDirs.length);
+ Bookie bookie = bs.get(1).getBookie();
+ LedgerHandle ledger = bkc.createLedger(2, 2, DigestType.MAC,
+ "".getBytes());
+ LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
+
+ for (int i = 0; i < 10; i++) {
+ ledger.addEntry("data".getBytes());
+ }
+
+ // Now add the current ledger dir to filled dirs list
+ ledgerDirsManager.addToFilledDirs(new File(ledgerDirs[0], "current"));
+ for (int i = 0; i < 10; i++) {
+ ledger.addEntry("data".getBytes());
+ }
+ assertEquals("writable dirs should have one dir", 1, ledgerDirsManager
+ .getWritableLedgerDirs().size());
+ assertTrue("Bookie should shutdown if readOnlyMode not enabled",
+ bookie.isAlive());
+ }
+
+ private void startNewBookieWithMultipleLedgerDirs(int numOfLedgerDirs)
+ throws Exception {
+ ServerConfiguration conf = bsConfs.get(1);
+ killBookie(1);
+
+ File[] ledgerDirs = new File[numOfLedgerDirs];
+ for (int i = 0; i < numOfLedgerDirs; i++) {
+ File dir = File.createTempFile("bookie", "test");
+ tmpDirs.add(dir);
+ dir.delete();
+ dir.mkdir();
+ ledgerDirs[i] = dir;
+ }
+
+ ServerConfiguration newConf = newServerConfiguration(
+ conf.getBookiePort() + 1, zkUtil.getZooKeeperConnectString(),
+ ledgerDirs[0], ledgerDirs);
+ bsConfs.add(newConf);
+ bs.add(startBookie(newConf));
+ }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestDiskChecker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestDiskChecker.java?rev=1399680&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestDiskChecker.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestDiskChecker.java Thu Oct 18 14:40:30 2012
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.bookkeeper.util.DiskChecker.DiskErrorException;
+import org.apache.bookkeeper.util.DiskChecker.DiskOutOfSpaceException;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test to verify {@link DiskChecker}
+ *
+ */
+public class TestDiskChecker {
+
+    DiskChecker diskChecker;
+
+    @Before
+    public void setup() {
+        diskChecker = new DiskChecker(0.95f);
+    }
+
+    /**
+     * Checking an existing file must report the disk as out of space once the
+     * threshold has been lowered below the current usage.
+     */
+    @Test(expected = DiskOutOfSpaceException.class)
+    public void testCheckDiskFull() throws IOException {
+        File file = newTempFile(null);
+        setThresholdBelowCurrentUsage(file);
+        diskChecker.checkDiskFull(file);
+    }
+
+    /**
+     * Check disk full on a file that does not exist. In this case the checker
+     * should fall back to checking the parent.
+     */
+    @Test(expected = DiskOutOfSpaceException.class)
+    public void testCheckDiskFullOnNonExistFile() throws IOException {
+        File file = newTempFile(null);
+        // Space figures are read while the file still exists.
+        setThresholdBelowCurrentUsage(file);
+        assertTrue(file.delete());
+        diskChecker.checkDiskFull(file);
+    }
+
+    /**
+     * Passing a regular file where a directory is expected must be reported
+     * as a disk error.
+     */
+    @Test(expected = DiskErrorException.class)
+    public void testCheckDiskErrorForFile() throws Exception {
+        File parent = newTempDir(null);
+        File child = newTempFile(parent);
+        diskChecker.checkDir(child);
+    }
+
+    /**
+     * A valid directory must pass the check without any exception.
+     */
+    @Test
+    public void testCheckDiskErrorForDir() throws Exception {
+        File parent = newTempDir(null);
+        File child = newTempDir(parent);
+        diskChecker.checkDir(child);
+    }
+
+    /**
+     * Sets the checker's threshold to the fraction of the partition holding
+     * {@code file} that is currently used, minus 0.05.
+     *
+     * NOTE(review): the value is zero or negative when less than 5% of the
+     * partition is used, and NaN when the total space is reported as 0;
+     * confirm that DiskChecker accepts such thresholds.
+     */
+    private void setThresholdBelowCurrentUsage(File file) {
+        long usableSpace = file.getUsableSpace();
+        long totalSpace = file.getTotalSpace();
+        diskChecker
+                .setDiskSpaceThreshold((1f - ((float) usableSpace / (float) totalSpace)) - 0.05f);
+    }
+
+    /**
+     * Creates a temporary file that is removed again when the JVM exits.
+     *
+     * @param dir
+     *            directory to create the file in, or null for the default
+     *            temporary-file directory
+     */
+    private static File newTempFile(File dir) throws IOException {
+        File file = File.createTempFile("DiskCheck", "test", dir);
+        // Entries are deleted in reverse registration order, so children
+        // registered after their parent directory are removed before it.
+        file.deleteOnExit();
+        return file;
+    }
+
+    /**
+     * Creates a temporary directory that is removed again when the JVM exits.
+     *
+     * @param parent
+     *            directory to create the new directory in, or null for the
+     *            default temporary-file directory
+     */
+    private static File newTempDir(File parent) throws IOException {
+        // Reserve a unique name as a file, then replace it with a directory.
+        File dir = newTempFile(parent);
+        assertTrue("Could not delete placeholder file " + dir, dir.delete());
+        assertTrue("Could not create directory " + dir, dir.mkdir());
+        return dir;
+    }
+}