You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2013/07/20 20:06:30 UTC
svn commit: r1505175 - in /zookeeper/bookkeeper/trunk: CHANGES.txt
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
Author: sijie
Date: Sat Jul 20 18:06:30 2013
New Revision: 1505175
URL: http://svn.apache.org/r1505175
Log:
BOOKKEEPER-610: Make SyncThread use an executor (ivank via sijie)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1505175&r1=1505174&r2=1505175&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Jul 20 18:06:30 2013
@@ -118,6 +118,8 @@ Trunk (unreleased changes)
BOOKKEEPER-618: Better resolution of bookie address (ivank via fpj)
+ BOOKKEEPER-610: Make SyncThread use an executor (ivank via sijie)
+
NEW FEATURE:
BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java?rev=1505175&r1=1505174&r2=1505175&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java Sat Jul 20 18:06:30 2013
@@ -21,8 +21,12 @@
package org.apache.bookkeeper.bookie;
-import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -53,12 +57,10 @@ import org.slf4j.LoggerFactory;
* for manual recovery in critical disaster.
* </p>
*/
-class SyncThread extends Thread {
+class SyncThread {
static Logger LOG = LoggerFactory.getLogger(SyncThread.class);
- volatile boolean running = true;
- // flag to ensure sync thread will not be interrupted during flush
- final AtomicBoolean flushing = new AtomicBoolean(false);
+ final ScheduledExecutorService executor;
final int flushInterval;
final LedgerStorage ledgerStorage;
final LedgerDirsListener dirsListener;
@@ -68,48 +70,78 @@ class SyncThread extends Thread {
LedgerDirsListener dirsListener,
LedgerStorage ledgerStorage,
CheckpointSource checkpointSource) {
- super("SyncThread");
this.dirsListener = dirsListener;
this.ledgerStorage = ledgerStorage;
this.checkpointSource = checkpointSource;
-
+ ThreadFactoryBuilder tfb = new ThreadFactoryBuilder()
+ .setNameFormat("SyncThread-" + conf.getBookiePort() + "-%d");
+ this.executor = Executors.newSingleThreadScheduledExecutor(tfb.build());
flushInterval = conf.getFlushInterval();
LOG.debug("Flush Interval : {}", flushInterval);
}
- /**
- * flush data up to given logMark and roll log if success
- * @param checkpoint
- */
+ /**
+ * Schedule the periodic checkpoint task on the executor. The first run
+ * happens flushInterval milliseconds from now, then every flushInterval
+ * milliseconds.
+ *
+ * Each run first blocks while the sync thread is suspended, then
+ * checkpoints the ledger storage up to a freshly created checkpoint.
+ * Any Throwable escaping a run is logged and reported to the dirs
+ * listener as a fatal error.
+ */
+ void start() {
+ executor.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ try {
+ synchronized (suspensionLock) {
+ while (suspended) {
+ try {
+ suspensionLock.wait();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag and give up on this run.
+ // Looping back into wait() with the flag still set would
+ // throw again immediately and spin the CPU for as long as
+ // the thread stays suspended; the next scheduled run
+ // will check the suspension state again.
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+ checkpoint(checkpointSource.newCheckpoint());
+ } catch (Throwable t) {
+ LOG.error("Exception in SyncThread", t);
+ dirsListener.fatalError();
+ }
+ }
+ }, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
+ }
+
+ private void flush() {
+ Checkpoint checkpoint = checkpointSource.newCheckpoint();
+ try {
+ ledgerStorage.flush();
+ } catch (NoWritableLedgerDirException e) {
+ LOG.error("No writeable ledger directories", e);
+ dirsListener.allDisksFull();
+ return;
+ } catch (IOException e) {
+ LOG.error("Exception flushing ledgers", e);
+ return;
+ }
+
+ try {
+ checkpointSource.checkpointComplete(checkpoint, false);
+ } catch (IOException e) {
+ LOG.error("Exception marking checkpoint as complete", e);
+ dirsListener.allDisksFull();
+ }
+ }
+
+ /**
+ * Checkpoint the ledger storage up to the given checkpoint and, only if
+ * that succeeds, mark it complete via checkpointSource with compact=true.
+ * A NoWritableLedgerDirException from storage, or a failure to mark the
+ * checkpoint complete, is reported through dirsListener.allDisksFull();
+ * other IOExceptions from storage are only logged.
+ *
+ * @param checkpoint checkpoint to flush up to; reassigned to the value
+ * returned by ledgerStorage.checkpoint() before being marked complete
+ */
@VisibleForTesting
public void checkpoint(Checkpoint checkpoint) {
- boolean flushFailed = false;
try {
- if (running) {
- checkpoint = ledgerStorage.checkpoint(checkpoint);
- } else {
- ledgerStorage.flush();
- }
+ checkpoint = ledgerStorage.checkpoint(checkpoint);
} catch (NoWritableLedgerDirException e) {
- LOG.error("No writeable ledger directories");
- flushFailed = true;
- flushing.set(false);
+ LOG.error("No writeable ledger directories", e);
dirsListener.allDisksFull();
+ return;
} catch (IOException e) {
- LOG.error("Exception flushing Ledger", e);
- flushFailed = true;
+ LOG.error("Exception flushing ledgers", e);
+ return;
}
- // if flush failed, we should not roll last mark, otherwise we would
- // have some ledgers are not flushed and their journal entries were lost
- if (!flushFailed) {
- try {
- checkpointSource.checkpointComplete(checkpoint, running);
- } catch (IOException e) {
- flushing.set(false);
- LOG.error("Marking checkpoint as complete failed", e);
- dirsListener.allDisksFull();
- }
+ // Only reached when ledgerStorage.checkpoint() succeeded: rolling the
+ // mark after a failed flush could lose journal entries for ledgers
+ // that were never flushed.
+ try {
+ checkpointSource.checkpointComplete(checkpoint, true);
+ } catch (IOException e) {
+ LOG.error("Exception marking checkpoint as complete", e);
+ dirsListener.allDisksFull();
}
}
@@ -137,56 +169,23 @@ class SyncThread extends Thread {
}
}
- @Override
- public void run() {
- try {
- while(running) {
- synchronized (this) {
+ /**
+ * Shut down the sync thread. Lifts any suspension, queues a final flush
+ * behind whatever task is currently running, stops the executor from
+ * accepting new work, and then blocks until it terminates, logging
+ * progress every five minutes while waiting.
+ *
+ * @throws InterruptedException if interrupted while awaiting termination
+ */
+ void shutdown() throws InterruptedException {
+ // The periodic task from start() waits on suspensionLock while
+ // suspended. The executor has a single thread, so a suspended task
+ // would keep the final flush below from ever running and the
+ // awaitTermination() loop would never end. Lift the suspension first.
+ synchronized (suspensionLock) {
+ suspended = false;
+ suspensionLock.notifyAll();
+ }
+ executor.submit(new Runnable() {
+ public void run() {
try {
- wait(flushInterval);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- continue;
+ flush();
+ } catch (Throwable t) {
+ LOG.error("Exception flushing ledgers at shutdown", t);
}
}
-
- synchronized (suspensionLock) {
- while (suspended) {
- try {
- suspensionLock.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- continue;
- }
- }
- }
-
- // try to mark flushing flag to check if interrupted
- if (!flushing.compareAndSet(false, true)) {
- // set flushing flag failed, means flushing is true now
- // indicates another thread wants to interrupt sync thread to exit
- break;
- }
- checkpoint(checkpointSource.newCheckpoint());
-
- flushing.set(false);
- }
- } catch (Throwable t) {
- LOG.error("Exception in SyncThread", t);
- flushing.set(false);
- dirsListener.fatalError();
- }
- }
-
- // shutdown sync thread
- void shutdown() throws InterruptedException {
- // Wake up and finish sync thread
- running = false;
- // make a checkpoint when shutdown
- if (flushing.compareAndSet(false, true)) {
- // it is safe to interrupt itself now
- this.interrupt();
+ });
+ executor.shutdown();
+ long start = MathUtils.now();
+ while (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
+ long now = MathUtils.now();
+ // MathUtils.now() is a millisecond clock, so report the elapsed time in ms.
+ LOG.info("SyncThread taking a long time to shutdown. Has taken {}"
+ + " milliseconds so far", now - start);
}
- this.join();
}
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java?rev=1505175&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java Sat Jul 20 18:06:30 2013
@@ -0,0 +1,340 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+
+import org.apache.bookkeeper.jmx.BKMBeanInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for SyncThread: shutdown waits for an in-progress flush,
+ * suspension stops checkpointing, and storage errors are reported
+ * to the LedgerDirsListener.
+ */
+public class TestSyncThread {
+ static final Logger LOG = LoggerFactory.getLogger(TestSyncThread.class);
+
+ // Single-threaded executor used to run SyncThread.shutdown() off the test thread.
+ ExecutorService executor = null;
+
+ @Before
+ public void setupExecutor() {
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @After
+ public void teardownExecutor() {
+ if (executor != null) {
+ executor.shutdownNow();
+ executor = null;
+ }
+ }
+
+ /**
+ * Test that if a flush is taking a long time,
+ * the sync thread will not shutdown until it
+ * has finished.
+ */
+ @Test(timeout=60000)
+ public void testSyncThreadLongShutdown() throws Exception {
+ int flushInterval = 100;
+ ServerConfiguration conf = new ServerConfiguration().setFlushInterval(flushInterval);
+ CheckpointSource checkpointSource = new DummyCheckpointSource();
+ LedgerDirsListener listener = new DummyLedgerDirsListener();
+
+ final CountDownLatch checkpointCalledLatch = new CountDownLatch(1);
+ final CountDownLatch checkpointLatch = new CountDownLatch(1);
+
+ final CountDownLatch flushCalledLatch = new CountDownLatch(1);
+ final CountDownLatch flushLatch = new CountDownLatch(1);
+ final AtomicBoolean failedSomewhere = new AtomicBoolean(false);
+ LedgerStorage storage = new DummyLedgerStorage() {
+ @Override
+ public void flush() throws IOException {
+ flushCalledLatch.countDown();
+ try {
+ flushLatch.await();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted in flush thread", ie);
+ failedSomewhere.set(true);
+ }
+ }
+
+ @Override
+ public Checkpoint checkpoint(Checkpoint checkpoint)
+ throws IOException {
+ checkpointCalledLatch.countDown();
+ try {
+ checkpointLatch.await();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted in checkpoint thread", ie);
+ failedSomewhere.set(true);
+ }
+ return checkpoint;
+ }
+ };
+
+ final SyncThread t = new SyncThread(conf, listener, storage, checkpointSource);
+ t.start();
+ assertTrue("Checkpoint should have been called",
+ checkpointCalledLatch.await(10, TimeUnit.SECONDS));
+ Future<Boolean> done = executor.submit(new Callable<Boolean>() {
+ public Boolean call() {
+ try {
+ t.shutdown();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted shutting down sync thread", ie);
+ failedSomewhere.set(true);
+ return false;
+ }
+ return true;
+ }
+ });
+ checkpointLatch.countDown();
+ assertFalse("Shutdown shouldn't have finished", done.isDone());
+ assertTrue("Flush should have been called",
+ flushCalledLatch.await(10, TimeUnit.SECONDS));
+
+ assertFalse("Shutdown shouldn't have finished", done.isDone());
+ flushLatch.countDown();
+
+ assertTrue("Shutdown should have finished successfully", done.get(10, TimeUnit.SECONDS));
+ assertFalse("Shouldn't have failed anywhere", failedSomewhere.get());
+ }
+
+ /**
+ * Test that sync thread suspension works.
+ * i.e. when we suspend the syncthread, nothing
+ * will be synced.
+ */
+ @Test(timeout=60000)
+ public void testSyncThreadSuspension() throws Exception {
+ int flushInterval = 100;
+ ServerConfiguration conf = new ServerConfiguration().setFlushInterval(flushInterval);
+ CheckpointSource checkpointSource = new DummyCheckpointSource();
+ LedgerDirsListener listener = new DummyLedgerDirsListener();
+
+ final AtomicInteger checkpointCount = new AtomicInteger(0);
+ LedgerStorage storage = new DummyLedgerStorage() {
+ @Override
+ public Checkpoint checkpoint(Checkpoint checkpoint)
+ throws IOException {
+ checkpointCount.incrementAndGet();
+ return checkpoint;
+ }
+ };
+ final SyncThread t = new SyncThread(conf, listener, storage, checkpointSource);
+ t.start();
+ while (checkpointCount.get() == 0) {
+ Thread.sleep(flushInterval);
+ }
+ t.suspendSync();
+ // Give a checkpoint that had already passed the suspension check time to finish.
+ Thread.sleep(flushInterval);
+ int count = checkpointCount.get();
+ // Sleep between samples so the check spans several flush intervals;
+ // back-to-back reads would pass even if suspension had no effect.
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(flushInterval);
+ assertEquals("Checkpoint count shouldn't change", count, checkpointCount.get());
+ }
+ t.resumeSync();
+ int i = 0;
+ while (checkpointCount.get() == count) {
+ Thread.sleep(flushInterval);
+ i++;
+ if (i > 100) {
+ fail("Checkpointing never resumed");
+ }
+ }
+ t.shutdown();
+ }
+
+ /**
+ * Test that if the ledger storage throws a
+ * runtime exception, the bookie will be told
+ * to shutdown.
+ */
+ @Test(timeout=60000)
+ public void testSyncThreadShutdownOnError() throws Exception {
+ int flushInterval = 100;
+ ServerConfiguration conf = new ServerConfiguration().setFlushInterval(flushInterval);
+ CheckpointSource checkpointSource = new DummyCheckpointSource();
+ final CountDownLatch fatalLatch = new CountDownLatch(1);
+ LedgerDirsListener listener = new DummyLedgerDirsListener() {
+ @Override
+ public void fatalError() {
+ fatalLatch.countDown();
+ }
+ };
+
+ LedgerStorage storage = new DummyLedgerStorage() {
+ @Override
+ public Checkpoint checkpoint(Checkpoint checkpoint)
+ throws IOException {
+ throw new RuntimeException("Fatal error in sync thread");
+ }
+ };
+ final SyncThread t = new SyncThread(conf, listener, storage, checkpointSource);
+ t.start();
+ assertTrue("Should have called fatal error", fatalLatch.await(10, TimeUnit.SECONDS));
+ t.shutdown();
+ }
+
+ /**
+ * Test that if the ledger storage throws
+ * a disk full exception, the owner of the sync
+ * thread will be notified.
+ */
+ @Test(timeout=60000)
+ public void testSyncThreadDisksFull() throws Exception {
+ int flushInterval = 100;
+ ServerConfiguration conf = new ServerConfiguration().setFlushInterval(flushInterval);
+ CheckpointSource checkpointSource = new DummyCheckpointSource();
+ final CountDownLatch diskFullLatch = new CountDownLatch(1);
+ LedgerDirsListener listener = new DummyLedgerDirsListener() {
+ @Override
+ public void allDisksFull() {
+ diskFullLatch.countDown();
+ }
+ };
+
+ LedgerStorage storage = new DummyLedgerStorage() {
+ @Override
+ public Checkpoint checkpoint(Checkpoint checkpoint)
+ throws IOException {
+ throw new NoWritableLedgerDirException("Disk full error in sync thread");
+ }
+ };
+ final SyncThread t = new SyncThread(conf, listener, storage, checkpointSource);
+ t.start();
+ assertTrue("Should have disk full error", diskFullLatch.await(10, TimeUnit.SECONDS));
+ t.shutdown();
+ }
+
+ // CheckpointSource stub: always hands out Checkpoint.MAX and ignores completion.
+ private static class DummyCheckpointSource implements CheckpointSource {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
+ throws IOException {
+ }
+ }
+ // LedgerStorage stub whose operations are no-ops or return fixed values;
+ // individual tests override flush()/checkpoint() as needed.
+ private static class DummyLedgerStorage implements LedgerStorage {
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void shutdown() throws InterruptedException {
+ }
+
+ @Override
+ public boolean ledgerExists(long ledgerId) throws IOException {
+ return true;
+ }
+
+ @Override
+ public boolean setFenced(long ledgerId) throws IOException {
+ return true;
+ }
+
+ @Override
+ public boolean isFenced(long ledgerId) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void setMasterKey(long ledgerId, byte[] masterKey)
+ throws IOException {
+ }
+
+ @Override
+ public byte[] readMasterKey(long ledgerId)
+ throws IOException, BookieException {
+ return new byte[0];
+ }
+
+ @Override
+ public long addEntry(ByteBuffer entry) throws IOException {
+ return 1L;
+ }
+
+ @Override
+ public ByteBuffer getEntry(long ledgerId, long entryId)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+
+ @Override
+ public Checkpoint checkpoint(Checkpoint checkpoint)
+ throws IOException {
+ return checkpoint;
+ }
+
+ @Override
+ public BKMBeanInfo getJMXBean() { return null; }
+ }
+
+ // LedgerDirsListener whose callbacks do nothing; tests override the ones they observe.
+ private static class DummyLedgerDirsListener
+ implements LedgerDirsManager.LedgerDirsListener {
+ @Override
+ public void diskFailed(File disk) {
+ }
+
+ @Override
+ public void diskFull(File disk) {
+ }
+
+ @Override
+ public void allDisksFull() {
+ }
+
+ @Override
+ public void fatalError() {
+ }
+ }
+}