You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/04/14 00:58:13 UTC
flume git commit: FLUME-2595. Add option to checkpoint on file channel shutdown
Repository: flume
Updated Branches:
refs/heads/trunk fc03456a1 -> be4ae294c
FLUME-2595. Add option to checkpoint on file channel shutdown
(Roshan Naik via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/be4ae294
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/be4ae294
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/be4ae294
Branch: refs/heads/trunk
Commit: be4ae294ca549648f785e7eea7564ee95112130a
Parents: fc03456
Author: Hari Shreedharan <hs...@apache.org>
Authored: Mon Apr 13 15:57:30 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Mon Apr 13 15:57:30 2015 -0700
----------------------------------------------------------------------
.../apache/flume/channel/file/FileChannel.java | 5 ++
.../channel/file/FileChannelConfiguration.java | 3 +-
.../java/org/apache/flume/channel/file/Log.java | 22 ++++++-
.../org/apache/flume/channel/file/TestLog.java | 35 +++++++++++-
flume-ng-doc/sphinx/FlumeUserGuide.rst | 1 +
.../tools/TestFileChannelIntegrityTool.java | 60 +++++++++++++-------
6 files changed, 102 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/be4ae294/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index 61c353a..ed2b996 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -100,6 +100,7 @@ public class FileChannel extends BasicChannelSemantics {
private boolean compressBackupCheckpoint;
private boolean fsyncPerTransaction;
private int fsyncInterval;
+ private boolean checkpointOnClose = true;
@Override
public synchronized void setName(String name) {
@@ -251,6 +252,9 @@ public class FileChannel extends BasicChannelSemantics {
fsyncInterval = context.getInteger(FileChannelConfiguration
.FSYNC_INTERVAL, FileChannelConfiguration.DEFAULT_FSYNC_INTERVAL);
+ checkpointOnClose = context.getBoolean(FileChannelConfiguration
+ .CHKPT_ONCLOSE, FileChannelConfiguration.DEFAULT_CHKPT_ONCLOSE);
+
if(queueRemaining == null) {
queueRemaining = new Semaphore(capacity, true);
}
@@ -286,6 +290,7 @@ public class FileChannel extends BasicChannelSemantics {
builder.setBackupCheckpointDir(backupCheckpointDir);
builder.setFsyncPerTransaction(fsyncPerTransaction);
builder.setFsyncInterval(fsyncInterval);
+ builder.setCheckpointOnClose(checkpointOnClose);
log = builder.build();
log.replay();
open = true;
http://git-wip-us.apache.org/repos/asf/flume/blob/be4ae294/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
index f8c0378..5c3c48f 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
@@ -98,5 +98,6 @@ public class FileChannelConfiguration {
public static final String FSYNC_INTERVAL = "fsyncInterval";
public static final int DEFAULT_FSYNC_INTERVAL = 5; // seconds.
-
+ public static final String CHKPT_ONCLOSE = "checkpointOnClose";
+ public static final Boolean DEFAULT_CHKPT_ONCLOSE = true;
}
http://git-wip-us.apache.org/repos/asf/flume/blob/be4ae294/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 0e9171e..247c287 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -128,6 +128,7 @@ public class Log {
private final boolean fsyncPerTransaction;
private final int fsyncInterval;
+ private final boolean checkpointOnClose;
private int readCount;
private int putCount;
@@ -158,6 +159,8 @@ public class Log {
private boolean fsyncPerTransaction = true;
private int fsyncInterval;
+ private boolean checkpointOnClose = true;
+
boolean isFsyncPerTransaction() {
return fsyncPerTransaction;
}
@@ -254,13 +257,18 @@ public class Log {
return this;
}
+ Builder setCheckpointOnClose(boolean enableCheckpointOnClose) {
+ this.checkpointOnClose = enableCheckpointOnClose;
+ return this;
+ }
+
Log build() throws IOException {
return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
bUseDualCheckpoints, bCompressBackupCheckpoint,bCheckpointDir,
bBackupCheckpointDir, bName, useLogReplayV1, useFastReplay,
bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias,
bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
- fsyncPerTransaction, fsyncInterval, bLogDirs);
+ fsyncPerTransaction, fsyncInterval, checkpointOnClose, bLogDirs);
}
}
@@ -272,7 +280,7 @@ public class Log {
@Nullable String encryptionKeyAlias,
@Nullable String encryptionCipherProvider,
long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
- int fsyncInterval, File... logDirs)
+ int fsyncInterval, boolean checkpointOnClose, File... logDirs)
throws IOException {
Preconditions.checkArgument(checkpointInterval > 0,
"checkpointInterval <= 0");
@@ -352,6 +360,8 @@ public class Log {
this.logDirs = logDirs;
this.fsyncPerTransaction = fsyncPerTransaction;
this.fsyncInterval = fsyncInterval;
+ this.checkpointOnClose = checkpointOnClose;
+
logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
workerExecutor = Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
@@ -791,6 +801,14 @@ public class Log {
lockExclusive();
try {
open = false;
+ try {
+ if(checkpointOnClose) {
+ writeCheckpoint(true); // do this before acquiring exclusive lock
+ }
+ } catch (Exception err) {
+ LOGGER.warn("Failed creating checkpoint on close of channel " + channelNameDescriptor +
+ "Replay will take longer next time channel is started.", err);
+ }
shutdownWorker();
if (logFiles != null) {
for (int index = 0; index < logFiles.length(); index++) {
http://git-wip-us.apache.org/repos/asf/flume/blob/be4ae294/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index f7f1afa..801d925 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -22,6 +22,8 @@ import static org.mockito.Mockito.*;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.channels.*;
+import java.util.Collection;
import java.util.List;
import org.apache.commons.io.FileUtils;
@@ -35,6 +37,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
+import javax.ws.rs.Path;
+
public class TestLog {
private static final Logger LOGGER = LoggerFactory.getLogger(TestLog.class);
private static final long MAX_FILE_SIZE = 1000;
@@ -56,7 +60,7 @@ public class TestLog {
}
log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
- checkpointDir).setLogDirs(dataDirs)
+ checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(false)
.setChannelName("testlog").build();
log.replay();
}
@@ -465,6 +469,34 @@ public class TestLog {
Long.MAX_VALUE - 1L);
}
+ @Test
+ public void testCheckpointOnClose() throws Exception {
+ log.close();
+ log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
+ MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
+ checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(true)
+ .setChannelName("testLog").build();
+ log.replay();
+
+
+ // 1 Write One Event
+ FlumeEvent eventIn = TestUtils.newPersistableEvent();
+ log.put(transactionID, eventIn);
+ log.commitPut(transactionID);
+
+ // 2 Check state of checkpoint before close
+ File checkPointMetaFile =
+ FileUtils.listFiles(checkpointDir,new String[]{"meta"},false).iterator().next();
+ long before = FileUtils.checksumCRC32( checkPointMetaFile );
+
+ // 3 Close Log
+ log.close();
+
+ // 4 Verify that checkpoint was modified on close
+ long after = FileUtils.checksumCRC32( checkPointMetaFile );
+ Assert.assertFalse( before == after );
+ }
+
private void takeAndVerify(FlumeEventPointer eventPointerIn,
FlumeEvent eventIn)
throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
@@ -479,4 +511,5 @@ public class TestLog {
Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody());
}
+
}
http://git-wip-us.apache.org/repos/asf/flume/blob/be4ae294/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 78b139e..43ca5db 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2638,6 +2638,7 @@ capacity 1000000
keep-alive 3 Amount of time (in sec) to wait for a put operation
use-log-replay-v1 false Expert: Use old replay logic
use-fast-replay false Expert: Replay without using queue
+checkpointOnClose true Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
encryption.activeKey -- Key name used to encrypt new data
encryption.cipherProvider -- Cipher provider type, supported types: AESCTRNOPADDING
encryption.keyProvider -- Key provider type, supported types: JCEKSFILE
http://git-wip-us.apache.org/repos/asf/flume/blob/be4ae294/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
----------------------------------------------------------------------
diff --git a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
index ac4dac4..a11126d 100644
--- a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
+++ b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
@@ -98,9 +98,20 @@ public class TestFileChannelIntegrityTool {
}
@Test
+ public void testFixCorruptRecordsWithCheckpoint() throws Exception {
+ doTestFixCorruptEvents(true);
+ }
+
+ @Test
+ public void testFixCorruptRecords() throws Exception {
+ doTestFixCorruptEvents(false);
+ }
+
+ @Test
public void testFixInvalidRecords() throws Exception {
doTestFixInvalidEvents(false, DummyEventVerifier.Builder.class.getName());
}
+
@Test
public void testFixInvalidRecordsWithCheckpoint() throws Exception {
doTestFixInvalidEvents(true, DummyEventVerifier.Builder.class.getName());
@@ -111,15 +122,24 @@ public class TestFileChannelIntegrityTool {
tool.run(new String[] {"-l", dataDir.toString(), "-e", eventHandler, "-DvalidatorValue=0"});
FileChannel channel = new FileChannel();
channel.setName("channel");
- String cp;
- if(withCheckpoint) {
- cp = origCheckpointDir.toString();
+ if (withCheckpoint) {
+ File[] cpFiles = origCheckpointDir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ if (name.contains("lock") || name.contains("queueset")) {
+ return false;
+ }
+ return true;
+ }
+ });
+ for (File cpFile : cpFiles) {
+ Serialization.copyFile(cpFile, new File(checkpointDir, cpFile.getName()));
+ }
} else {
FileUtils.deleteDirectory(checkpointDir);
Assert.assertTrue(checkpointDir.mkdirs());
- cp = checkpointDir.toString();
}
- ctx.put(FileChannelConfiguration.CHECKPOINT_DIR,cp);
+ ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointDir.toString());
ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString());
channel.configure(ctx);
channel.start();
@@ -136,15 +156,6 @@ public class TestFileChannelIntegrityTool {
Assert.assertEquals(25 - invalidEvent, i);
}
- @Test
- public void testFixCorruptRecords() throws Exception {
- doTestFixCorruptEvents(false);
- }
- @Test
- public void testFixCorruptRecordsWithCheckpoint() throws Exception {
- doTestFixCorruptEvents(true);
- }
-
public void doTestFixCorruptEvents(boolean withCheckpoint) throws Exception {
Set<String> corruptFiles = new HashSet<String>();
File[] files = dataDir.listFiles(new FilenameFilter() {
@@ -193,18 +204,27 @@ public class TestFileChannelIntegrityTool {
}
FileChannelIntegrityTool tool = new FileChannelIntegrityTool();
- tool.run(new String[] {"-l", dataDir.toString()});
+ tool.run(new String[]{"-l", dataDir.toString()});
FileChannel channel = new FileChannel();
channel.setName("channel");
- String cp;
- if(withCheckpoint) {
- cp = origCheckpointDir.toString();
+ if (withCheckpoint) {
+ File[] cpFiles = origCheckpointDir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ if (name.contains("lock") || name.contains("queueset")) {
+ return false;
+ }
+ return true;
+ }
+ });
+ for (File cpFile : cpFiles) {
+ Serialization.copyFile(cpFile, new File(checkpointDir, cpFile.getName()));
+ }
} else {
FileUtils.deleteDirectory(checkpointDir);
Assert.assertTrue(checkpointDir.mkdirs());
- cp = checkpointDir.toString();
}
- ctx.put(FileChannelConfiguration.CHECKPOINT_DIR,cp);
+ ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointDir.toString());
ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString());
channel.configure(ctx);
channel.start();