You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/02/01 21:17:55 UTC

git commit: FLUME-1891: Fast replay runs even when checkpoint exists

Updated Branches:
  refs/heads/trunk 001b38240 -> d203236b2


FLUME-1891: Fast replay runs even when checkpoint exists

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d203236b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d203236b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d203236b

Branch: refs/heads/trunk
Commit: d203236b21d350f13c8cb6bb1ef69f2aa5501dcb
Parents: 001b382
Author: Brock Noland <br...@apache.org>
Authored: Fri Feb 1 14:17:27 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Fri Feb 1 14:17:27 2013 -0600

----------------------------------------------------------------------
 .../org/apache/flume/channel/file/FileChannel.java |    6 ++
 .../java/org/apache/flume/channel/file/Log.java    |   24 +++++--
 .../flume/channel/file/TestFileChannelRestart.java |   53 +++++++++++++++
 3 files changed, 77 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d203236b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index d921387..d98209b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -385,6 +386,11 @@ public class FileChannel extends BasicChannelSemantics {
     }
   }
 
+  @VisibleForTesting
+  boolean didFastReplay() {
+    return log.didFastReplay();
+  }
+
   public boolean isOpen() {
     return open;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/d203236b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 8a4201c..7da8c49 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -44,6 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
@@ -113,6 +114,7 @@ class Log {
   private String encryptionKeyAlias;
   private Key encryptionKey;
   private final long usableSpaceRefreshInterval;
+  private boolean didFastReplay = false;
 
   static class Builder {
     private long bCheckpointInterval;
@@ -340,17 +342,17 @@ class Log {
        */
       LogUtils.sort(dataFiles);
 
-      boolean useFastReplay = this.useFastReplay;
+      boolean shouldFastReplay = this.useFastReplay;
       /*
        * Read the checkpoint (in memory queue) from one of two alternating
        * locations. We will read the last one written to disk.
        */
       File checkpointFile = new File(checkpointDir, "checkpoint");
-      if(useFastReplay) {
+      if(shouldFastReplay) {
         if(checkpointFile.exists()) {
           LOGGER.debug("Disabling fast full replay because checkpoint " +
               "exists: " + checkpointFile);
-          useFastReplay = false;
+          shouldFastReplay = false;
         } else {
           LOGGER.debug("Not disabling fast full replay because checkpoint " +
               " does not exist: " + checkpointFile);
@@ -379,7 +381,7 @@ class Log {
          * but the inflights were not. If the checkpoint was bad, the backing
          * store factory would have thrown.
          */
-        doReplay(queue, dataFiles, encryptionKeyProvider);
+        doReplay(queue, dataFiles, encryptionKeyProvider, shouldFastReplay);
       } catch (BadCheckpointException ex) {
         LOGGER.warn("Checkpoint may not have completed successfully. "
                 + "Forcing full replay, this may take a while.", ex);
@@ -391,7 +393,10 @@ class Log {
                 queueCapacity, channelNameDescriptor);
         queue = new FlumeEventQueue(backingStore, inflightTakesFile,
                 inflightPutsFile);
-        doReplay(queue, dataFiles, encryptionKeyProvider);
+        // If the checkpoint was deleted due to BadCheckpointException, then
+        // trigger fast replay if the channel is configured to.
+        shouldFastReplay = this.useFastReplay;
+        doReplay(queue, dataFiles, encryptionKeyProvider, shouldFastReplay);
       }
 
 
@@ -419,10 +424,12 @@ class Log {
 
   @SuppressWarnings("deprecation")
   private void doReplay(FlumeEventQueue queue, List<File> dataFiles,
-          KeyProvider encryptionKeyProvider) throws Exception {
+                        KeyProvider encryptionKeyProvider,
+                        boolean useFastReplay) throws Exception {
     CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles,
             queue);
     if (useFastReplay && rebuilder.rebuild()) {
+      didFastReplay = true;
       LOGGER.info("Fast replay successful.");
     } else {
       ReplayHandler replayHandler = new ReplayHandler(queue,
@@ -437,6 +444,11 @@ class Log {
     }
   }
 
+  @VisibleForTesting
+  boolean didFastReplay() {
+    return didFastReplay;
+  }
+
   int getNextFileID() {
     Preconditions.checkState(open, "Log is closed");
     return nextFileID.get();

http://git-wip-us.apache.org/repos/asf/flume/blob/d203236b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index ea57cdb..170dc72 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -299,6 +299,59 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     testCorruptInflights("inflightTakes");
   }
 
+  @Test
+  public void testFastReplayWithCheckpoint() throws Exception{
+    testFastReplay(false, true);
+  }
+
+  @Test
+  public void testFastReplayWithBadCheckpoint() throws Exception{
+    testFastReplay(true, true);
+  }
+
+  @Test
+  public void testNoFastReplayWithCheckpoint() throws Exception{
+    testFastReplay(false, false);
+  }
+
+  @Test
+  public void testNoFastReplayWithBadCheckpoint() throws Exception{
+    testFastReplay(true, false);
+  }
+
+  private void testFastReplay(boolean shouldCorruptCheckpoint,
+                             boolean useFastReplay) throws Exception{
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_FAST_REPLAY,
+      String.valueOf(useFastReplay));
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    if (shouldCorruptCheckpoint) {
+      File checkpoint = new File(checkpointDir, "checkpoint");
+      RandomAccessFile writer = new RandomAccessFile(
+        Serialization.getMetaDataFile(checkpoint), "rw");
+      writer.seek(10);
+      writer.writeLong(new Random().nextLong());
+      writer.getFD().sync();
+      writer.close();
+    }
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    if (useFastReplay && shouldCorruptCheckpoint) {
+      Assert.assertTrue(channel.didFastReplay());
+    } else {
+      Assert.assertFalse(channel.didFastReplay());
+    }
+    compareInputAndOut(in, out);
+  }
+
   private void testCorruptInflights(String name) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     channel = createFileChannel(overrides);