You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2013/07/20 20:06:30 UTC

svn commit: r1505175 - in /zookeeper/bookkeeper/trunk: CHANGES.txt bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java

Author: sijie
Date: Sat Jul 20 18:06:30 2013
New Revision: 1505175

URL: http://svn.apache.org/r1505175
Log:
BOOKKEEPER-610: Make SyncThread use an executor (ivank via sijie)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1505175&r1=1505174&r2=1505175&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Jul 20 18:06:30 2013
@@ -118,6 +118,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-618: Better resolution of bookie address (ivank via fpj)
 
+      BOOKKEEPER-610: Make SyncThread use an executor (ivank via sijie)
+
     NEW FEATURE:
 
       BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java?rev=1505175&r1=1505174&r2=1505175&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java Sat Jul 20 18:06:30 2013
@@ -21,8 +21,12 @@
 
 package org.apache.bookkeeper.bookie;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
 
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 
@@ -53,12 +57,10 @@ import org.slf4j.LoggerFactory;
  * for manual recovery in critical disaster.
  * </p>
  */
-class SyncThread extends Thread {
+class SyncThread {
     static Logger LOG = LoggerFactory.getLogger(SyncThread.class);
 
-    volatile boolean running = true;
-    // flag to ensure sync thread will not be interrupted during flush
-    final AtomicBoolean flushing = new AtomicBoolean(false);
+    final ScheduledExecutorService executor;
     final int flushInterval;
     final LedgerStorage ledgerStorage;
     final LedgerDirsListener dirsListener;
@@ -68,48 +70,78 @@ class SyncThread extends Thread {
                       LedgerDirsListener dirsListener,
                       LedgerStorage ledgerStorage,
                       CheckpointSource checkpointSource) {
-        super("SyncThread");
         this.dirsListener = dirsListener;
         this.ledgerStorage = ledgerStorage;
         this.checkpointSource = checkpointSource;
-
+        ThreadFactoryBuilder tfb = new ThreadFactoryBuilder()
+            .setNameFormat("SyncThread-" + conf.getBookiePort() + "-%d");
+        this.executor = Executors.newSingleThreadScheduledExecutor(tfb.build());
         flushInterval = conf.getFlushInterval();
         LOG.debug("Flush Interval : {}", flushInterval);
     }
 
-    /**
-     * flush data up to given logMark and roll log if success
-     * @param checkpoint
-     */
+    void start() { // schedule a checkpoint run every flushInterval milliseconds on the sync executor
+        executor.scheduleAtFixedRate(new Runnable() {
+                public void run() {
+                    try {
+                        synchronized (suspensionLock) {
+                            while (suspended) { // hold this run back for as long as sync is suspended
+                                try {
+                                    suspensionLock.wait();
+                                } catch (InterruptedException e) {
+                                    Thread.currentThread().interrupt(); // keep the interrupt visible
+                                    return; // skip this run; looping again would make wait() rethrow at once and spin
+                                }
+                            }
+                        }
+                        checkpoint(checkpointSource.newCheckpoint());
+                    } catch (Throwable t) { // an escaping exception would cancel all later periodic runs
+                        LOG.error("Exception in SyncThread", t);
+                        dirsListener.fatalError();
+                    }
+                }
+            }, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
+    }
+
+    private void flush() { // full flush of the ledger storage; called from the task queued by shutdown()
+        Checkpoint checkpoint = checkpointSource.newCheckpoint(); // captured before the flush starts
+        try {
+            ledgerStorage.flush();
+        } catch (NoWritableLedgerDirException e) {
+            LOG.error("No writeable ledger directories", e);
+            dirsListener.allDisksFull();
+            return; // flush failed: leave the checkpoint unmarked
+        } catch (IOException e) {
+            LOG.error("Exception flushing ledgers", e);
+            return; // flush failed: leave the checkpoint unmarked
+        }
+        // Reached only when the flush succeeded, so the checkpoint can be reported as complete.
+        try {
+            checkpointSource.checkpointComplete(checkpoint, false); // false is the "compact" argument
+        } catch (IOException e) {
+            LOG.error("Exception marking checkpoint as complete", e);
+            dirsListener.allDisksFull(); // a failure to record the checkpoint is reported as disks full
+        }
+    }
+
     @VisibleForTesting
     public void checkpoint(Checkpoint checkpoint) {
-        boolean flushFailed = false;
         try {
-            if (running) {
-                checkpoint = ledgerStorage.checkpoint(checkpoint);
-            } else {
-                ledgerStorage.flush();
-            }
+            checkpoint = ledgerStorage.checkpoint(checkpoint); // continue with the checkpoint the storage returns
         } catch (NoWritableLedgerDirException e) {
-            LOG.error("No writeable ledger directories");
-            flushFailed = true;
-            flushing.set(false);
+            LOG.error("No writeable ledger directories", e);
             dirsListener.allDisksFull();
+            return; // checkpoint failed: do not roll the last mark, unflushed journal entries would be lost
         } catch (IOException e) {
-            LOG.error("Exception flushing Ledger", e);
-            flushFailed = true;
+            LOG.error("Exception flushing ledgers", e);
+            return; // checkpoint failed: do not roll the last mark, unflushed journal entries would be lost
         }
 
-        // if flush failed, we should not roll last mark, otherwise we would
-        // have some ledgers are not flushed and their journal entries were lost
-        if (!flushFailed) {
-            try {
-                checkpointSource.checkpointComplete(checkpoint, running);
-            } catch (IOException e) {
-                flushing.set(false);
-                LOG.error("Marking checkpoint as complete failed", e);
-                dirsListener.allDisksFull();
-            }
+        try { // reached only when the storage checkpoint succeeded
+            checkpointSource.checkpointComplete(checkpoint, true); // true is the "compact" argument
+        } catch (IOException e) {
+            LOG.error("Exception marking checkpoint as complete", e);
+            dirsListener.allDisksFull(); // a failure to record the checkpoint is reported as disks full
         }
     }
 
@@ -137,56 +169,23 @@ class SyncThread extends Thread {
         }
     }
 
-    @Override
-    public void run() {
-        try {
-            while(running) {
-                synchronized (this) {
+    // Shut down the sync thread: queue one last flush, stop the executor and wait until it has drained.
+    void shutdown() throws InterruptedException {
+        executor.submit(new Runnable() { // final flush; runs after any sync run that is already in progress
+                public void run() {
                     try {
-                        wait(flushInterval);
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        continue;
+                        flush();
+                    } catch (Throwable t) { // best effort: a failed final flush must not abort the shutdown
+                        LOG.error("Exception flushing ledgers at shutdown", t);
                     }
                 }
-
-                synchronized (suspensionLock) {
-                    while (suspended) {
-                        try {
-                            suspensionLock.wait();
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                            continue;
-                        }
-                    }
-                }
-
-                // try to mark flushing flag to check if interrupted
-                if (!flushing.compareAndSet(false, true)) {
-                    // set flushing flag failed, means flushing is true now
-                    // indicates another thread wants to interrupt sync thread to exit
-                    break;
-                }
-                checkpoint(checkpointSource.newCheckpoint());
-
-                flushing.set(false);
-            }
-        } catch (Throwable t) {
-            LOG.error("Exception in SyncThread", t);
-            flushing.set(false);
-            dirsListener.fatalError();
-        }
-    }
-
-    // shutdown sync thread
-    void shutdown() throws InterruptedException {
-        // Wake up and finish sync thread
-        running = false;
-        // make a checkpoint when shutdown
-        if (flushing.compareAndSet(false, true)) {
-            // it is safe to interrupt itself now
-            this.interrupt();
+            });
+        executor.shutdown(); // NOTE(review): a run parked on suspensionLock would keep this from terminating - confirm
+        long start = MathUtils.now();
+        while (!executor.awaitTermination(5, TimeUnit.MINUTES)) { // report progress every five minutes
+            long now = MathUtils.now(); // MathUtils.now() counts milliseconds (per its Javadoc), so convert below
+            LOG.info("SyncThread taking a long time to shutdown. Has taken {}"
+                    + " seconds so far", TimeUnit.MILLISECONDS.toSeconds(now - start));
         }
-        this.join();
     }
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java?rev=1505175&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java Sat Jul 20 18:06:30 2013
@@ -0,0 +1,340 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+
+import org.apache.bookkeeper.jmx.BKMBeanInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+
+import static org.junit.Assert.*;
+
+public class TestSyncThread {
+    static Logger LOG = LoggerFactory.getLogger(TestSyncThread.class);
+
+    ExecutorService executor = null;
+
+    @Before
+    public void setupExecutor() { // fresh single-thread helper executor for every test
+        executor = Executors.newSingleThreadExecutor(); // used to call SyncThread.shutdown() off the test thread
+    }
+
+    @After
+    public void teardownExecutor() { // stop the helper executor created by setupExecutor()
+        if (executor != null) {
+            executor.shutdownNow(); // interrupts a helper task that is still running
+            executor = null;
+        }
+    }
+
+    /**
+     * Test that if a checkpoint or the final flush is taking
+     * a long time, the sync thread will not shut down until
+     * it has finished.
+     */
+    @Test(timeout=60000)
+    public void testSyncThreadLongShutdown() throws Exception {
+        int flushInterval = 100;
+        ServerConfiguration conf = new ServerConfiguration().setFlushInterval(flushInterval);
+        CheckpointSource checkpointSource = new DummyCheckpointSource();
+        LedgerDirsListener listener = new DummyLedgerDirsListener();
+
+        final CountDownLatch checkpointCalledLatch = new CountDownLatch(1); // released when checkpoint() is entered
+        final CountDownLatch checkpointLatch = new CountDownLatch(1); // checkpoint() blocks until the test releases it
+
+        final CountDownLatch flushCalledLatch = new CountDownLatch(1); // released when flush() is entered
+        final CountDownLatch flushLatch = new CountDownLatch(1); // flush() blocks until the test releases it
+        final AtomicBoolean failedSomewhere = new AtomicBoolean(false); // set if any helper thread is interrupted
+        LedgerStorage storage = new DummyLedgerStorage() {
+                @Override
+                public void flush() throws IOException {
+                    flushCalledLatch.countDown();
+                    try {
+                        flushLatch.await();
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        LOG.error("Interrupted in flush thread", ie);
+                        failedSomewhere.set(true);
+                    }
+                }
+
+                @Override
+                public Checkpoint checkpoint(Checkpoint checkpoint)
+                        throws IOException {
+                    checkpointCalledLatch.countDown();
+                    try {
+                        checkpointLatch.await();
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        LOG.error("Interrupted in checkpoint thread", ie);
+                        failedSomewhere.set(true);
+                    }
+                    return checkpoint;
+                }
+            };
+
+        final SyncThread t = new SyncThread(conf, listener, storage, checkpointSource);
+        t.start();
+        assertTrue("Checkpoint should have been called",
+                   checkpointCalledLatch.await(10, TimeUnit.SECONDS));
+        Future<Boolean> done = executor.submit(new Callable<Boolean>() { // shut down while the checkpoint is blocked
+                public Boolean call() {
+                    try {
+                        t.shutdown();
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        LOG.error("Interrupted shutting down sync thread", ie);
+                        failedSomewhere.set(true);
+                        return false;
+                    }
+                    return true;
+                }
+            });
+        checkpointLatch.countDown(); // let the blocked checkpoint complete
+        assertFalse("Shutdown shouldn't have finished", done.isDone());
+        assertTrue("Flush should have been called",
+                   flushCalledLatch.await(10, TimeUnit.SECONDS));
+
+        assertFalse("Shutdown shouldn't have finished", done.isDone()); // the final flush is still blocked
+        flushLatch.countDown(); // let the final flush complete so that shutdown can return
+
+        assertTrue("Shutdown should have finished successfully", done.get(10, TimeUnit.SECONDS));
+        assertFalse("Shouldn't have failed anywhere", failedSomewhere.get());
+    }
+
+    /**
+     * Test that sync thread suspension works, i.e. while the sync
+     * thread is suspended no checkpoint is taken, and checkpointing
+     * starts again once it is resumed.
+     */
+    @Test(timeout=60000)
+    public void testSyncThreadSuspension() throws Exception {
+        int flushInterval = 100;
+        ServerConfiguration conf = new ServerConfiguration().setFlushInterval(flushInterval);
+        CheckpointSource checkpointSource = new DummyCheckpointSource();
+        LedgerDirsListener listener = new DummyLedgerDirsListener();
+
+        final AtomicInteger checkpointCount = new AtomicInteger(0);
+        LedgerStorage storage = new DummyLedgerStorage() {
+                @Override
+                public Checkpoint checkpoint(Checkpoint checkpoint)
+                        throws IOException {
+                    checkpointCount.incrementAndGet();
+                    return checkpoint;
+                }
+            };
+        final SyncThread t = new SyncThread(conf, listener, storage, checkpointSource);
+        t.start();
+        while (checkpointCount.get() == 0) { // wait for the first periodic checkpoint
+            Thread.sleep(flushInterval);
+        }
+        t.suspendSync();
+        Thread.sleep(flushInterval); // let a checkpoint that was already running finish
+        int count = checkpointCount.get();
+        // Sample across several flush intervals; sampling without sleeping could never see a change.
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(flushInterval);
+            assertEquals("Checkpoint count shouldn't change", count, checkpointCount.get());
+        }
+        t.resumeSync();
+        for (int waits = 1; checkpointCount.get() == count; waits++) { // give up after 101 intervals
+            Thread.sleep(flushInterval);
+            if (waits > 100) {
+                fail("Checkpointing never resumed");
+            }
+        }
+        t.shutdown();
+    }
+
+    /**
+     * Test that if the ledger storage throws a
+     * runtime exception, the listener's fatalError()
+     * callback is invoked.
+     */
+    @Test(timeout=60000)
+    public void testSyncThreadShutdownOnError() throws Exception {
+        int flushInterval = 100;
+        ServerConfiguration conf = new ServerConfiguration().setFlushInterval(flushInterval);
+        CheckpointSource checkpointSource = new DummyCheckpointSource();
+        final CountDownLatch fatalLatch = new CountDownLatch(1); // released by the fatalError() callback
+        LedgerDirsListener listener = new DummyLedgerDirsListener() {
+                @Override
+                public void fatalError() {
+                    fatalLatch.countDown();
+                }
+            };
+
+        LedgerStorage storage = new DummyLedgerStorage() {
+                @Override
+                public Checkpoint checkpoint(Checkpoint checkpoint)
+                        throws IOException {
+                    throw new RuntimeException("Fatal error in sync thread"); // unchecked, so not an IOException
+                }
+            };
+        final SyncThread t = new SyncThread(conf, listener, storage, checkpointSource);
+        t.start();
+        assertTrue("Should have called fatal error", fatalLatch.await(10, TimeUnit.SECONDS));
+        t.shutdown();
+    }
+
+    /**
+     * Test that if the ledger storage throws
+     * a disk full exception, the listener's
+     * allDisksFull() callback is invoked.
+     */
+    @Test(timeout=60000)
+    public void testSyncThreadDisksFull() throws Exception {
+        int flushInterval = 100;
+        ServerConfiguration conf = new ServerConfiguration().setFlushInterval(flushInterval);
+        CheckpointSource checkpointSource = new DummyCheckpointSource();
+        final CountDownLatch diskFullLatch = new CountDownLatch(1); // released by the allDisksFull() callback
+        LedgerDirsListener listener = new DummyLedgerDirsListener() {
+                @Override
+                public void allDisksFull() {
+                    diskFullLatch.countDown();
+                }
+            };
+
+        LedgerStorage storage = new DummyLedgerStorage() {
+                @Override
+                public Checkpoint checkpoint(Checkpoint checkpoint)
+                        throws IOException {
+                    throw new NoWritableLedgerDirException("Disk full error in sync thread"); // every checkpoint fails
+                }
+            };
+        final SyncThread t = new SyncThread(conf, listener, storage, checkpointSource);
+        t.start();
+        assertTrue("Should have disk full error", diskFullLatch.await(10, TimeUnit.SECONDS));
+        t.shutdown();
+    }
+
+    private static class DummyCheckpointSource implements CheckpointSource { // stub source used by all tests
+        @Override
+        public Checkpoint newCheckpoint() {
+            return Checkpoint.MAX; // always hands out the same constant checkpoint
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact)
+                throws IOException {
+        } // intentionally empty: completion is not recorded
+    }
+    private static class DummyLedgerStorage implements LedgerStorage { // stub; tests override flush()/checkpoint()
+        @Override
+        public void start() {
+        }
+
+        @Override
+        public void shutdown() throws InterruptedException {
+        }
+
+        @Override
+        public boolean ledgerExists(long ledgerId) throws IOException {
+            return true;
+        }
+
+        @Override
+        public boolean setFenced(long ledgerId) throws IOException {
+            return true;
+        }
+
+        @Override
+        public boolean isFenced(long ledgerId) throws IOException {
+            return false;
+        }
+
+        @Override
+        public void setMasterKey(long ledgerId, byte[] masterKey)
+                throws IOException {
+        }
+
+        @Override
+        public byte[] readMasterKey(long ledgerId)
+                throws IOException, BookieException {
+            return new byte[0];
+        }
+
+        @Override
+        public long addEntry(ByteBuffer entry) throws IOException {
+            return 1L;
+        }
+
+        @Override
+        public ByteBuffer getEntry(long ledgerId, long entryId)
+                throws IOException {
+            return null;
+        }
+
+        @Override
+        public void flush() throws IOException {
+        } // default: the flush succeeds immediately
+
+        @Override
+        public Checkpoint checkpoint(Checkpoint checkpoint)
+                throws IOException {
+            return checkpoint; // default: succeed and hand back the checkpoint unchanged
+        }
+
+        @Override
+        public BKMBeanInfo getJMXBean() { return null; }
+    }
+
+    private static class DummyLedgerDirsListener // listener whose callbacks all do nothing
+        implements LedgerDirsManager.LedgerDirsListener {
+        @Override
+        public void diskFailed(File disk) {
+        }
+
+        @Override
+        public void diskFull(File disk) {
+        }
+
+        @Override
+        public void allDisksFull() {
+        } // overridden by testSyncThreadDisksFull
+
+        @Override
+        public void fatalError() {
+        } // overridden by testSyncThreadShutdownOnError
+    }
+}