You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/02/01 21:17:55 UTC
git commit: FLUME-1891: Fast replay runs even when checkpoint exists
Updated Branches:
refs/heads/trunk 001b38240 -> d203236b2
FLUME-1891: Fast replay runs even when checkpoint exists
(Hari Shreedharan via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d203236b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d203236b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d203236b
Branch: refs/heads/trunk
Commit: d203236b21d350f13c8cb6bb1ef69f2aa5501dcb
Parents: 001b382
Author: Brock Noland <br...@apache.org>
Authored: Fri Feb 1 14:17:27 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Fri Feb 1 14:17:27 2013 -0600
----------------------------------------------------------------------
.../org/apache/flume/channel/file/FileChannel.java | 6 ++
.../java/org/apache/flume/channel/file/Log.java | 24 +++++--
.../flume/channel/file/TestFileChannelRestart.java | 53 +++++++++++++++
3 files changed, 77 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/d203236b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index d921387..d98209b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
@@ -385,6 +386,11 @@ public class FileChannel extends BasicChannelSemantics {
}
}
+ @VisibleForTesting
+ boolean didFastReplay() {
+ return log.didFastReplay();
+ }
+
public boolean isOpen() {
return open;
}
http://git-wip-us.apache.org/repos/asf/flume/blob/d203236b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 8a4201c..7da8c49 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -44,6 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import javax.annotation.Nullable;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
@@ -113,6 +114,7 @@ class Log {
private String encryptionKeyAlias;
private Key encryptionKey;
private final long usableSpaceRefreshInterval;
+ private boolean didFastReplay = false;
static class Builder {
private long bCheckpointInterval;
@@ -340,17 +342,17 @@ class Log {
*/
LogUtils.sort(dataFiles);
- boolean useFastReplay = this.useFastReplay;
+ boolean shouldFastReplay = this.useFastReplay;
/*
* Read the checkpoint (in memory queue) from one of two alternating
* locations. We will read the last one written to disk.
*/
File checkpointFile = new File(checkpointDir, "checkpoint");
- if(useFastReplay) {
+ if(shouldFastReplay) {
if(checkpointFile.exists()) {
LOGGER.debug("Disabling fast full replay because checkpoint " +
"exists: " + checkpointFile);
- useFastReplay = false;
+ shouldFastReplay = false;
} else {
LOGGER.debug("Not disabling fast full replay because checkpoint " +
" does not exist: " + checkpointFile);
@@ -379,7 +381,7 @@ class Log {
* but the inflights were not. If the checkpoint was bad, the backing
* store factory would have thrown.
*/
- doReplay(queue, dataFiles, encryptionKeyProvider);
+ doReplay(queue, dataFiles, encryptionKeyProvider, shouldFastReplay);
} catch (BadCheckpointException ex) {
LOGGER.warn("Checkpoint may not have completed successfully. "
+ "Forcing full replay, this may take a while.", ex);
@@ -391,7 +393,10 @@ class Log {
queueCapacity, channelNameDescriptor);
queue = new FlumeEventQueue(backingStore, inflightTakesFile,
inflightPutsFile);
- doReplay(queue, dataFiles, encryptionKeyProvider);
+ // If the checkpoint was deleted due to BadCheckpointException, then
+ // trigger fast replay if the channel is configured to.
+ shouldFastReplay = this.useFastReplay;
+ doReplay(queue, dataFiles, encryptionKeyProvider, shouldFastReplay);
}
@@ -419,10 +424,12 @@ class Log {
@SuppressWarnings("deprecation")
private void doReplay(FlumeEventQueue queue, List<File> dataFiles,
- KeyProvider encryptionKeyProvider) throws Exception {
+ KeyProvider encryptionKeyProvider,
+ boolean useFastReplay) throws Exception {
CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles,
queue);
if (useFastReplay && rebuilder.rebuild()) {
+ didFastReplay = true;
LOGGER.info("Fast replay successful.");
} else {
ReplayHandler replayHandler = new ReplayHandler(queue,
@@ -437,6 +444,11 @@ class Log {
}
}
+ @VisibleForTesting
+ boolean didFastReplay() {
+ return didFastReplay;
+ }
+
int getNextFileID() {
Preconditions.checkState(open, "Log is closed");
return nextFileID.get();
http://git-wip-us.apache.org/repos/asf/flume/blob/d203236b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index ea57cdb..170dc72 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -299,6 +299,59 @@ public class TestFileChannelRestart extends TestFileChannelBase {
testCorruptInflights("inflightTakes");
}
+ @Test
+ public void testFastReplayWithCheckpoint() throws Exception{
+ testFastReplay(false, true);
+ }
+
+ @Test
+ public void testFastReplayWithBadCheckpoint() throws Exception{
+ testFastReplay(true, true);
+ }
+
+ @Test
+ public void testNoFastReplayWithCheckpoint() throws Exception{
+ testFastReplay(false, false);
+ }
+
+ @Test
+ public void testNoFastReplayWithBadCheckpoint() throws Exception{
+ testFastReplay(true, false);
+ }
+
+ private void testFastReplay(boolean shouldCorruptCheckpoint,
+ boolean useFastReplay) throws Exception{
+ Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_FAST_REPLAY,
+ String.valueOf(useFastReplay));
+ channel = createFileChannel(overrides);
+ channel.start();
+ Assert.assertTrue(channel.isOpen());
+ Set<String> in = putEvents(channel, "restart", 10, 100);
+ Assert.assertEquals(100, in.size());
+ forceCheckpoint(channel);
+ channel.stop();
+ if (shouldCorruptCheckpoint) {
+ File checkpoint = new File(checkpointDir, "checkpoint");
+ RandomAccessFile writer = new RandomAccessFile(
+ Serialization.getMetaDataFile(checkpoint), "rw");
+ writer.seek(10);
+ writer.writeLong(new Random().nextLong());
+ writer.getFD().sync();
+ writer.close();
+ }
+ channel = createFileChannel(overrides);
+ channel.start();
+ Assert.assertTrue(channel.isOpen());
+ Set<String> out = consumeChannel(channel);
+ if (useFastReplay && shouldCorruptCheckpoint) {
+ Assert.assertTrue(channel.didFastReplay());
+ } else {
+ Assert.assertFalse(channel.didFastReplay());
+ }
+ compareInputAndOut(in, out);
+ }
+
private void testCorruptInflights(String name) throws Exception {
Map<String, String> overrides = Maps.newHashMap();
channel = createFileChannel(overrides);