You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2014/01/21 17:26:23 UTC
svn commit: r1560066 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/test/java/org/apache/bookkeeper/test/
Author: ivank
Date: Tue Jan 21 16:26:23 2014
New Revision: 1560066
URL: http://svn.apache.org/r1560066
Log:
BOOKKEEPER-661: Turn readonly back to writable if spaces are reclaimed. (sijie via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Jan 21 16:26:23 2014
@@ -144,6 +144,8 @@ Trunk (unreleased changes)
BOOKKEEPER-696: stats collection on bookkeeper client (Aniruddha, ivank via sijie)
+ BOOKKEEPER-661: Turn readonly back to writable if spaces are reclaimed. (sijie via ivank)
+
hedwig-server:
BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Jan 21 16:26:23 2014
@@ -65,6 +65,7 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,7 +97,7 @@ public class Bookie extends BookieCritic
// ZK registration path for this bookie
private final String bookieRegistrationPath;
- private LedgerDirsManager ledgerDirsManager;
+ private final LedgerDirsManager ledgerDirsManager;
private LedgerDirsManager indexDirsManager;
// ZooKeeper client instance for the Bookie
@@ -116,6 +117,7 @@ public class Bookie extends BookieCritic
final ConcurrentMap<Long, byte[]> masterKeyCache = new ConcurrentHashMap<Long, byte[]>();
final private String zkBookieRegPath;
+ final private String zkBookieReadOnlyPath;
final private AtomicBoolean readOnly = new AtomicBoolean(false);
@@ -244,6 +246,7 @@ public class Bookie extends BookieCritic
final AtomicBoolean oldDataExists = new AtomicBoolean(false);
parent.list(new FilenameFilter() {
+ @Override
public boolean accept(File dir, String name) {
if (name.endsWith(".txn") || name.endsWith(".idx") || name.endsWith(".log")) {
oldDataExists.set(true);
@@ -443,7 +446,9 @@ public class Bookie extends BookieCritic
handles = new HandleFactoryImpl(ledgerStorage);
// ZK ephemeral node for this Bookie.
- zkBookieRegPath = this.bookieRegistrationPath + getMyId();
+ String myID = getMyId();
+ zkBookieRegPath = this.bookieRegistrationPath + myID;
+ zkBookieReadOnlyPath = this.bookieRegistrationPath + BookKeeperConstants.READONLY + "/" + myID;
}
private String getMyId() throws UnknownHostException {
@@ -502,6 +507,7 @@ public class Bookie extends BookieCritic
});
}
+ @Override
synchronized public void start() {
setDaemon(true);
LOG.debug("I'm starting a bookie with journal directory {}", journalDirectory.getName());
@@ -582,6 +588,18 @@ public class Bookie extends BookieCritic
LOG.error("Fatal error reported by ledgerDirsManager");
triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
}
+
+ @Override
+ public void diskWritable(File disk) {
+ // Transition to writable mode when a disk becomes writable again.
+ transitionToWritableMode();
+ }
+
+ @Override
+ public void diskJustWritable(File disk) {
+ // Transition to writable mode when a disk becomes writable again.
+ transitionToWritableMode();
+ }
};
}
@@ -645,6 +663,56 @@ public class Bookie extends BookieCritic
}
/**
+ * Check existence of <i>regPath</i> and wait it expired if possible
+ *
+ * @param regPath
+ * reg node path.
+ * @return true if regPath exists, otherwise return false
+ * @throws IOException if can't create reg path
+ */
+ protected boolean checkRegNodeAndWaitExpired(String regPath) throws IOException {
+ final CountDownLatch prevNodeLatch = new CountDownLatch(1);
+ Watcher zkPrevRegNodewatcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // Check for prev znode deletion. Connection expiration is
+ // not handling, since bookie has logic to shutdown.
+ if (EventType.NodeDeleted == event.getType()) {
+ prevNodeLatch.countDown();
+ }
+ }
+ };
+ try {
+ Stat stat = zk.exists(regPath, zkPrevRegNodewatcher);
+ if (null != stat) {
+ // if the ephemeral owner isn't current zookeeper client
+ // wait for it to be expired.
+ if (stat.getEphemeralOwner() != zk.getSessionId()) {
+ LOG.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:"
+ + " {} ms for znode deletion", regPath, conf.getZkTimeout());
+ // waiting for the previous bookie reg znode deletion
+ if (!prevNodeLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)) {
+ throw new NodeExistsException(regPath);
+ } else {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } catch (KeeperException ke) {
+ LOG.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke);
+ throw new IOException("ZK exception checking and wait ephemeral znode "
+ + regPath + " expired", ke);
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie);
+ throw new IOException("Interrupted checking and wait ephemeral znode "
+ + regPath + " expired", ie);
+ }
+ }
+
+ /**
* Register as an available bookie
*/
protected void registerBookie(ServerConfiguration conf) throws IOException {
@@ -654,39 +722,14 @@ public class Bookie extends BookieCritic
}
// ZK ephemeral node for this Bookie.
- String zkBookieRegPath = this.bookieRegistrationPath
- + StringUtils.addrToString(getBookieAddress(conf));
- final CountDownLatch prevNodeLatch = new CountDownLatch(1);
try{
- Watcher zkPrevRegNodewatcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // Check for prev znode deletion. Connection expiration is
- // not handling, since bookie has logic to shutdown.
- if (EventType.NodeDeleted == event.getType()) {
- prevNodeLatch.countDown();
- }
- }
- };
- if (null != zk.exists(zkBookieRegPath, zkPrevRegNodewatcher)) {
- LOG.info("Previous bookie registration znode: "
- + zkBookieRegPath
- + " exists, so waiting zk sessiontimeout: "
- + conf.getZkTimeout() + "ms for znode deletion");
- // waiting for the previous bookie reg znode deletion
- if (!prevNodeLatch.await(conf.getZkTimeout(),
- TimeUnit.MILLISECONDS)) {
- throw new KeeperException.NodeExistsException(
- zkBookieRegPath);
- }
+ if (!checkRegNodeAndWaitExpired(zkBookieRegPath)) {
+ // Create the ZK ephemeral node for this Bookie.
+ zk.create(zkBookieRegPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL);
}
-
- // Create the ZK ephemeral node for this Bookie.
- zk.create(zkBookieRegPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL);
} catch (KeeperException ke) {
- LOG.error("ZK exception registering ephemeral Znode for Bookie!",
- ke);
+ LOG.error("ZK exception registering ephemeral Znode for Bookie!", ke);
// Throw an IOException back up. This will cause the Bookie
// constructor to error out. Alternatively, we could do a System
// exit here as this is a fatal error.
@@ -701,12 +744,43 @@ public class Bookie extends BookieCritic
}
}
- /*
+ /**
+ * Transition the bookie from readOnly mode to writable
+ */
+ @VisibleForTesting
+ public void transitionToWritableMode() {
+ if (!readOnly.compareAndSet(true, false)) {
+ return;
+ }
+ LOG.info("Transitioning Bookie to Writable mode and will serve read/write requests.");
+ try {
+ this.registerBookie(conf);
+ } catch (IOException e) {
+ LOG.warn("Error in transitioning back to writable mode : ", e);
+ transitionToReadOnlyMode();
+ return;
+ }
+ // clear the readonly state
+ try {
+ zk.delete(zkBookieReadOnlyPath, -1);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted clearing readonly state while transitioning to writable mode : ", e);
+ return;
+ } catch (KeeperException e) {
+ // if we failed when deleting the readonly flag in zookeeper, it is OK since client would
+ // already see the bookie in writable list. so just log the exception
+ LOG.warn("Failed to delete bookie readonly state in zookeeper : ", e);
+ return;
+ }
+ }
+
+ /**
* Transition the bookie to readOnly mode
*/
@VisibleForTesting
public void transitionToReadOnlyMode() {
- if (shuttingdown == true) {
+ if (shuttingdown) {
return;
}
@@ -734,12 +808,18 @@ public class Bookie extends BookieCritic
// this node is just now created by someone.
}
}
- // Create the readonly node
- zk.create(this.bookieRegistrationPath
- + BookKeeperConstants.READONLY + "/" + getMyId(),
- new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- // Clear the current registered node
- zk.delete(zkBookieRegPath, -1);
+ if (!checkRegNodeAndWaitExpired(zkBookieReadOnlyPath)) {
+ // Create the readonly node
+ zk.create(zkBookieReadOnlyPath,
+ new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
+ try {
+ // Clear the current registered node
+ zk.delete(zkBookieRegPath, -1);
+ } catch (KeeperException.NoNodeException nne) {
+ LOG.warn("No writable bookie registered node {} when transitioning to readonly",
+ zkBookieRegPath, nne);
+ }
} catch (IOException e) {
LOG.error("Error in transition to ReadOnly Mode."
+ " Shutting down", e);
@@ -837,6 +917,7 @@ public class Bookie extends BookieCritic
LOG.info("Triggering shutdown of Bookie-{} with exitCode {}",
conf.getBookiePort(), exitCode);
BookieThread th = new BookieThread("BookieShutdownTrigger") {
+ @Override
public void run() {
Bookie.this.shutdown(exitCode);
}
@@ -897,14 +978,14 @@ public class Bookie extends BookieCritic
return this.exitCode;
}
- /**
+ /**
* Retrieve the ledger descriptor for the ledger which entry should be added to.
- * The LedgerDescriptor returned from this method should be eventually freed with
+ * The LedgerDescriptor returned from this method should be eventually freed with
* #putHandle().
*
* @throws BookieException if masterKey does not match the master key of the ledger
*/
- private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, byte[] masterKey)
+ private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, byte[] masterKey)
throws IOException, BookieException {
long ledgerId = entry.getLong();
LedgerDescriptor l = handles.getHandle(ledgerId, masterKey);
@@ -932,7 +1013,7 @@ public class Bookie extends BookieCritic
}
/**
- * Add an entry to a ledger as specified by handle.
+ * Add an entry to a ledger as specified by handle.
*/
private void addEntryInternal(LedgerDescriptor handle, ByteBuffer entry, WriteCallback cb, Object ctx)
throws IOException, BookieException {
@@ -947,11 +1028,11 @@ public class Bookie extends BookieCritic
/**
* Add entry to a ledger, even if the ledger has previous been fenced. This should only
- * happen in bookie recovery or ledger recovery cases, where entries are being replicates
+ * happen in bookie recovery or ledger recovery cases, where entries are being replicates
* so that they exist on a quorum of bookies. The corresponding client side call for this
* is not exposed to users.
*/
- public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+ public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
try {
LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
@@ -963,8 +1044,8 @@ public class Bookie extends BookieCritic
throw new IOException(e);
}
}
-
- /**
+
+ /**
* Add entry to a ledger.
* @throws BookieException.LedgerFencedException if the ledger is fenced
*/
@@ -1026,6 +1107,7 @@ public class Bookie extends BookieCritic
static class CounterCallback implements WriteCallback {
int count;
+ @Override
synchronized public void writeComplete(int rc, long l, long e, InetSocketAddress addr, Object ctx) {
count--;
if (count == 0) {
@@ -1046,7 +1128,7 @@ public class Bookie extends BookieCritic
/**
* Format the bookie server data
- *
+ *
* @param conf
* ServerConfiguration
* @param isInteractive
@@ -1135,7 +1217,7 @@ public class Bookie extends BookieCritic
* @throws IOException
* @throws InterruptedException
*/
- public static void main(String[] args)
+ public static void main(String[] args)
throws IOException, InterruptedException, BookieException, KeeperException {
Bookie b = new Bookie(new ServerConfiguration());
b.start();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Tue Jan 21 16:26:23 2014
@@ -42,8 +42,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -62,8 +62,8 @@ public class EntryLogger {
private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
volatile File currentDir;
- private LedgerDirsManager ledgerDirsManager;
- private AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
+ private final LedgerDirsManager ledgerDirsManager;
+ private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
private long logId;
private volatile long leastUnflushedLogId;
@@ -233,6 +233,16 @@ public class EntryLogger {
public void fatalError() {
// Nothing to handle here. Will be handled in Bookie
}
+
+ @Override
+ public void diskWritable(File disk) {
+ // Nothing to handle here. Will be handled in Bookie
+ }
+
+ @Override
+ public void diskJustWritable(File disk) {
+ // Nothing to handle here. Will be handled in Bookie
+ }
};
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Tue Jan 21 16:26:23 2014
@@ -276,13 +276,20 @@ public class GarbageCollectorThread exte
lastMinorCompactionTime = lastMajorCompactionTime = MathUtils.now();
}
- synchronized void forceGC() {
+ public synchronized void enableForceGC() {
if (forceGarbageCollection.compareAndSet(false, true)) {
LOG.info("Forced garbage collection triggered by thread: {}", Thread.currentThread().getName());
notify();
}
}
+ public void disableForceGC() {
+ if (forceGarbageCollection.compareAndSet(true, false)) {
+ LOG.info("{} disabled force garbage collection since bookie has enough space now.", Thread
+ .currentThread().getName());
+ }
+ }
+
@Override
public void run() {
while (running) {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java Tue Jan 21 16:26:23 2014
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
@@ -367,6 +368,16 @@ public class IndexPersistenceMgr {
public void fatalError() {
// Nothing to handle here. Will be handled in Bookie
}
+
+ @Override
+ public void diskWritable(File disk) {
+ // Nothing to handle here. Will be handled in Bookie
+ }
+
+ @Override
+ public void diskJustWritable(File disk) {
+ // Nothing to handle here. Will be handled in Bookie
+ }
};
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Tue Jan 21 16:26:23 2014
@@ -111,23 +111,35 @@ class InterleavedLedgerStorage implement
@Override
public void diskAlmostFull(File disk) {
- gcThread.forceGC();
+ gcThread.enableForceGC();
}
@Override
public void diskFull(File disk) {
- gcThread.forceGC();
+ gcThread.enableForceGC();
}
@Override
public void allDisksFull() {
- gcThread.forceGC();
+ gcThread.enableForceGC();
}
@Override
public void fatalError() {
// do nothing.
}
+
+ @Override
+ public void diskWritable(File disk) {
+ // we have enough space now, disable force gc.
+ gcThread.disableForceGC();
+ }
+
+ @Override
+ public void diskJustWritable(File disk) {
+ // if a disk is just writable, we still need force gc.
+ gcThread.enableForceGC();
+ }
};
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java Tue Jan 21 16:26:23 2014
@@ -85,6 +85,13 @@ public class LedgerDirsManager {
}
/**
+ * @return full-filled ledger dirs.
+ */
+ public List<File> getFullFilledLedgerDirs() {
+ return filledDirs;
+ }
+
+ /**
* Get dirs, which are full more than threshold
*/
public boolean isDirFull(File dir) {
@@ -115,6 +122,34 @@ public class LedgerDirsManager {
}
/**
+ * Add the dir to writable dirs list.
+ *
+ * @param dir Dir
+ */
+ public void addToWritableDirs(File dir, boolean underWarnThreshold) {
+ if (writableLedgerDirectories.contains(dir)) {
+ return;
+ }
+ LOG.info("{} becomes writable. Adding it to writable dirs list.", dir);
+ // Update writable dirs list
+ List<File> updatedWritableDirs = new ArrayList<File>(writableLedgerDirectories);
+ updatedWritableDirs.add(dir);
+ writableLedgerDirectories = updatedWritableDirs;
+ // Update the filled dirs list
+ List<File> newDirs = new ArrayList<File>(filledDirs);
+ newDirs.removeAll(writableLedgerDirectories);
+ filledDirs = newDirs;
+ // Notify listeners about disk writable
+ for (LedgerDirsListener listener : listeners) {
+ if (underWarnThreshold) {
+ listener.diskWritable(dir);
+ } else {
+ listener.diskJustWritable(dir);
+ }
+ }
+ }
+
+ /**
* Returns one of the ledger dir from writable dirs list randomly.
*/
File pickRandomWritableDir() throws NoWritableLedgerDirException {
@@ -198,48 +233,58 @@ public class LedgerDirsManager {
@Override
public void run() {
- try {
- while (true) {
- List<File> writableDirs;
+ while (true) {
+ List<File> writableDirs;
+ try {
+ writableDirs = getWritableLedgerDirs();
+ } catch (NoWritableLedgerDirException e) {
+ for (LedgerDirsListener listener : listeners) {
+ listener.allDisksFull();
+ }
+ break;
+ }
+ // Check all writable dirs disk space usage.
+ for (File dir : writableDirs) {
try {
- writableDirs = getWritableLedgerDirs();
- } catch (NoWritableLedgerDirException e) {
+ diskChecker.checkDir(dir);
+ } catch (DiskErrorException e) {
+ // Notify disk failure to all listeners
for (LedgerDirsListener listener : listeners) {
- listener.allDisksFull();
+ LOG.warn("{} has errors.", dir, e);
+ listener.diskFailed(dir);
}
- break;
- }
- // Check all writable dirs disk space usage.
- for (File dir : writableDirs) {
- try {
- diskChecker.checkDir(dir);
- } catch (DiskErrorException e) {
- // Notify disk failure to all listeners
- for (LedgerDirsListener listener : listeners) {
- LOG.warn("{} has errors.", dir, e);
- listener.diskFailed(dir);
- }
- } catch (DiskWarnThresholdException e) {
- for (LedgerDirsListener listener : listeners) {
- listener.diskAlmostFull(dir);
- }
- } catch (DiskOutOfSpaceException e) {
- // Notify disk full to all listeners
- addToFilledDirs(dir);
+ } catch (DiskWarnThresholdException e) {
+ for (LedgerDirsListener listener : listeners) {
+ listener.diskAlmostFull(dir);
}
+ } catch (DiskOutOfSpaceException e) {
+ // Notify disk full to all listeners
+ addToFilledDirs(dir);
}
+ }
+ List<File> fullfilledDirs = new ArrayList<File>(getFullFilledLedgerDirs());
+ // Check all full-filled disk space usage
+ for (File dir : fullfilledDirs) {
try {
- Thread.sleep(interval);
- } catch (InterruptedException e) {
- LOG.info("LedgerDirsMonitor thread is interrupted");
- break;
+ diskChecker.checkDir(dir);
+ addToWritableDirs(dir, true);
+ } catch (DiskErrorException e) {
+ //Notify disk failure to all the listeners
+ for (LedgerDirsListener listener : listeners) {
+ listener.diskFailed(dir);
+ }
+ } catch (DiskWarnThresholdException e) {
+ // the full-filled dir become writable but still above warn threshold
+ addToWritableDirs(dir, false);
+ } catch (DiskOutOfSpaceException e) {
+ // the full-filled dir is still full-filled
}
}
- } catch (Exception e) {
- LOG.error("Error Occured while checking disks", e);
- // Notify disk failure to all listeners
- for (LedgerDirsListener listener : listeners) {
- listener.fatalError();
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ LOG.info("LedgerDirsMonitor thread is interrupted");
+ break;
}
}
LOG.info("LedgerDirsMonitorThread exited!");
@@ -278,7 +323,7 @@ public class LedgerDirsManager {
public static interface LedgerDirsListener {
/**
* This will be notified on disk failure/disk error
- *
+ *
* @param disk
* Failed disk
*/
@@ -293,13 +338,29 @@ public class LedgerDirsManager {
/**
* This will be notified on disk detected as full
- *
+ *
* @param disk
* Filled disk
*/
void diskFull(File disk);
/**
+ * This will be notified on disk detected as writable and under warn threshold
+ *
+ * @param disk
+ * Writable disk
+ */
+ void diskWritable(File disk);
+
+ /**
+ * This will be notified on disk detected as writable but still in warn threshold
+ *
+ * @param disk
+ * Writable disk
+ */
+ void diskJustWritable(File disk);
+
+ /**
* This will be notified whenever all disks are detected as full.
*/
void allDisksFull();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java Tue Jan 21 16:26:23 2014
@@ -32,8 +32,6 @@ import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.ChannelException;
import junit.framework.Assert;
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -122,6 +120,32 @@ public class BookieInitializationTest {
ExitCode.ZK_REG_FAIL, bkServer.getExitCode());
}
+ @Test(timeout = 20000)
+ public void testBookieRegistrationWithSameZooKeeperClient() throws Exception {
+ File tmpDir = File.createTempFile("bookie", "test");
+ tmpDir.delete();
+ tmpDir.mkdir();
+
+ final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+ .setZkServers(null).setJournalDirName(tmpDir.getPath())
+ .setLedgerDirNames(new String[] { tmpDir.getPath() });
+
+ final String bkRegPath = conf.getZkAvailableBookiesPath() + "/"
+ + InetAddress.getLocalHost().getHostAddress() + ":"
+ + conf.getBookiePort();
+
+ MockBookie b = new MockBookie(conf);
+ b.zk = zkc;
+ b.testRegisterBookie(conf);
+ Assert.assertNotNull("Bookie registration node doesn't exists!",
+ zkc.exists(bkRegPath, false));
+
+ // test register bookie again if the registeration node is created by itself.
+ b.testRegisterBookie(conf);
+ Assert.assertNotNull("Bookie registration node doesn't exists!",
+ zkc.exists(bkRegPath, false));
+ }
+
/**
* Verify the bookie reg. Restarting bookie server will wait for the session
* timeout when previous reg node exists in zk. On zNode delete event,
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Tue Jan 21 16:26:23 2014
@@ -205,7 +205,7 @@ public class CompactionTest extends Book
storage.start();
long startTime = MathUtils.now();
Thread.sleep(2000);
- storage.gcThread.forceGC();
+ storage.gcThread.enableForceGC();
Thread.sleep(1000);
// Minor and Major compaction times should be larger than when we started
// this test.
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java Tue Jan 21 16:26:23 2014
@@ -340,5 +340,13 @@ public class TestSyncThread {
@Override
public void fatalError() {
}
+
+ @Override
+ public void diskWritable(File disk) {
+ }
+
+ @Override
+ public void diskJustWritable(File disk) {
+ }
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java?rev=1560066&r1=1560065&r2=1560066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java Tue Jan 21 16:26:23 2014
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.client.Book
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.junit.Test;
/**
* Test to verify the readonly feature of bookies
@@ -43,6 +44,7 @@ public class ReadOnlyBookieTest extends
/**
* Check readonly bookie
*/
+ @Test(timeout = 60000)
public void testBookieShouldServeAsReadOnly() throws Exception {
killBookie(0);
baseConf.setReadOnlyModeEnabled(true);
@@ -66,6 +68,7 @@ public class ReadOnlyBookieTest extends
try {
ledger.addEntry("data".getBytes());
+ fail("Should fail to add entry since there isn't enough bookies alive.");
} catch (BKException.BKNotEnoughBookiesException e) {
// Expected
}
@@ -84,9 +87,73 @@ public class ReadOnlyBookieTest extends
}
}
+ @Test(timeout = 60000)
+ public void testBookieShouldTurnWritableFromReadOnly() throws Exception {
+ killBookie(0);
+ baseConf.setReadOnlyModeEnabled(true);
+ startNewBookie();
+ LedgerHandle ledger = bkc.createLedger(2, 2, DigestType.MAC,
+ "".getBytes());
+
+ // Check new bookie with readonly mode enabled.
+ File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+ assertEquals("Only one ledger dir should be present", 1,
+ ledgerDirs.length);
+ Bookie bookie = bs.get(1).getBookie();
+ LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
+
+ for (int i = 0; i < 10; i++) {
+ ledger.addEntry("data".getBytes());
+ }
+
+ File testDir = new File(ledgerDirs[0], "current");
+
+ // Now add the current ledger dir to filled dirs list
+ ledgerDirsManager.addToFilledDirs(testDir);
+
+ try {
+ ledger.addEntry("data".getBytes());
+ fail("Should fail to add entry since there isn't enough bookies alive.");
+ } catch (BKException.BKNotEnoughBookiesException e) {
+ // Expected
+ }
+ LOG.info("bookie is running {}, readonly {}.", bookie.isRunning(), bookie.isReadOnly());
+ assertTrue("Bookie should be running and converted to readonly mode",
+ bookie.isRunning() && bookie.isReadOnly());
+
+ // refresh the bookkeeper client
+ bkc.readBookiesBlocking();
+ // should fail to create ledger
+ try {
+ bkc.createLedger(2, 2, DigestType.MAC, "".getBytes());
+ fail("Should fail to create a ledger since there isn't enough bookies alive.");
+ } catch (BKException.BKNotEnoughBookiesException bke) {
+ // Expected.
+ }
+
+ // Now add the current ledger dir back to writable dirs list
+ ledgerDirsManager.addToWritableDirs(testDir, true);
+
+ LOG.info("bookie is running {}, readonly {}.", bookie.isRunning(), bookie.isReadOnly());
+ assertTrue("Bookie should be running and converted back to writable mode", bookie.isRunning()
+ && !bookie.isReadOnly());
+ // force client to read bookies
+ bkc.readBookiesBlocking();
+ LedgerHandle newLedger = bkc.createLedger(2, 2, DigestType.MAC, "".getBytes());
+ for (int i = 0; i < 10; i++) {
+ newLedger.addEntry("data".getBytes());
+ }
+ Enumeration<LedgerEntry> readEntries = newLedger.readEntries(0, 9);
+ while (readEntries.hasMoreElements()) {
+ LedgerEntry entry = readEntries.nextElement();
+ assertEquals("Entry should contain correct data", "data", new String(entry.getEntry()));
+ }
+ }
+
/**
* check readOnlyModeEnabled=false
*/
+ @Test(timeout = 60000)
public void testBookieShutdownIfReadOnlyModeNotEnabled() throws Exception {
File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
assertEquals("Only one ledger dir should be present", 1,
@@ -105,6 +172,7 @@ public class ReadOnlyBookieTest extends
try {
ledger.addEntry("data".getBytes());
+ fail("Should fail to add entry since there isn't enough bookies alive.");
} catch (BKException.BKNotEnoughBookiesException e) {
// Expected
}
@@ -120,6 +188,7 @@ public class ReadOnlyBookieTest extends
/**
* Check multiple ledger dirs
*/
+ @Test(timeout = 60000)
public void testBookieContinueWritingIfMultipleLedgersPresent()
throws Exception {
startNewBookieWithMultipleLedgerDirs(2);
@@ -171,6 +240,7 @@ public class ReadOnlyBookieTest extends
/**
* Test ledger creation with readonly bookies
*/
+ @Test(timeout = 60000)
public void testLedgerCreationShouldFailWithReadonlyBookie() throws Exception {
killBookie(1);
baseConf.setReadOnlyModeEnabled(true);