You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/05/09 11:41:36 UTC

svn commit: r1480580 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/

Author: ivank
Date: Thu May  9 09:41:35 2013
New Revision: 1480580

URL: http://svn.apache.org/r1480580
Log:
BOOKKEEPER-608: Make SyncThread a reusable component (ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1480580&r1=1480579&r2=1480580&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu May  9 09:41:35 2013
@@ -56,6 +56,8 @@ Trunk (unreleased changes)
 
     IMPROVEMENTS:
 
+      BOOKKEEPER-608: Make SyncThread a reusable component (ivank)
+
       BOOKKEEPER-555: Make BookieServer use Netty rather than a custom IO server (ivank)
 
       BOOKKEEPER-526: multiple threads for delivery manager (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1480580&r1=1480579&r2=1480580&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Thu May  9 09:41:35 2013
@@ -45,7 +45,6 @@ import org.apache.bookkeeper.meta.Ledger
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.GarbageCollectorThread.SafeEntryAdder;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
-import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -239,149 +238,6 @@ public class Bookie extends Thread {
         }
     }
 
-    /**
-     * SyncThread is a background thread which help checkpointing ledger storage
-     * when a checkpoint is requested. After a ledger storage is checkpointed,
-     * the journal files added before checkpoint will be garbage collected.
-     * <p>
-     * After all data has been persisted to ledger index files and entry
-     * loggers, it is safe to complete a checkpoint by persisting the log marker
-     * to disk. If bookie failed after persist log mark, bookie is able to relay
-     * journal entries started from last log mark without losing any entries.
-     * </p>
-     * <p>
-     * Those journal files whose id are less than the log id in last log mark,
-     * could be removed safely after persisting last log mark. We provide a
-     * setting to let user keeping number of old journal files which may be used
-     * for manual recovery in critical disaster.
-     * </p>
-     */
-    class SyncThread extends Thread {
-        volatile boolean running = true;
-        // flag to ensure sync thread will not be interrupted during flush
-        final AtomicBoolean flushing = new AtomicBoolean(false);
-        final int flushInterval;
-
-        public SyncThread(ServerConfiguration conf) {
-            super("SyncThread");
-            flushInterval = conf.getFlushInterval();
-            LOG.debug("Flush Interval : {}", flushInterval);
-        }
-
-        /**
-         * flush data up to given logMark and roll log if success
-         * @param checkpoint
-         */
-        @VisibleForTesting
-        public void checkpoint(Checkpoint checkpoint) {
-            boolean flushFailed = false;
-            try {
-                if (running) {
-                    checkpoint = ledgerStorage.checkpoint(checkpoint);
-                } else {
-                    ledgerStorage.flush();
-                }
-            } catch (NoWritableLedgerDirException e) {
-                LOG.error("No writeable ledger directories");
-                flushFailed = true;
-                flushing.set(false);
-                transitionToReadOnlyMode();
-            } catch (IOException e) {
-                LOG.error("Exception flushing Ledger", e);
-                flushFailed = true;
-            }
-
-            // if flush failed, we should not roll last mark, otherwise we would
-            // have some ledgers are not flushed and their journal entries were lost
-            if (!flushFailed) {
-                try {
-                    journal.checkpointComplete(checkpoint, running);
-                } catch (IOException e) {
-                    flushing.set(false);
-                    LOG.error("Marking checkpoint as complete failed", e);
-                    transitionToReadOnlyMode();
-                }
-            }
-        }
-
-        private Object suspensionLock = new Object();
-        private boolean suspended = false;
-
-        /**
-         * Suspend sync thread. (for testing)
-         */
-        @VisibleForTesting
-        public void suspendSync() {
-            synchronized(suspensionLock) {
-                suspended = true;
-            }
-        }
-
-        /**
-         * Resume sync thread. (for testing)
-         */
-        @VisibleForTesting
-        public void resumeSync() {
-            synchronized(suspensionLock) {
-                suspended = false;
-                suspensionLock.notify();
-            }
-        }
-
-        @Override
-        public void run() {
-            try {
-                while(running) {
-                    synchronized (this) {
-                        try {
-                            wait(flushInterval);
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                            continue;
-                        }
-                    }
-
-                    synchronized (suspensionLock) {
-                        while (suspended) {
-                            try {
-                                suspensionLock.wait();
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                                continue;
-                            }
-                        }
-                    }
-
-                    // try to mark flushing flag to check if interrupted
-                    if (!flushing.compareAndSet(false, true)) {
-                        // set flushing flag failed, means flushing is true now
-                        // indicates another thread wants to interrupt sync thread to exit
-                        break;
-                    }
-                    checkpoint(journal.newCheckpoint());
-
-                    flushing.set(false);
-                }
-            } catch (Throwable t) {
-                LOG.error("Exception in SyncThread", t);
-                flushing.set(false);
-                triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
-            }
-        }
-
-        // shutdown sync thread
-        void shutdown() throws InterruptedException {
-            // Wake up and finish sync thread
-            running = false;
-            // make a checkpoint when shutdown
-            if (flushing.compareAndSet(false, true)) {
-                // it is safe to interrupt itself now 
-                this.interrupt();
-            }
-            this.join();
-        }
-    }
-
     public static void checkDirectoryStructure(File dir) throws IOException {
         if (!dir.exists()) {
             File parent = dir.getParentFile();
@@ -538,12 +394,15 @@ public class Bookie extends Thread {
         ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
         LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
         ledgerManager = ledgerManagerFactory.newLedgerManager();
-        syncThread = new SyncThread(conf);
+
         // instantiate the journal
         journal = new Journal(conf, ledgerDirsManager);
         ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
                                                      ledgerDirsManager, journal,
                                                      new BookieSafeEntryAdder());
+        syncThread = new SyncThread(conf, getLedgerDirsListener(),
+                                    ledgerStorage, journal);
+
         handles = new HandleFactoryImpl(ledgerStorage);
 
         // ZK ephemeral node for this Bookie.

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java?rev=1480580&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java Thu May  9 09:41:35 2013
@@ -0,0 +1,192 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SyncThread is a background thread which help checkpointing ledger storage
+ * when a checkpoint is requested. After a ledger storage is checkpointed,
+ * the journal files added before checkpoint will be garbage collected.
+ * <p>
+ * After all data has been persisted to ledger index files and entry
+ * loggers, it is safe to complete a checkpoint by persisting the log marker
+ * to disk. If bookie failed after persist log mark, bookie is able to relay
+ * journal entries started from last log mark without losing any entries.
+ * </p>
+ * <p>
+ * Those journal files whose id are less than the log id in last log mark,
+ * could be removed safely after persisting last log mark. We provide a
+ * setting to let user keeping number of old journal files which may be used
+ * for manual recovery in critical disaster.
+ * </p>
+ */
+class SyncThread extends Thread {
+    static Logger LOG = LoggerFactory.getLogger(SyncThread.class);
+
+    volatile boolean running = true;
+    // flag to ensure sync thread will not be interrupted during flush
+    final AtomicBoolean flushing = new AtomicBoolean(false);
+    final int flushInterval;
+    final LedgerStorage ledgerStorage;
+    final LedgerDirsListener dirsListener;
+    final CheckpointSource checkpointSource;
+
+    public SyncThread(ServerConfiguration conf,
+                      LedgerDirsListener dirsListener,
+                      LedgerStorage ledgerStorage,
+                      CheckpointSource checkpointSource) {
+        super("SyncThread");
+        this.dirsListener = dirsListener;
+        this.ledgerStorage = ledgerStorage;
+        this.checkpointSource = checkpointSource;
+
+        flushInterval = conf.getFlushInterval();
+        LOG.debug("Flush Interval : {}", flushInterval);
+    }
+
+    /**
+     * flush data up to given logMark and roll log if success
+     * @param checkpoint
+     */
+    @VisibleForTesting
+    public void checkpoint(Checkpoint checkpoint) {
+        boolean flushFailed = false;
+        try {
+            if (running) {
+                checkpoint = ledgerStorage.checkpoint(checkpoint);
+            } else {
+                ledgerStorage.flush();
+            }
+        } catch (NoWritableLedgerDirException e) {
+            LOG.error("No writeable ledger directories");
+            flushFailed = true;
+            flushing.set(false);
+            dirsListener.allDisksFull();
+        } catch (IOException e) {
+            LOG.error("Exception flushing Ledger", e);
+            flushFailed = true;
+        }
+
+        // if flush failed, we should not roll last mark, otherwise we would
+        // have some ledgers are not flushed and their journal entries were lost
+        if (!flushFailed) {
+            try {
+                checkpointSource.checkpointComplete(checkpoint, running);
+            } catch (IOException e) {
+                flushing.set(false);
+                LOG.error("Marking checkpoint as complete failed", e);
+                dirsListener.allDisksFull();
+            }
+        }
+    }
+
+    private Object suspensionLock = new Object();
+    private boolean suspended = false;
+
+    /**
+     * Suspend sync thread. (for testing)
+     */
+    @VisibleForTesting
+    public void suspendSync() {
+        synchronized(suspensionLock) {
+            suspended = true;
+        }
+    }
+
+    /**
+     * Resume sync thread. (for testing)
+     */
+    @VisibleForTesting
+    public void resumeSync() {
+        synchronized(suspensionLock) {
+            suspended = false;
+            suspensionLock.notify();
+        }
+    }
+
+    @Override
+    public void run() {
+        try {
+            while(running) {
+                synchronized (this) {
+                    try {
+                        wait(flushInterval);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        continue;
+                    }
+                }
+
+                synchronized (suspensionLock) {
+                    while (suspended) {
+                        try {
+                            suspensionLock.wait();
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            continue;
+                        }
+                    }
+                }
+
+                // try to mark flushing flag to check if interrupted
+                if (!flushing.compareAndSet(false, true)) {
+                    // set flushing flag failed, means flushing is true now
+                    // indicates another thread wants to interrupt sync thread to exit
+                    break;
+                }
+                checkpoint(checkpointSource.newCheckpoint());
+
+                flushing.set(false);
+            }
+        } catch (Throwable t) {
+            LOG.error("Exception in SyncThread", t);
+            flushing.set(false);
+            dirsListener.fatalError();
+        }
+    }
+
+    // shutdown sync thread
+    void shutdown() throws InterruptedException {
+        // Wake up and finish sync thread
+        running = false;
+        // make a checkpoint when shutdown
+        if (flushing.compareAndSet(false, true)) {
+            // it is safe to interrupt itself now
+            this.interrupt();
+        }
+        this.join();
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java?rev=1480580&r1=1480579&r2=1480580&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java Thu May  9 09:41:35 2013
@@ -56,7 +56,7 @@ public class IndexCorruptionTest extends
     public void testNoSuchLedger() throws Exception {
         LOG.debug("Testing NoSuchLedger");
 
-        Bookie.SyncThread syncThread = bs.get(0).getBookie().syncThread;
+        SyncThread syncThread = bs.get(0).getBookie().syncThread;
         syncThread.suspendSync();
         // Create a ledger
         LedgerHandle lh = bkc.createLedger(1, 1, digestType, "".getBytes());
@@ -97,7 +97,7 @@ public class IndexCorruptionTest extends
     public void testEmptyIndexPage() throws Exception {
         LOG.debug("Testing EmptyIndexPage");
 
-        Bookie.SyncThread syncThread = bs.get(0).getBookie().syncThread;
+        SyncThread syncThread = bs.get(0).getBookie().syncThread;
         assertNotNull("Not found SyncThread.", syncThread);
 
         syncThread.suspendSync();