You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/04/14 00:58:13 UTC

flume git commit: FLUME-2595. Add option to checkpoint on file channel shutdown

Repository: flume
Updated Branches:
  refs/heads/trunk fc03456a1 -> be4ae294c


FLUME-2595. Add option to checkpoint on file channel shutdown

(Roshan Naik via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/be4ae294
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/be4ae294
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/be4ae294

Branch: refs/heads/trunk
Commit: be4ae294ca549648f785e7eea7564ee95112130a
Parents: fc03456
Author: Hari Shreedharan <hs...@apache.org>
Authored: Mon Apr 13 15:57:30 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Mon Apr 13 15:57:30 2015 -0700

----------------------------------------------------------------------
 .../apache/flume/channel/file/FileChannel.java  |  5 ++
 .../channel/file/FileChannelConfiguration.java  |  3 +-
 .../java/org/apache/flume/channel/file/Log.java | 22 ++++++-
 .../org/apache/flume/channel/file/TestLog.java  | 35 +++++++++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  1 +
 .../tools/TestFileChannelIntegrityTool.java     | 60 +++++++++++++-------
 6 files changed, 102 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/be4ae294/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index 61c353a..ed2b996 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -100,6 +100,7 @@ public class FileChannel extends BasicChannelSemantics {
   private boolean compressBackupCheckpoint;
   private boolean fsyncPerTransaction;
   private int fsyncInterval;
+  private boolean checkpointOnClose = true;
 
   @Override
   public synchronized void setName(String name) {
@@ -251,6 +252,9 @@ public class FileChannel extends BasicChannelSemantics {
     fsyncInterval = context.getInteger(FileChannelConfiguration
       .FSYNC_INTERVAL, FileChannelConfiguration.DEFAULT_FSYNC_INTERVAL);
 
+    checkpointOnClose = context.getBoolean(FileChannelConfiguration
+            .CHKPT_ONCLOSE, FileChannelConfiguration.DEFAULT_CHKPT_ONCLOSE);
+
     if(queueRemaining == null) {
       queueRemaining = new Semaphore(capacity, true);
     }
@@ -286,6 +290,7 @@ public class FileChannel extends BasicChannelSemantics {
       builder.setBackupCheckpointDir(backupCheckpointDir);
       builder.setFsyncPerTransaction(fsyncPerTransaction);
       builder.setFsyncInterval(fsyncInterval);
+      builder.setCheckpointOnClose(checkpointOnClose);
       log = builder.build();
       log.replay();
       open = true;

http://git-wip-us.apache.org/repos/asf/flume/blob/be4ae294/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
index f8c0378..5c3c48f 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
@@ -98,5 +98,6 @@ public class FileChannelConfiguration {
   public static final String FSYNC_INTERVAL = "fsyncInterval";
   public static final int DEFAULT_FSYNC_INTERVAL = 5; // seconds.
 
-
+  public static final String CHKPT_ONCLOSE = "checkpointOnClose";
+  public static final Boolean DEFAULT_CHKPT_ONCLOSE = true;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/be4ae294/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 0e9171e..247c287 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -128,6 +128,7 @@ public class Log {
 
   private final boolean fsyncPerTransaction;
   private final int fsyncInterval;
+  private final boolean checkpointOnClose;
 
   private int readCount;
   private int putCount;
@@ -158,6 +159,8 @@ public class Log {
     private boolean fsyncPerTransaction = true;
     private int fsyncInterval;
 
+    private boolean checkpointOnClose = true;
+
     boolean isFsyncPerTransaction() {
       return fsyncPerTransaction;
     }
@@ -254,13 +257,18 @@ public class Log {
       return this;
     }
 
+    Builder setCheckpointOnClose(boolean enableCheckpointOnClose) {
+      this.checkpointOnClose = enableCheckpointOnClose;
+      return this;
+    }
+
     Log build() throws IOException {
       return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
         bUseDualCheckpoints, bCompressBackupCheckpoint,bCheckpointDir,
         bBackupCheckpointDir, bName, useLogReplayV1, useFastReplay,
         bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias,
         bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
-        fsyncPerTransaction, fsyncInterval, bLogDirs);
+        fsyncPerTransaction, fsyncInterval, checkpointOnClose, bLogDirs);
     }
   }
 
@@ -272,7 +280,7 @@ public class Log {
     @Nullable String encryptionKeyAlias,
     @Nullable String encryptionCipherProvider,
     long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
-    int fsyncInterval, File... logDirs)
+    int fsyncInterval, boolean checkpointOnClose, File... logDirs)
           throws IOException {
     Preconditions.checkArgument(checkpointInterval > 0,
       "checkpointInterval <= 0");
@@ -352,6 +360,8 @@ public class Log {
     this.logDirs = logDirs;
     this.fsyncPerTransaction = fsyncPerTransaction;
     this.fsyncInterval = fsyncInterval;
+    this.checkpointOnClose = checkpointOnClose;
+
     logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
     workerExecutor = Executors.newSingleThreadScheduledExecutor(new
       ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
@@ -791,6 +801,14 @@ public class Log {
     lockExclusive();
     try {
       open = false;
+      try {
+        if(checkpointOnClose) {
+          writeCheckpoint(true); // do this before acquiring exclusive lock
+        }
+      } catch (Exception err) {
+        LOGGER.warn("Failed creating checkpoint on close of channel " + channelNameDescriptor +
+                "Replay will take longer next time channel is started.", err);
+      }
       shutdownWorker();
       if (logFiles != null) {
         for (int index = 0; index < logFiles.length(); index++) {

http://git-wip-us.apache.org/repos/asf/flume/blob/be4ae294/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index f7f1afa..801d925 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -22,6 +22,8 @@ import static org.mockito.Mockito.*;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.channels.*;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.io.FileUtils;
@@ -35,6 +37,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
+import javax.ws.rs.Path;
+
 public class TestLog {
   private static final Logger LOGGER = LoggerFactory.getLogger(TestLog.class);
   private static final long MAX_FILE_SIZE = 1000;
@@ -56,7 +60,7 @@ public class TestLog {
     }
     log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
         MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
-            checkpointDir).setLogDirs(dataDirs)
+            checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(false)
             .setChannelName("testlog").build();
     log.replay();
   }
@@ -465,6 +469,34 @@ public class TestLog {
         Long.MAX_VALUE - 1L);
   }
 
+  @Test
+  public void testCheckpointOnClose() throws Exception {
+    log.close();
+    log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
+            MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
+            checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(true)
+            .setChannelName("testLog").build();
+    log.replay();
+
+
+    // 1 Write One Event
+    FlumeEvent eventIn = TestUtils.newPersistableEvent();
+    log.put(transactionID, eventIn);
+    log.commitPut(transactionID);
+
+    // 2 Check state of checkpoint before close
+    File checkPointMetaFile =
+            FileUtils.listFiles(checkpointDir,new String[]{"meta"},false).iterator().next();
+    long before = FileUtils.checksumCRC32( checkPointMetaFile );
+
+    // 3 Close Log
+    log.close();
+
+    // 4 Verify that checkpoint was modified on close
+    long after = FileUtils.checksumCRC32( checkPointMetaFile );
+    Assert.assertFalse( before == after );
+  }
+
   private void takeAndVerify(FlumeEventPointer eventPointerIn,
       FlumeEvent eventIn)
     throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
@@ -479,4 +511,5 @@ public class TestLog {
     Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
     Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody());
   }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/be4ae294/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 78b139e..43ca5db 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2638,6 +2638,7 @@ capacity                                          1000000
 keep-alive                                        3                                 Amount of time (in sec) to wait for a put operation
 use-log-replay-v1                                 false                             Expert: Use old replay logic
 use-fast-replay                                   false                             Expert: Replay without using queue
+checkpointOnClose                                 true                              Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
 encryption.activeKey                              --                                Key name used to encrypt new data
 encryption.cipherProvider                         --                                Cipher provider type, supported types: AESCTRNOPADDING
 encryption.keyProvider                            --                                Key provider type, supported types: JCEKSFILE

http://git-wip-us.apache.org/repos/asf/flume/blob/be4ae294/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
----------------------------------------------------------------------
diff --git a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
index ac4dac4..a11126d 100644
--- a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
+++ b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
@@ -98,9 +98,20 @@ public class TestFileChannelIntegrityTool {
   }
 
   @Test
+  public void testFixCorruptRecordsWithCheckpoint() throws Exception {
+    doTestFixCorruptEvents(true);
+  }
+
+  @Test
+  public void testFixCorruptRecords() throws Exception {
+    doTestFixCorruptEvents(false);
+  }
+
+  @Test
   public void testFixInvalidRecords() throws Exception {
     doTestFixInvalidEvents(false, DummyEventVerifier.Builder.class.getName());
   }
+
   @Test
   public void testFixInvalidRecordsWithCheckpoint() throws Exception {
     doTestFixInvalidEvents(true, DummyEventVerifier.Builder.class.getName());
@@ -111,15 +122,24 @@ public class TestFileChannelIntegrityTool {
     tool.run(new String[] {"-l", dataDir.toString(), "-e", eventHandler, "-DvalidatorValue=0"});
     FileChannel channel = new FileChannel();
     channel.setName("channel");
-    String cp;
-    if(withCheckpoint) {
-      cp = origCheckpointDir.toString();
+    if (withCheckpoint) {
+      File[] cpFiles = origCheckpointDir.listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          if (name.contains("lock") || name.contains("queueset")) {
+            return false;
+          }
+          return true;
+        }
+      });
+      for (File cpFile : cpFiles) {
+        Serialization.copyFile(cpFile, new File(checkpointDir, cpFile.getName()));
+      }
     } else {
       FileUtils.deleteDirectory(checkpointDir);
       Assert.assertTrue(checkpointDir.mkdirs());
-      cp = checkpointDir.toString();
     }
-    ctx.put(FileChannelConfiguration.CHECKPOINT_DIR,cp);
+    ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointDir.toString());
     ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString());
     channel.configure(ctx);
     channel.start();
@@ -136,15 +156,6 @@ public class TestFileChannelIntegrityTool {
     Assert.assertEquals(25 - invalidEvent, i);
   }
 
-  @Test
-  public void testFixCorruptRecords() throws Exception {
-    doTestFixCorruptEvents(false);
-  }
-  @Test
-  public void testFixCorruptRecordsWithCheckpoint() throws Exception {
-    doTestFixCorruptEvents(true);
-  }
-
   public void doTestFixCorruptEvents(boolean withCheckpoint) throws Exception {
     Set<String> corruptFiles = new HashSet<String>();
     File[] files = dataDir.listFiles(new FilenameFilter() {
@@ -193,18 +204,27 @@ public class TestFileChannelIntegrityTool {
 
     }
     FileChannelIntegrityTool tool = new FileChannelIntegrityTool();
-    tool.run(new String[] {"-l", dataDir.toString()});
+    tool.run(new String[]{"-l", dataDir.toString()});
     FileChannel channel = new FileChannel();
     channel.setName("channel");
-    String cp;
-    if(withCheckpoint) {
-      cp = origCheckpointDir.toString();
+    if (withCheckpoint) {
+      File[] cpFiles = origCheckpointDir.listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          if (name.contains("lock") || name.contains("queueset")) {
+            return false;
+          }
+          return true;
+        }
+      });
+      for (File cpFile : cpFiles) {
+        Serialization.copyFile(cpFile, new File(checkpointDir, cpFile.getName()));
+      }
     } else {
       FileUtils.deleteDirectory(checkpointDir);
       Assert.assertTrue(checkpointDir.mkdirs());
-      cp = checkpointDir.toString();
     }
-    ctx.put(FileChannelConfiguration.CHECKPOINT_DIR,cp);
+    ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointDir.toString());
     ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString());
     channel.configure(ctx);
     channel.start();