You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/05/09 11:41:36 UTC
svn commit: r1480580 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/
Author: ivank
Date: Thu May 9 09:41:35 2013
New Revision: 1480580
URL: http://svn.apache.org/r1480580
Log:
BOOKKEEPER-608: Make SyncThread a reusable component (ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1480580&r1=1480579&r2=1480580&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu May 9 09:41:35 2013
@@ -56,6 +56,8 @@ Trunk (unreleased changes)
IMPROVEMENTS:
+ BOOKKEEPER-608: Make SyncThread a reusable component (ivank)
+
BOOKKEEPER-555: Make BookieServer use Netty rather than a custom IO server (ivank)
BOOKKEEPER-526: multiple threads for delivery manager (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1480580&r1=1480579&r2=1480580&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Thu May 9 09:41:35 2013
@@ -45,7 +45,6 @@ import org.apache.bookkeeper.meta.Ledger
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.GarbageCollectorThread.SafeEntryAdder;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
-import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -239,149 +238,6 @@ public class Bookie extends Thread {
}
}
- /**
- * SyncThread is a background thread which help checkpointing ledger storage
- * when a checkpoint is requested. After a ledger storage is checkpointed,
- * the journal files added before checkpoint will be garbage collected.
- * <p>
- * After all data has been persisted to ledger index files and entry
- * loggers, it is safe to complete a checkpoint by persisting the log marker
- * to disk. If bookie failed after persist log mark, bookie is able to relay
- * journal entries started from last log mark without losing any entries.
- * </p>
- * <p>
- * Those journal files whose id are less than the log id in last log mark,
- * could be removed safely after persisting last log mark. We provide a
- * setting to let user keeping number of old journal files which may be used
- * for manual recovery in critical disaster.
- * </p>
- */
- class SyncThread extends Thread {
- volatile boolean running = true;
- // flag to ensure sync thread will not be interrupted during flush
- final AtomicBoolean flushing = new AtomicBoolean(false);
- final int flushInterval;
-
- public SyncThread(ServerConfiguration conf) {
- super("SyncThread");
- flushInterval = conf.getFlushInterval();
- LOG.debug("Flush Interval : {}", flushInterval);
- }
-
- /**
- * flush data up to given logMark and roll log if success
- * @param checkpoint
- */
- @VisibleForTesting
- public void checkpoint(Checkpoint checkpoint) {
- boolean flushFailed = false;
- try {
- if (running) {
- checkpoint = ledgerStorage.checkpoint(checkpoint);
- } else {
- ledgerStorage.flush();
- }
- } catch (NoWritableLedgerDirException e) {
- LOG.error("No writeable ledger directories");
- flushFailed = true;
- flushing.set(false);
- transitionToReadOnlyMode();
- } catch (IOException e) {
- LOG.error("Exception flushing Ledger", e);
- flushFailed = true;
- }
-
- // if flush failed, we should not roll last mark, otherwise we would
- // have some ledgers are not flushed and their journal entries were lost
- if (!flushFailed) {
- try {
- journal.checkpointComplete(checkpoint, running);
- } catch (IOException e) {
- flushing.set(false);
- LOG.error("Marking checkpoint as complete failed", e);
- transitionToReadOnlyMode();
- }
- }
- }
-
- private Object suspensionLock = new Object();
- private boolean suspended = false;
-
- /**
- * Suspend sync thread. (for testing)
- */
- @VisibleForTesting
- public void suspendSync() {
- synchronized(suspensionLock) {
- suspended = true;
- }
- }
-
- /**
- * Resume sync thread. (for testing)
- */
- @VisibleForTesting
- public void resumeSync() {
- synchronized(suspensionLock) {
- suspended = false;
- suspensionLock.notify();
- }
- }
-
- @Override
- public void run() {
- try {
- while(running) {
- synchronized (this) {
- try {
- wait(flushInterval);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- continue;
- }
- }
-
- synchronized (suspensionLock) {
- while (suspended) {
- try {
- suspensionLock.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- continue;
- }
- }
- }
-
- // try to mark flushing flag to check if interrupted
- if (!flushing.compareAndSet(false, true)) {
- // set flushing flag failed, means flushing is true now
- // indicates another thread wants to interrupt sync thread to exit
- break;
- }
- checkpoint(journal.newCheckpoint());
-
- flushing.set(false);
- }
- } catch (Throwable t) {
- LOG.error("Exception in SyncThread", t);
- flushing.set(false);
- triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
- }
- }
-
- // shutdown sync thread
- void shutdown() throws InterruptedException {
- // Wake up and finish sync thread
- running = false;
- // make a checkpoint when shutdown
- if (flushing.compareAndSet(false, true)) {
- // it is safe to interrupt itself now
- this.interrupt();
- }
- this.join();
- }
- }
-
public static void checkDirectoryStructure(File dir) throws IOException {
if (!dir.exists()) {
File parent = dir.getParentFile();
@@ -538,12 +394,15 @@ public class Bookie extends Thread {
ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
ledgerManager = ledgerManagerFactory.newLedgerManager();
- syncThread = new SyncThread(conf);
+
// instantiate the journal
journal = new Journal(conf, ledgerDirsManager);
ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
ledgerDirsManager, journal,
new BookieSafeEntryAdder());
+ syncThread = new SyncThread(conf, getLedgerDirsListener(),
+ ledgerStorage, journal);
+
handles = new HandleFactoryImpl(ledgerStorage);
// ZK ephemeral node for this Bookie.
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java?rev=1480580&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java Thu May 9 09:41:35 2013
@@ -0,0 +1,192 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SyncThread is a background thread which helps checkpoint ledger storage
+ * when a checkpoint is requested. After a ledger storage is checkpointed,
+ * the journal files added before the checkpoint will be garbage collected.
+ * <p>
+ * After all data has been persisted to ledger index files and entry
+ * loggers, it is safe to complete a checkpoint by persisting the log marker
+ * to disk. If the bookie fails after persisting the log mark, it is able to
+ * replay journal entries starting from the last log mark without losing any.
+ * </p>
+ * <p>
+ * Those journal files whose ids are less than the log id in the last log mark
+ * can be removed safely after persisting the last log mark. We provide a
+ * setting to let the user keep a number of old journal files, which may be
+ * used for manual recovery in a critical disaster.
+ * </p>
+ */
+class SyncThread extends Thread {
+ static Logger LOG = LoggerFactory.getLogger(SyncThread.class);
+
+ volatile boolean running = true;
+ // flag to ensure sync thread will not be interrupted during flush
+ final AtomicBoolean flushing = new AtomicBoolean(false);
+ final int flushInterval;
+ final LedgerStorage ledgerStorage;
+ final LedgerDirsListener dirsListener;
+ final CheckpointSource checkpointSource;
+
+ public SyncThread(ServerConfiguration conf,
+ LedgerDirsListener dirsListener,
+ LedgerStorage ledgerStorage,
+ CheckpointSource checkpointSource) {
+ super("SyncThread");
+ this.dirsListener = dirsListener;
+ this.ledgerStorage = ledgerStorage;
+ this.checkpointSource = checkpointSource;
+
+ flushInterval = conf.getFlushInterval();
+ LOG.debug("Flush Interval : {}", flushInterval);
+ }
+
+ /**
+ * Flush data up to the given checkpoint and roll the log on success.
+ * @param checkpoint
+ */
+ @VisibleForTesting
+ public void checkpoint(Checkpoint checkpoint) {
+ boolean flushFailed = false;
+ try {
+ if (running) {
+ checkpoint = ledgerStorage.checkpoint(checkpoint);
+ } else {
+ ledgerStorage.flush();
+ }
+ } catch (NoWritableLedgerDirException e) {
+ LOG.error("No writeable ledger directories");
+ flushFailed = true;
+ flushing.set(false);
+ dirsListener.allDisksFull();
+ } catch (IOException e) {
+ LOG.error("Exception flushing Ledger", e);
+ flushFailed = true;
+ }
+
+ // if the flush failed, we must not roll the last mark; otherwise some
+ // ledgers would remain unflushed and their journal entries would be lost
+ if (!flushFailed) {
+ try {
+ checkpointSource.checkpointComplete(checkpoint, running);
+ } catch (IOException e) {
+ flushing.set(false);
+ LOG.error("Marking checkpoint as complete failed", e);
+ dirsListener.allDisksFull();
+ }
+ }
+ }
+
+ private Object suspensionLock = new Object();
+ private boolean suspended = false;
+
+ /**
+ * Suspend sync thread. (for testing)
+ */
+ @VisibleForTesting
+ public void suspendSync() {
+ synchronized(suspensionLock) {
+ suspended = true;
+ }
+ }
+
+ /**
+ * Resume sync thread. (for testing)
+ */
+ @VisibleForTesting
+ public void resumeSync() {
+ synchronized(suspensionLock) {
+ suspended = false;
+ suspensionLock.notify();
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ while(running) {
+ synchronized (this) {
+ try {
+ wait(flushInterval);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ continue;
+ }
+ }
+
+ synchronized (suspensionLock) {
+ while (suspended) {
+ try {
+ suspensionLock.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ continue;
+ }
+ }
+ }
+
+ // try to mark flushing flag to check if interrupted
+ if (!flushing.compareAndSet(false, true)) {
+ // set flushing flag failed, means flushing is true now
+ // indicates another thread wants to interrupt sync thread to exit
+ break;
+ }
+ checkpoint(checkpointSource.newCheckpoint());
+
+ flushing.set(false);
+ }
+ } catch (Throwable t) {
+ LOG.error("Exception in SyncThread", t);
+ flushing.set(false);
+ dirsListener.fatalError();
+ }
+ }
+
+ // shutdown sync thread
+ void shutdown() throws InterruptedException {
+ // Wake up and finish sync thread
+ running = false;
+ // make a checkpoint when shutdown
+ if (flushing.compareAndSet(false, true)) {
+ // it is safe to interrupt itself now
+ this.interrupt();
+ }
+ this.join();
+ }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java?rev=1480580&r1=1480579&r2=1480580&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java Thu May 9 09:41:35 2013
@@ -56,7 +56,7 @@ public class IndexCorruptionTest extends
public void testNoSuchLedger() throws Exception {
LOG.debug("Testing NoSuchLedger");
- Bookie.SyncThread syncThread = bs.get(0).getBookie().syncThread;
+ SyncThread syncThread = bs.get(0).getBookie().syncThread;
syncThread.suspendSync();
// Create a ledger
LedgerHandle lh = bkc.createLedger(1, 1, digestType, "".getBytes());
@@ -97,7 +97,7 @@ public class IndexCorruptionTest extends
public void testEmptyIndexPage() throws Exception {
LOG.debug("Testing EmptyIndexPage");
- Bookie.SyncThread syncThread = bs.get(0).getBookie().syncThread;
+ SyncThread syncThread = bs.get(0).getBookie().syncThread;
assertNotNull("Not found SyncThread.", syncThread);
syncThread.suspendSync();