Posted to commits@flume.apache.org by br...@apache.org on 2013/04/05 20:27:34 UTC

[2/2] git commit: FLUME-1516: FileChannel Write Dual Checkpoints to avoid replays

FLUME-1516: FileChannel Write Dual Checkpoints to avoid replays

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6ca61680
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6ca61680
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6ca61680

Branch: refs/heads/trunk
Commit: 6ca616800ec897551fbb14959ce3a5f0c1d69aed
Parents: df7a197
Author: Brock Noland <br...@apache.org>
Authored: Fri Apr 5 13:26:56 2013 -0500
Committer: Brock Noland <br...@apache.org>
Committed: Fri Apr 5 13:26:56 2013 -0500

----------------------------------------------------------------------
 .../flume/channel/file/EventQueueBackingStore.java |    2 +
 .../file/EventQueueBackingStoreFactory.java        |   21 +-
 .../channel/file/EventQueueBackingStoreFile.java   |  156 +++++++-
 .../channel/file/EventQueueBackingStoreFileV3.java |   17 +-
 .../org/apache/flume/channel/file/FileChannel.java |  117 +++---
 .../channel/file/FileChannelConfiguration.java     |   10 +
 .../apache/flume/channel/file/FlumeEventQueue.java |   18 -
 .../java/org/apache/flume/channel/file/Log.java    |  183 ++++++--
 .../org/apache/flume/channel/file/LogFile.java     |   60 ++-
 .../org/apache/flume/channel/file/LogFileV3.java   |   41 +-
 .../apache/flume/channel/file/ReplayHandler.java   |   69 ++--
 .../apache/flume/channel/file/Serialization.java   |  105 ++++-
 .../flume/channel/file/proto/ProtosFactory.java    |  146 ++++++-
 .../src/main/proto/filechannel.proto               |    2 +
 .../flume/channel/file/TestFileChannelBase.java    |    7 +-
 .../flume/channel/file/TestFileChannelRestart.java |  356 ++++++++++++++-
 .../org/apache/flume/channel/file/TestUtils.java   |   17 +-
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    2 +
 18 files changed, 1104 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
index b136eb0..2726095 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
@@ -29,6 +29,8 @@ abstract class EventQueueBackingStore {
   private long logWriteOrderID;
   private final int capacity;
   private final String name;
+  public static final String BACKUP_COMPLETE_FILENAME = "backupComplete";
+  protected Boolean slowdownBackup = false;
 
   protected EventQueueBackingStore(int capacity, String name) {
     this.capacity = capacity;

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
index a19bdb4..07a3781 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
@@ -35,8 +35,14 @@ class EventQueueBackingStoreFactory {
       String name) throws Exception {
     return get(checkpointFile, capacity, name, true);
   }
+
   static EventQueueBackingStore get(File checkpointFile, int capacity,
       String name, boolean upgrade) throws Exception {
+    return get(checkpointFile, null, capacity, name, upgrade, false);
+  }
+  static EventQueueBackingStore get(File checkpointFile,
+      File backupCheckpointDir, int capacity,String name,
+      boolean upgrade, boolean shouldBackup) throws Exception {
     File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
     RandomAccessFile checkpointFileHandle = null;
     try {
@@ -61,17 +67,20 @@ class EventQueueBackingStoreFactory {
         if(!checkpointFile.createNewFile()) {
           throw new IOException("Cannot create " + checkpointFile);
         }
-        return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name);
+        return new EventQueueBackingStoreFileV3(checkpointFile,
+            capacity, name, backupCheckpointDir, shouldBackup);
       }
       // v3 due to meta file, version will be checked by backing store
       if(metaDataExists) {
-        return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name);
+        return new EventQueueBackingStoreFileV3(checkpointFile, capacity,
+          name, backupCheckpointDir, shouldBackup);
       }
       checkpointFileHandle = new RandomAccessFile(checkpointFile, "r");
       int version = (int)checkpointFileHandle.readLong();
       if(Serialization.VERSION_2 == version) {
         if(upgrade) {
-          return upgrade(checkpointFile, capacity, name);
+          return upgrade(checkpointFile, capacity, name, backupCheckpointDir,
+            shouldBackup);
         }
         return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
       }
@@ -91,7 +100,8 @@ class EventQueueBackingStoreFactory {
   }
 
   private static EventQueueBackingStore upgrade(File checkpointFile,
-      int capacity, String name)
+    int capacity, String name, File backupCheckpointDir,
+    boolean shouldBackup)
           throws Exception {
     LOG.info("Attempting upgrade of " + checkpointFile + " for " + name);
     EventQueueBackingStoreFileV2 backingStoreV2 =
@@ -103,7 +113,8 @@ class EventQueueBackingStoreFactory {
     File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
     EventQueueBackingStoreFileV3.upgrade(backingStoreV2, checkpointFile,
         metaDataFile);
-    return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name);
+    return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name,
+      backupCheckpointDir, shouldBackup);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 4115505..5884800 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -28,8 +28,14 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,11 +62,25 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
   protected final MappedByteBuffer mappedBuffer;
   protected final RandomAccessFile checkpointFileHandle;
   protected final File checkpointFile;
+  private final Semaphore backupCompletedSema = new Semaphore(1);
+  protected final boolean shouldBackup;
+  private final File backupDir;
+  private final ExecutorService checkpointBackUpExecutor;
 
   protected EventQueueBackingStoreFile(int capacity, String name,
-      File checkpointFile) throws IOException, BadCheckpointException {
+      File checkpointFile) throws IOException,
+      BadCheckpointException {
+    this(capacity, name, checkpointFile, null, false);
+  }
+
+  protected EventQueueBackingStoreFile(int capacity, String name,
+      File checkpointFile, File checkpointBackupDir,
+      boolean backupCheckpoint) throws IOException,
+      BadCheckpointException {
     super(capacity, name);
     this.checkpointFile = checkpointFile;
+    this.shouldBackup = backupCheckpoint;
+    this.backupDir = checkpointBackupDir;
     checkpointFileHandle = new RandomAccessFile(checkpointFile, "rw");
     long totalBytes = (capacity + HEADER_SIZE) * Serialization.SIZE_OF_LONG;
     if(checkpointFileHandle.length() == 0) {
@@ -95,6 +115,13 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
               + " probably because the agent stopped while the channel was"
               + " checkpointing.");
     }
+    if (shouldBackup) {
+      checkpointBackUpExecutor = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setNameFormat(
+          getName() + " - CheckpointBackUpThread").build());
+    } else {
+      checkpointBackUpExecutor = null;
+    }
   }
 
   protected long getCheckpointLogWriteOrderID() {
@@ -103,11 +130,104 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
 
   protected abstract void writeCheckpointMetaData() throws IOException;
 
+  /**
+   * Backs up the checkpoint and its metadata files. This method is called
+   * once the checkpoint is completely written, and it runs on a separate
+   * background thread so that the file channel can continue operating
+   * while the backup is in progress.
+   *
+   * @param backupDirectory - the directory to which the backup files should be
+   *                        copied.
+   * @throws IOException - if the copy failed, or if there is not enough disk
+   * space to copy the checkpoint files over.
+   */
+  protected void backupCheckpoint(File backupDirectory) throws IOException {
+    int availablePermits = backupCompletedSema.drainPermits();
+    Preconditions.checkState(availablePermits == 0,
+      "Expected no permits to be available in the backup semaphore, " +
+        "but " + availablePermits + " permits were available.");
+    if (slowdownBackup) {
+      try {
+        TimeUnit.SECONDS.sleep(10);
+      } catch (Exception ex) {
+        Throwables.propagate(ex);
+      }
+    }
+    File backupFile = new File(backupDirectory, BACKUP_COMPLETE_FILENAME);
+    if (backupExists(backupDirectory)) {
+      if (!backupFile.delete()) {
+        throw new IOException("Error while doing backup of checkpoint. Could " +
+          "not remove" + backupFile.toString() + ".");
+      }
+    }
+    Serialization.deleteAllFiles(backupDirectory, Log.EXCLUDES);
+    File checkpointDir = checkpointFile.getParentFile();
+    File[] checkpointFiles = checkpointDir.listFiles();
+    Preconditions.checkNotNull(checkpointFiles, "Could not retrieve files " +
+      "from the checkpoint directory. Cannot complete backup of the " +
+      "checkpoint.");
+    for (File origFile : checkpointFiles) {
+      if(origFile.getName().equals(Log.FILE_LOCK)) {
+        continue;
+      }
+      Serialization.copyFile(origFile, new File(backupDirectory,
+        origFile.getName()));
+    }
+    Preconditions.checkState(!backupFile.exists(), "The backup file exists " +
+      "while it is not supposed to. Are multiple channels configured to use " +
+      "this directory: " + backupDirectory.toString() + " as backup?");
+    if (!backupFile.createNewFile()) {
+      LOG.error("Could not create backup file. Backup of checkpoint will " +
+        "not be used during replay even if checkpoint is bad.");
+    }
+  }
+
+  /**
+   * Restores the checkpoint from its backup, if the checkpoint is bad.
+   * @return true - if the previous backup completed successfully and the
+   * restore completed successfully.
+   * @throws IOException - if the restore failed due to an I/O error
+   *
+   */
+  public static boolean restoreBackup(File checkpointDir, File backupDir)
+    throws IOException {
+    if (!backupExists(backupDir)) {
+      return false;
+    }
+    Serialization.deleteAllFiles(checkpointDir, Log.EXCLUDES);
+    File[] backupFiles = backupDir.listFiles();
+    if (backupFiles == null) {
+      return false;
+    } else {
+      for (File backupFile : backupFiles) {
+        String fileName = backupFile.getName();
+        if (!fileName.equals(BACKUP_COMPLETE_FILENAME) &&
+          !fileName.equals(Log.FILE_LOCK)) {
+          Serialization.copyFile(backupFile, new File(checkpointDir, fileName));
+        }
+      }
+      return true;
+    }
+  }
+
   @Override
   void beginCheckpoint() throws IOException {
     LOG.info("Start checkpoint for " + checkpointFile +
         ", elements to sync = " + overwriteMap.size());
 
+    if (shouldBackup) {
+      int permits = backupCompletedSema.drainPermits();
+      Preconditions.checkState(permits <= 1, "Expected at most one permit " +
+        "to be available at checkpoint, but got " + String.valueOf(permits) +
+        " permits");
+      if(permits < 1) {
+        // Force the checkpoint to not happen by throwing an exception.
+        throw new IOException("Previous backup of checkpoint files is still " +
+          "in progress. Will attempt to checkpoint only at the end of the " +
+          "next checkpoint interval. Try increasing the checkpoint interval " +
+          "if this error happens often.");
+      }
+    }
     // Start checkpoint
     elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
     mappedBuffer.force();
@@ -141,8 +261,38 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
     // Finish checkpoint
     elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE);
     mappedBuffer.force();
+    if (shouldBackup) {
+      startBackupThread();
+    }
   }
 
+  /**
+   * This method starts backing up the checkpoint in the background.
+   */
+  private void startBackupThread() {
+    Preconditions.checkNotNull(checkpointBackUpExecutor,
+      "Expected the checkpoint backup exector to be non-null, " +
+        "but it is null. Checkpoint will not be backed up.");
+    LOG.info("Attempting to back up checkpoint.");
+    checkpointBackUpExecutor.submit(new Runnable() {
+
+      @Override
+      public void run() {
+        boolean error = false;
+        try {
+          backupCheckpoint(backupDir);
+        } catch (Throwable throwable) {
+          error = true;
+          LOG.error("Backing up of checkpoint directory failed.", throwable);
+        } finally {
+          backupCompletedSema.release();
+        }
+        if (!error) {
+          LOG.info("Checkpoint backup completed.");
+        }
+      }
+    });
+  }
 
   @Override
   void close() {
@@ -242,6 +392,10 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
     }
   }
 
+  public static boolean backupExists(File backupDir) {
+    return new File(backupDir, BACKUP_COMPLETE_FILENAME).exists();
+  }
+
   public static void main(String[] args) throws Exception {
     File file = new File(args[0]);
     File inflightTakesFile = new File(args[1]);

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
index 451a9d4..c153558 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
@@ -38,9 +38,15 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
       .getLogger(EventQueueBackingStoreFileV3.class);
   private final File metaDataFile;
 
-  EventQueueBackingStoreFileV3(File checkpointFile, int capacity, String name)
-      throws IOException, BadCheckpointException {
-    super(capacity, name, checkpointFile);
+  EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
+      String name) throws IOException, BadCheckpointException {
+    this(checkpointFile, capacity, name, null, false);
+  }
+
+  EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
+      String name, File checkpointBackupDir,
+      boolean backupCheckpoint) throws IOException, BadCheckpointException {
+    super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint);
     Preconditions.checkArgument(capacity > 0,
         "capacity must be greater than 0 " + capacity);
     metaDataFile = Serialization.getMetaDataFile(checkpointFile);
@@ -89,6 +95,11 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
         }
       }
     } else {
+      if(backupExists(checkpointBackupDir) && shouldBackup) {
+        // If a backup exists, then throw an exception to recover checkpoint
+        throw new BadCheckpointException("The checkpoint metadata file does " +
+            "not exist, but a backup exists");
+      }
       ProtosFactory.Checkpoint.Builder checkpointBuilder =
           ProtosFactory.Checkpoint.newBuilder();
       checkpointBuilder.setVersion(getVersion());

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index ff42d19..a7aa70c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -19,22 +19,17 @@
 
 package org.apache.flume.channel.file;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.annotations.Disposable;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
-import org.apache.flume.annotations.Disposable;
 import org.apache.flume.channel.BasicChannelSemantics;
 import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.channel.file.Log.Builder;
@@ -45,8 +40,12 @@ import org.apache.flume.instrumentation.ChannelCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 /**
  * <p>
@@ -83,6 +82,7 @@ public class FileChannel extends BasicChannelSemantics {
   private long maxFileSize;
   private long minimumRequiredSpace;
   private File checkpointDir;
+  private File backupCheckpointDir;
   private File[] dataDirs;
   private Log log;
   private volatile boolean open;
@@ -99,6 +99,7 @@ public class FileChannel extends BasicChannelSemantics {
   private KeyProvider encryptionKeyProvider;
   private String encryptionActiveKey;
   private String encryptionCipherProvider;
+  private boolean useDualCheckpoints;
 
   @Override
   public synchronized void setName(String name) {
@@ -109,60 +110,51 @@ public class FileChannel extends BasicChannelSemantics {
   @Override
   public void configure(Context context) {
 
+    useDualCheckpoints = context.getBoolean(
+        FileChannelConfiguration.USE_DUAL_CHECKPOINTS,
+        FileChannelConfiguration.DEFAULT_USE_DUAL_CHECKPOINTS);
     String homePath = System.getProperty("user.home").replace('\\', '/');
 
     String strCheckpointDir =
         context.getString(FileChannelConfiguration.CHECKPOINT_DIR,
             homePath + "/.flume/file-channel/checkpoint");
 
+    String strBackupCheckpointDir = context.getString
+      (FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, "").trim();
+
     String[] strDataDirs = context.getString(FileChannelConfiguration.DATA_DIRS,
         homePath + "/.flume/file-channel/data").split(",");
 
-    if(checkpointDir == null) {
-      checkpointDir = new File(strCheckpointDir);
-    } else if(!checkpointDir.getAbsolutePath().
-        equals(new File(strCheckpointDir).getAbsolutePath())) {
-      LOG.warn("An attempt was made to change the checkpoint " +
-          "directory after start, this is not supported.");
+    checkpointDir = new File(strCheckpointDir);
+
+    if (useDualCheckpoints) {
+      Preconditions.checkState(!strBackupCheckpointDir.isEmpty(),
+        "Dual checkpointing is enabled, but the backup directory is not set. " +
+          "Please set " + FileChannelConfiguration.BACKUP_CHECKPOINT_DIR + " " +
+          "to enable dual checkpointing");
+      backupCheckpointDir = new File(strBackupCheckpointDir);
+      /*
+       * If the backup directory is the same as the checkpoint directory,
+       * then throw an exception and force the config system to ignore this
+       * channel.
+       */
+      Preconditions.checkState(!backupCheckpointDir.equals(checkpointDir),
+        "Could not configure " + getName() + ". The checkpoint backup " +
+          "directory and the checkpoint directory are " +
+          "configured to be the same.");
     }
-    if(dataDirs == null) {
-      dataDirs = new File[strDataDirs.length];
-      for (int i = 0; i < strDataDirs.length; i++) {
-        dataDirs[i] = new File(strDataDirs[i]);
-      }
-    } else {
-      boolean changed = false;
-      if(dataDirs.length != strDataDirs.length) {
-        changed = true;
-      } else {
-        for (int i = 0; i < strDataDirs.length; i++) {
-          if(!dataDirs[i].getAbsolutePath().
-              equals(new File(strDataDirs[i]).getAbsolutePath())) {
-            changed = true;
-            break;
-          }
-        }
-      }
-      if(changed) {
-        LOG.warn("An attempt was made to change the data " +
-            "directories after start, this is not supported.");
-      }
+
+    dataDirs = new File[strDataDirs.length];
+    for (int i = 0; i < strDataDirs.length; i++) {
+      dataDirs[i] = new File(strDataDirs[i]);
     }
 
-    int newCapacity = context.getInteger(FileChannelConfiguration.CAPACITY,
+    capacity = context.getInteger(FileChannelConfiguration.CAPACITY,
         FileChannelConfiguration.DEFAULT_CAPACITY);
-    if(newCapacity <= 0 && capacity == 0) {
-      newCapacity = FileChannelConfiguration.DEFAULT_CAPACITY;
+    if(capacity <= 0) {
+      capacity = FileChannelConfiguration.DEFAULT_CAPACITY;
       LOG.warn("Invalid capacity specified, initializing channel to "
-              + "default capacity of {}", newCapacity);
-    }
-    if(capacity > 0 && newCapacity != capacity) {
-      LOG.warn("Capacity of this channel cannot be sized on the fly due " +
-          "the requirement we have enough DirectMemory for the queue and " +
-          "downsizing of the queue cannot be guranteed due to the " +
-          "fact there maybe more items on the queue than the new capacity.");
-    } else {
-      capacity = newCapacity;
+              + "default capacity of {}", capacity);
     }
 
     keepAlive =
@@ -181,8 +173,8 @@ public class FileChannel extends BasicChannelSemantics {
     }
 
     Preconditions.checkState(transactionCapacity <= capacity,
-        "File Channel transaction capacity cannot be greater than the " +
-            "capacity of the channel.");
+      "File Channel transaction capacity cannot be greater than the " +
+        "capacity of the channel.");
 
     checkpointInterval =
             context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL,
@@ -303,6 +295,8 @@ public class FileChannel extends BasicChannelSemantics {
       builder.setEncryptionKeyProvider(encryptionKeyProvider);
       builder.setEncryptionKeyAlias(encryptionActiveKey);
       builder.setEncryptionCipherProvider(encryptionCipherProvider);
+      builder.setUseDualCheckpoints(useDualCheckpoints);
+      builder.setBackupCheckpointDir(backupCheckpointDir);
       log = builder.build();
       log.replay();
       open = true;
@@ -402,6 +396,23 @@ public class FileChannel extends BasicChannelSemantics {
   }
 
   /**
+   * Did this channel recover a backup of the checkpoint to restart?
+   * @return true if the channel recovered using a backup.
+   */
+  @VisibleForTesting
+  boolean checkpointBackupRestored() {
+    if(log != null) {
+      return log.backupRestored();
+    }
+    return false;
+  }
+
+  @VisibleForTesting
+  Log getLog() {
+    return log;
+  }
+
+  /**
    * Transaction backed by a file. This transaction supports either puts
    * or takes but not both.
    */
@@ -462,7 +473,7 @@ public class FileChannel extends BasicChannelSemantics {
         }
         FlumeEventPointer ptr = log.put(transactionID, event);
         Preconditions.checkState(putList.offer(ptr), "putList offer failed "
-             + channelNameDescriptor);
+          + channelNameDescriptor);
         queue.addWithoutCommit(ptr, transactionID);
         success = true;
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
index 24368b3..c2dcffc 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
@@ -23,6 +23,12 @@ public class FileChannelConfiguration {
    * Directory Checkpoints will be written in
    */
   public static final String CHECKPOINT_DIR = "checkpointDir";
+
+  /**
+   * The directory to which the checkpoint must be backed up
+   */
+  public static final String BACKUP_CHECKPOINT_DIR = "backupCheckpointDir";
+
   /**
    * Directories data files will be written in. Multiple directories
    * can be specified as comma separated values. Writes will
@@ -90,4 +96,8 @@ public class FileChannelConfiguration {
 
   public static final String USE_FAST_REPLAY = "use-fast-replay";
   public static final boolean DEFAULT_USE_FAST_REPLAY = false;
+
+  public static final String USE_DUAL_CHECKPOINTS = "useDualCheckpoints";
+  public static final boolean DEFAULT_USE_DUAL_CHECKPOINTS = false;
+
 }
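
For context, a minimal sketch of how an agent might enable the option added above (the agent name, channel name, and paths below are placeholders; only the useDualCheckpoints and backupCheckpointDir keys come from this patch, alongside the existing checkpointDir and dataDirs settings):

  agent.channels.ch1.type = file
  agent.channels.ch1.checkpointDir = /flume/checkpoint
  agent.channels.ch1.useDualCheckpoints = true
  agent.channels.ch1.backupCheckpointDir = /flume/checkpoint-backup
  agent.channels.ch1.dataDirs = /flume/data

When useDualCheckpoints is true, the channel requires backupCheckpointDir to be set and to differ from checkpointDir; otherwise configuration fails (see the Preconditions checks in FileChannel.configure()).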

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index 1ed9547..ac03fb4 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -30,12 +30,7 @@ import java.util.Collection;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -394,19 +389,6 @@ final class FlumeEventQueue {
      * asynchronously written to disk.
      */
     public void serializeAndWrite() throws Exception {
-      //Check if there is a current write happening, if there is abort it.
-      if (future != null) {
-        try {
-          future.cancel(true);
-        } catch (Exception e) {
-          LOG.warn("Interrupted a write to inflights "
-                  + "file: " + inflightEventsFile.getName()
-                  + " to start a new write.");
-        }
-        while (!future.isDone()) {
-          TimeUnit.MILLISECONDS.sleep(100);
-        }
-      }
       Collection<Long> values = inflightEvents.values();
       if(!fileChannel.isOpen()){
         file = new RandomAccessFile(inflightEventsFile, "rw");

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 6ffc824..e61437d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -18,6 +18,23 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -31,6 +48,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.Executors;
@@ -42,23 +60,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flume.ChannelException;
-import org.apache.flume.Event;
-import org.apache.flume.annotations.InterfaceAudience;
-import org.apache.flume.annotations.InterfaceStability;
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 /**
  * Stores FlumeEvents on disk and pointers to the events in a in memory queue.
  * Once a log object is created the replay method should be called to reconcile
@@ -76,12 +77,13 @@ class Log {
   public static final String PREFIX = "log-";
   private static final Logger LOGGER = LoggerFactory.getLogger(Log.class);
   private static final int MIN_NUM_LOGS = 2;
-  private static final String FILE_LOCK = "in_use.lock";
+  public static final String FILE_LOCK = "in_use.lock";
   // for reader
   private final Map<Integer, LogFile.RandomReader> idLogFileMap = Collections
       .synchronizedMap(new HashMap<Integer, LogFile.RandomReader>());
   private final AtomicInteger nextFileID = new AtomicInteger(0);
   private final File checkpointDir;
+  private final File backupCheckpointDir;
   private final File[] logDirs;
   private final int queueCapacity;
   private final AtomicReferenceArray<LogFile.Writer> logFiles;
@@ -97,6 +99,11 @@ class Log {
   private final Map<String, FileLock> locks;
   private final ReentrantReadWriteLock checkpointLock =
       new ReentrantReadWriteLock(true);
+
+  /**
+   * Set of files that should be excluded from backup and restores.
+   */
+  public static final Set<String> EXCLUDES = Sets.newHashSet(FILE_LOCK);
   /**
    * Shared lock
    */
@@ -115,6 +122,16 @@ class Log {
   private Key encryptionKey;
   private final long usableSpaceRefreshInterval;
   private boolean didFastReplay = false;
+  private final boolean useDualCheckpoints;
+  private volatile boolean backupRestored = false;
+
+  private int readCount;
+  private int putCount;
+  private int takeCount;
+  private int committedCount;
+  private int rollbackCount;
+
+  private final List<File> pendingDeletes = Lists.newArrayList();
 
   static class Builder {
     private long bCheckpointInterval;
@@ -134,6 +151,8 @@ class Log {
     private String bEncryptionKeyAlias;
     private String bEncryptionCipherProvider;
     private long bUsableSpaceRefreshInterval = 15L * 1000L;
+    private boolean bUseDualCheckpoints = false;
+    private File bBackupCheckpointDir = null;
 
     Builder setUsableSpaceRefreshInterval(long usableSpaceRefreshInterval) {
       bUsableSpaceRefreshInterval = usableSpaceRefreshInterval;
@@ -210,9 +229,20 @@ class Log {
       return this;
     }
 
+    Builder setUseDualCheckpoints(boolean UseDualCheckpoints) {
+      this.bUseDualCheckpoints = UseDualCheckpoints;
+      return this;
+    }
+
+    Builder setBackupCheckpointDir(File backupCheckpointDir) {
+      this.bBackupCheckpointDir = backupCheckpointDir;
+      return this;
+    }
+
     Log build() throws IOException {
       return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
-          bLogWriteTimeout, bCheckpointWriteTimeout, bCheckpointDir, bName,
+          bLogWriteTimeout, bCheckpointWriteTimeout, bUseDualCheckpoints,
+          bCheckpointDir, bBackupCheckpointDir, bName,
           useLogReplayV1, useFastReplay, bMinimumRequiredSpace,
           bEncryptionKeyProvider, bEncryptionKeyAlias,
           bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
@@ -221,7 +251,8 @@ class Log {
   }
 
   private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
-      int logWriteTimeout, int checkpointWriteTimeout, File checkpointDir,
+      int logWriteTimeout, int checkpointWriteTimeout,
+      boolean useDualCheckpoints, File checkpointDir, File backupCheckpointDir,
       String name, boolean useLogReplayV1, boolean useFastReplay,
       long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
       @Nullable String encryptionKeyAlias,
@@ -229,15 +260,23 @@ class Log {
       long usableSpaceRefreshInterval, File... logDirs)
           throws IOException {
     Preconditions.checkArgument(checkpointInterval > 0,
-        "checkpointInterval <= 0");
+      "checkpointInterval <= 0");
     Preconditions.checkArgument(queueCapacity > 0, "queueCapacity <= 0");
     Preconditions.checkArgument(maxFileSize > 0, "maxFileSize <= 0");
     Preconditions.checkNotNull(checkpointDir, "checkpointDir");
     Preconditions.checkArgument(usableSpaceRefreshInterval > 0,
         "usableSpaceRefreshInterval <= 0");
     Preconditions.checkArgument(
-        checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
-            + checkpointDir + " could not be created");
+      checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
+      + checkpointDir + " could not be created");
+    if (useDualCheckpoints) {
+      Preconditions.checkNotNull(backupCheckpointDir, "backupCheckpointDir is" +
+        " null while dual checkpointing is enabled.");
+      Preconditions.checkArgument(
+        backupCheckpointDir.isDirectory() || backupCheckpointDir.mkdirs(),
+        "Backup CheckpointDir " + backupCheckpointDir +
+          " could not be created");
+    }
     Preconditions.checkNotNull(logDirs, "logDirs");
     Preconditions.checkArgument(logDirs.length > 0, "logDirs empty");
     Preconditions.checkArgument(name != null && !name.trim().isEmpty(),
@@ -255,6 +294,9 @@ class Log {
     locks = Maps.newHashMap();
     try {
       lock(checkpointDir);
+      if(useDualCheckpoints) {
+        lock(backupCheckpointDir);
+      }
       for (File logDir : logDirs) {
         lock(logDir);
       }
@@ -288,13 +330,15 @@ class Log {
     this.checkpointInterval = Math.max(checkpointInterval, 1000);
     this.maxFileSize = maxFileSize;
     this.queueCapacity = queueCapacity;
+    this.useDualCheckpoints = useDualCheckpoints;
     this.checkpointDir = checkpointDir;
+    this.backupCheckpointDir = backupCheckpointDir;
     this.logDirs = logDirs;
     this.logWriteTimeout = logWriteTimeout;
     this.checkpointWriteTimeout = checkpointWriteTimeout;
     logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
     workerExecutor = Executors.newSingleThreadScheduledExecutor(new
-        ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
+      ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
         .build());
     workerExecutor.scheduleWithFixedDelay(new BackgroundWorker(this),
         this.checkpointInterval, this.checkpointInterval,
@@ -365,8 +409,9 @@ class Log {
 
       try {
         backingStore =
-                EventQueueBackingStoreFactory.get(checkpointFile, queueCapacity,
-                channelNameDescriptor);
+            EventQueueBackingStoreFactory.get(checkpointFile,
+                backupCheckpointDir, queueCapacity, channelNameDescriptor,
+                true, this.useDualCheckpoints);
         queue = new FlumeEventQueue(backingStore, inflightTakesFile,
                 inflightPutsFile);
         LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified())
@@ -383,14 +428,26 @@ class Log {
          */
         doReplay(queue, dataFiles, encryptionKeyProvider, shouldFastReplay);
       } catch (BadCheckpointException ex) {
-        LOGGER.warn("Checkpoint may not have completed successfully. "
-                + "Forcing full replay, this may take a while.", ex);
-        if(!Serialization.deleteAllFiles(checkpointDir)) {
-          throw new IOException("Could not delete files in checkpoint " +
-              "directory to recover from a corrupt or incomplete checkpoint");
+        backupRestored = false;
+        if (useDualCheckpoints) {
+          LOGGER.warn("Checkpoint may not have completed successfully. "
+              + "Restoring checkpoint and starting up.", ex);
+          if (EventQueueBackingStoreFile.backupExists(backupCheckpointDir)) {
+            backupRestored = EventQueueBackingStoreFile.restoreBackup(
+              checkpointDir, backupCheckpointDir);
+          }
+        }
+        if (!backupRestored) {
+          LOGGER.warn("Checkpoint may not have completed successfully. "
+              + "Forcing full replay, this may take a while.", ex);
+          if (!Serialization.deleteAllFiles(checkpointDir, EXCLUDES)) {
+            throw new IOException("Could not delete files in checkpoint " +
+                "directory to recover from a corrupt or incomplete checkpoint");
+          }
         }
         backingStore = EventQueueBackingStoreFactory.get(checkpointFile,
-                queueCapacity, channelNameDescriptor);
+            backupCheckpointDir,
+            queueCapacity, channelNameDescriptor, true, useDualCheckpoints);
         queue = new FlumeEventQueue(backingStore, inflightTakesFile,
                 inflightPutsFile);
         // If the checkpoint was deleted due to BadCheckpointException, then
@@ -441,6 +498,11 @@ class Log {
         LOGGER.info("Replaying logs with v2 replay logic");
         replayHandler.replayLog(dataFiles);
       }
+      readCount = replayHandler.getReadCount();
+      putCount = replayHandler.getPutCount();
+      takeCount = replayHandler.getTakeCount();
+      rollbackCount = replayHandler.getRollbackCount();
+      committedCount = replayHandler.getCommitCount();
     }
   }
 
@@ -448,6 +510,36 @@ class Log {
   boolean didFastReplay() {
     return didFastReplay;
   }
+  @VisibleForTesting
+  public int getReadCount() {
+    return readCount;
+  }
+  @VisibleForTesting
+  public int getPutCount() {
+    return putCount;
+  }
+
+  @VisibleForTesting
+  public int getTakeCount() {
+    return takeCount;
+  }
+  @VisibleForTesting
+  public int getCommittedCount() {
+    return committedCount;
+  }
+  @VisibleForTesting
+  public int getRollbackCount() {
+    return rollbackCount;
+  }
+
+  /**
+   * Was a checkpoint backup used to replay?
+   * @return true if a checkpoint backup was used to replay.
+   */
+  @VisibleForTesting
+  boolean backupRestored() {
+    return backupRestored;
+  }
 
   int getNextFileID() {
     Preconditions.checkState(open, "Log is closed");
@@ -704,6 +796,13 @@ class Log {
       } catch (IOException ex) {
         LOGGER.warn("Error unlocking " + checkpointDir, ex);
       }
+      if (useDualCheckpoints) {
+        try {
+          unlock(backupCheckpointDir);
+        } catch (IOException ex) {
+          LOGGER.warn("Error unlocking " + checkpointDir, ex);
+        }
+      }
       for (File logDir : logDirs) {
         try {
           unlock(logDir);
@@ -942,6 +1041,17 @@ class Log {
 
   private void removeOldLogs(SortedSet<Integer> fileIDs) {
     Preconditions.checkState(open, "Log is closed");
+    // To maintain a single code path for deletes, whether or not checkpoint
+    // backup is enabled, we track the files that become deletable after the
+    // current checkpoint (the checkpoint that was just backed up still needs
+    // them) and delete them only after the next checkpoint (by then the
+    // current checkpoint has become the backup, so these files are no
+    // longer needed).
+    for(File fileToDelete : pendingDeletes) {
+      LOGGER.info("Removing old file: " + fileToDelete);
+      FileUtils.deleteQuietly(fileToDelete);
+    }
+    pendingDeletes.clear();
     // we will find the smallest fileID currently in use and
     // won't delete any files with an id larger than the min
     int minFileID = fileIDs.first();
@@ -960,14 +1070,9 @@ class Log {
           if(reader != null) {
             reader.close();
           }
-          LOGGER.info("Removing old log " + logFile +
-              ", result = " + logFile.delete() + ", minFileID "
-              + minFileID);
           File metaDataFile = Serialization.getMetaDataFile(logFile);
-          if(metaDataFile.exists() && !metaDataFile.delete()) {
-            LOGGER.warn("Could not remove metadata file "
-                + metaDataFile + " for " + logFile);
-          }
+          pendingDeletes.add(logFile);
+          pendingDeletes.add(metaDataFile);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index 1db3717..d3db896 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -18,6 +18,17 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.flume.channel.file.encryption.CipherProvider;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.apache.flume.tools.DirectMemoryUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
@@ -29,19 +40,6 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.annotation.Nullable;
-
-import org.apache.flume.channel.file.encryption.CipherProvider;
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.apache.flume.tools.DirectMemoryUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
 abstract class LogFile {
 
   private static final Logger LOG = LoggerFactory
@@ -420,6 +418,8 @@ abstract class LogFile {
     private int logFileID;
     private long lastCheckpointPosition;
     private long lastCheckpointWriteOrderID;
+    private long backupCheckpointPosition;
+    private long backupCheckpointWriteOrderID;
 
     /**
      * Construct a Sequential Log Reader object
@@ -444,6 +444,14 @@ abstract class LogFile {
     protected void setLastCheckpointWriteOrderID(long lastCheckpointWriteOrderID) {
       this.lastCheckpointWriteOrderID = lastCheckpointWriteOrderID;
     }
+    protected void setPreviousCheckpointPosition(
+      long backupCheckpointPosition) {
+      this.backupCheckpointPosition = backupCheckpointPosition;
+    }
+    protected void setPreviousCheckpointWriteOrderID(
+      long backupCheckpointWriteOrderID) {
+      this.backupCheckpointWriteOrderID = backupCheckpointWriteOrderID;
+    }
     protected void setLogFileID(int logFileID) {
       this.logFileID = logFileID;
       Preconditions.checkArgument(logFileID >= 0, "LogFileID is not positive: "
@@ -459,18 +467,24 @@ abstract class LogFile {
     int getLogFileID() {
       return logFileID;
     }
+
     void skipToLastCheckpointPosition(long checkpointWriteOrderID)
-        throws IOException {
-      if (lastCheckpointPosition > 0L
-          && lastCheckpointWriteOrderID <= checkpointWriteOrderID) {
-        LOG.info("fast-forward to checkpoint position: "
-                  + lastCheckpointPosition);
-        fileChannel.position(lastCheckpointPosition);
+      throws IOException {
+      if (lastCheckpointPosition > 0L) {
+        long position = 0;
+        if (lastCheckpointWriteOrderID <= checkpointWriteOrderID) {
+          position = lastCheckpointPosition;
+        } else if (backupCheckpointWriteOrderID <= checkpointWriteOrderID
+          && backupCheckpointPosition > 0) {
+          position = backupCheckpointPosition;
+        }
+        fileChannel.position(position);
+        LOG.info("fast-forward to checkpoint position: " + position);
       } else {
-        LOG.warn("Checkpoint for file(" + file.getAbsolutePath() + ") "
-            + "is: " + lastCheckpointWriteOrderID + ", which is beyond the "
-            + "requested checkpoint time: " + checkpointWriteOrderID
-            + " and position " + lastCheckpointPosition);
+        LOG.info("Checkpoint for file(" + file.getAbsolutePath() + ") "
+          + "is: " + lastCheckpointWriteOrderID + ", which is beyond the "
+          + "requested checkpoint time: " + checkpointWriteOrderID
+          + " and position " + lastCheckpointPosition);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index f51935c..d9a2a9b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -18,6 +18,17 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+import org.apache.flume.channel.file.encryption.CipherProvider;
+import org.apache.flume.channel.file.encryption.CipherProviderFactory;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.apache.flume.channel.file.proto.ProtosFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
@@ -28,19 +39,6 @@ import java.security.Key;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 
-import javax.annotation.Nullable;
-
-import org.apache.flume.channel.file.proto.ProtosFactory;
-import org.apache.flume.channel.file.encryption.CipherProvider;
-import org.apache.flume.channel.file.encryption.CipherProviderFactory;
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.GeneratedMessage;
-
 /**
  * Represents a single data file on disk. Has methods to write,
  * read sequentially (replay), and read randomly (channel takes).
@@ -81,9 +79,15 @@ class LogFileV3 extends LogFile {
           ProtosFactory.LogFileMetaData.newBuilder(logFileMetaData);
       metaDataBuilder.setCheckpointPosition(currentPosition);
       metaDataBuilder.setCheckpointWriteOrderID(logWriteOrderID);
+      /*
+       * Set the previous checkpoint position and write order id so that it
+       * would be possible to recover from a backup.
+       */
+      metaDataBuilder.setBackupCheckpointPosition(logFileMetaData
+        .getCheckpointPosition());
+      metaDataBuilder.setBackupCheckpointWriteOrderID(logFileMetaData
+        .getCheckpointWriteOrderID());
       logFileMetaData = metaDataBuilder.build();
-      LOGGER.info("Updating " + metaDataFile.getName()  + " currentPosition = "
-          + currentPosition + ", logWriteOrderID = " + logWriteOrderID);
       writeDelimitedTo(logFileMetaData, metaDataFile);
     }
   }
@@ -101,7 +105,7 @@ class LogFileV3 extends LogFile {
       FileInputStream inputStream = new FileInputStream(metaDataFile);
       try {
         ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull(
-            ProtosFactory.LogFileMetaData.
+          ProtosFactory.LogFileMetaData.
             parseDelimitedFrom(inputStream), "Metadata cannot be null");
         if (metaData.getLogFileID() != logFileID) {
           throw new IOException("The file id of log file: "
@@ -193,6 +197,8 @@ class LogFileV3 extends LogFile {
       metaDataBuilder.setLogFileID(logFileID);
       metaDataBuilder.setCheckpointPosition(0L);
       metaDataBuilder.setCheckpointWriteOrderID(0L);
+      metaDataBuilder.setBackupCheckpointPosition(0L);
+      metaDataBuilder.setBackupCheckpointWriteOrderID(0L);
       File metaDataFile = Serialization.getMetaDataFile(file);
       writeDelimitedTo(metaDataBuilder.build(), metaDataFile);
     }
@@ -322,6 +328,9 @@ class LogFileV3 extends LogFile {
         setLogFileID(metaData.getLogFileID());
         setLastCheckpointPosition(metaData.getCheckpointPosition());
         setLastCheckpointWriteOrderID(metaData.getCheckpointWriteOrderID());
+        setPreviousCheckpointPosition(metaData.getBackupCheckpointPosition());
+        setPreviousCheckpointWriteOrderID(
+          metaData.getBackupCheckpointWriteOrderID());
       } finally {
         try {
           inputStream.close();

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
index fa4fd9d..fc47b23 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -18,6 +18,19 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import org.apache.commons.collections.MultiMap;
+import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
@@ -28,20 +41,6 @@ import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 
-import javax.annotation.Nullable;
-
-import org.apache.commons.collections.MultiMap;
-import org.apache.commons.collections.map.MultiValueMap;
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
-
 /**
  * Processes a set of data logs, replaying said logs into the queue.
  */
@@ -69,6 +68,33 @@ class ReplayHandler {
    * finding the put and commit in logdir2.
    */
   private final List<Long> pendingTakes;
+  int readCount = 0;
+  int putCount = 0;
+  int takeCount = 0;
+  int rollbackCount = 0;
+  int commitCount = 0;
+  int skipCount = 0;
+
+  @VisibleForTesting
+  public int getReadCount() {
+    return readCount;
+  }
+  @VisibleForTesting
+  public int getPutCount() {
+    return putCount;
+  }
+  @VisibleForTesting
+  public int getTakeCount() {
+    return takeCount;
+  }
+  @VisibleForTesting
+  public int getCommitCount() {
+    return commitCount;
+  }
+  @VisibleForTesting
+  public int getRollbackCount() {
+    return rollbackCount;
+  }
 
   ReplayHandler(FlumeEventQueue queue,
       @Nullable KeyProvider encryptionKeyProvider) {
@@ -110,12 +136,7 @@ class ReplayHandler {
         // for puts the fileId is the fileID of the file they exist in
         // for takes the fileId and offset are pointers to a put
         int fileId = reader.getLogFileID();
-        int readCount = 0;
-        int putCount = 0;
-        int takeCount = 0;
-        int rollbackCount = 0;
-        int commitCount = 0;
-        int skipCount = 0;
+
         while ((entry = reader.next()) != null) {
           int offset = entry.getOffset();
           TransactionEventRecord record = entry.getEvent();
@@ -160,7 +181,7 @@ class ReplayHandler {
               }
             } else {
               Preconditions.checkArgument(false, "Unknown record type: "
-                  + Integer.toHexString(type));
+                + Integer.toHexString(type));
             }
 
           } else {
@@ -255,12 +276,6 @@ class ReplayHandler {
       }
       LogRecord entry = null;
       FlumeEventPointer ptr = null;
-      int readCount = 0;
-      int putCount = 0;
-      int takeCount = 0;
-      int rollbackCount = 0;
-      int commitCount = 0;
-      int skipCount = 0;
       while ((entry = next()) != null) {
         // for puts the fileId is the fileID of the file they exist in
         // for takes the fileId and offset are pointers to a put

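The counters above are hoisted from method locals to fields purely so tests can inspect them after a replay. A minimal sketch of that usage, assuming replayLog(List<File>) is the replay entry point and that the helper lives in the org.apache.flume.channel.file package (ReplayHandler and its constructor are package-private):

    import java.io.File;
    import java.util.List;
    import org.junit.Assert;

    // Illustrative test helper; `queue` and `dataFiles` come from the test fixture.
    static void verifyReplayCounts(FlumeEventQueue queue, List<File> dataFiles,
        int expectedPuts, int expectedTakes) throws Exception {
      ReplayHandler handler = new ReplayHandler(queue, null); // null: no encryption key provider
      handler.replayLog(dataFiles);
      Assert.assertEquals(expectedPuts, handler.getPutCount());
      Assert.assertEquals(expectedTakes, handler.getTakeCount());
      // Every replayed put/take was first read from a data log.
      Assert.assertTrue(handler.getReadCount() >= expectedPuts + expectedTakes);
    }
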
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
index 7094d3c..d6897e1 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -18,11 +18,20 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.io.BufferedInputStream;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Collections;
+import java.util.Set;
 
 class Serialization {
   private Serialization() {}
@@ -38,6 +47,9 @@ class Serialization {
   static final String METADATA_TMP_FILENAME = ".tmp";
   static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old";
 
+  // 64 K buffer to copy files.
+  private static final int FILE_COPY_BUFFER_SIZE = 64 * 1024;
+
   public static final Logger LOG = LoggerFactory.getLogger(Serialization.class);
 
   static File getMetaDataTempFile(File metaDataFile) {
@@ -60,20 +72,39 @@ class Serialization {
   /**
    * Deletes all files in given directory.
    * @param checkpointDir - The directory whose files are to be deleted
+   * @param excludes - Names of files which should not be deleted from this
+   *                 directory.
    * @return - true if all files were successfully deleted, false otherwise.
    */
-  static boolean deleteAllFiles(File checkpointDir) {
+  static boolean deleteAllFiles(File checkpointDir,
+    @Nullable Set<String> excludes) {
     if (!checkpointDir.isDirectory()) {
       return false;
     }
-    StringBuilder builder = new StringBuilder("Deleted the following files from"
-        + " the checkpoint directory: ");
+
     File[] files = checkpointDir.listFiles();
+    if(files == null) {
+      return false;
+    }
+    StringBuilder builder;
+    if (files.length == 0) {
+      return true;
+    } else {
+      builder = new StringBuilder("Deleted the following files: ");
+    }
+    if(excludes == null) {
+      excludes = Collections.EMPTY_SET;
+    }
     for (File file : files) {
+      if(excludes.contains(file.getName())) {
+        LOG.info("Skipping " + file.getName() + " because it is in excludes " +
+          "set");
+        continue;
+      }
       if (!FileUtils.deleteQuietly(file)) {
         LOG.info(builder.toString());
         LOG.error("Error while attempting to delete: " +
-            file.getName());
+            file.getAbsolutePath());
         return false;
       }
       builder.append(", ").append(file.getName());
@@ -82,4 +113,70 @@ class Serialization {
     LOG.info(builder.toString());
     return true;
   }
+
+  /**
+   * Copy a file using a 64K size buffer. This method will copy the file and
+   * then fsync to disk
+   * @param from File to copy - this file should exist
+   * @param to Destination file - this file should not exist
+   * @return true if the copy was successful
+   */
+  static boolean copyFile(File from, File to) throws IOException {
+    Preconditions.checkNotNull(from, "Source file is null, file copy failed.");
+    Preconditions.checkNotNull(to, "Destination file is null, " +
+      "file copy failed.");
+    Preconditions.checkState(from.exists(), "Source file: " + from.toString() +
+      " does not exist.");
+    Preconditions.checkState(!to.exists(), "Destination file: "
+      + to.toString() + " unexpectedly exists.");
+
+    BufferedInputStream in = null;
+    RandomAccessFile out = null; //use a RandomAccessFile for easy fsync
+    try {
+      in = new BufferedInputStream(new FileInputStream(from));
+      out = new RandomAccessFile(to, "rw");
+      byte[] buf = new byte[FILE_COPY_BUFFER_SIZE];
+      int total = 0;
+      while(true) {
+        int read = in.read(buf);
+        if (read == -1) {
+          break;
+        }
+        out.write(buf, 0, read);
+        total += read;
+      }
+      out.getFD().sync();
+      Preconditions.checkState(total == from.length(),
+        "The size of the origin file and destination file are not equal.");
+      return true;
+    } catch (Exception ex) {
+      LOG.error("Error while attempting to copy " + from.toString() + " to "
+        + to.toString() + ".", ex);
+      Throwables.propagate(ex);
+    } finally {
+      Throwable th = null;
+      try {
+        if (in != null) {
+          in.close();
+        }
+      } catch (Throwable ex) {
+        LOG.error("Error while closing input file.", ex);
+        th = ex;
+      }
+      try {
+        if (out != null) {
+          out.close();
+        }
+      } catch (IOException ex) {
+        LOG.error("Error while closing output file.", ex);
+        Throwables.propagate(ex);
+      }
+      if (th != null) {
+        Throwables.propagate(th);
+      }
+    }
+    // Should never reach here.
+    throw new IOException("Copying file: " + from.toString() + " to: " + to
+      .toString() + " may have failed.");
+  }
 }

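Taken together, deleteAllFiles(dir, excludes) and copyFile(from, to) are the primitives a dual-checkpoint backup needs to refresh a backup directory. A rough sketch of that sequence, assuming it runs in the same package (both helpers are package-private) and using BACKUP_COMPLETE_FILENAME, the marker file added elsewhere in this change, as an example of a file excluded from the bulk delete:

    import java.io.File;
    import java.io.IOException;
    import com.google.common.collect.Sets;

    // Illustrative only: refresh a backup of the checkpoint directory.
    static void refreshBackup(File checkpointDir, File backupDir) throws IOException {
      // Wipe the previous backup, keeping only the completion marker.
      if (!Serialization.deleteAllFiles(backupDir,
          Sets.newHashSet(EventQueueBackingStore.BACKUP_COMPLETE_FILENAME))) {
        throw new IOException("Could not clear backup directory " + backupDir);
      }
      File[] files = checkpointDir.listFiles();
      if (files == null) {
        throw new IOException("Could not list checkpoint directory " + checkpointDir);
      }
      for (File f : files) {
        // copyFile streams through a 64K buffer and fsyncs the copy before returning.
        Serialization.copyFile(f, new File(backupDir, f.getName()));
      }
    }

Note that copyFile refuses to overwrite: the destination must not exist, which is why the old backup is cleared first in this sketch.
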
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
index e6d4957..4860ac2 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
@@ -1286,6 +1286,14 @@ public final class ProtosFactory {
     boolean hasEncryption();
     org.apache.flume.channel.file.proto.ProtosFactory.LogFileEncryption getEncryption();
     org.apache.flume.channel.file.proto.ProtosFactory.LogFileEncryptionOrBuilder getEncryptionOrBuilder();
+
+    // optional sfixed64 backupCheckpointPosition = 6;
+    boolean hasBackupCheckpointPosition();
+    long getBackupCheckpointPosition();
+
+    // optional sfixed64 backupCheckpointWriteOrderID = 7;
+    boolean hasBackupCheckpointWriteOrderID();
+    long getBackupCheckpointWriteOrderID();
   }
   public static final class LogFileMetaData extends
       com.google.protobuf.GeneratedMessage
@@ -1369,12 +1377,34 @@ public final class ProtosFactory {
       return encryption_;
     }
 
+    // optional sfixed64 backupCheckpointPosition = 6;
+    public static final int BACKUPCHECKPOINTPOSITION_FIELD_NUMBER = 6;
+    private long backupCheckpointPosition_;
+    public boolean hasBackupCheckpointPosition() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public long getBackupCheckpointPosition() {
+      return backupCheckpointPosition_;
+    }
+
+    // optional sfixed64 backupCheckpointWriteOrderID = 7;
+    public static final int BACKUPCHECKPOINTWRITEORDERID_FIELD_NUMBER = 7;
+    private long backupCheckpointWriteOrderID_;
+    public boolean hasBackupCheckpointWriteOrderID() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    public long getBackupCheckpointWriteOrderID() {
+      return backupCheckpointWriteOrderID_;
+    }
+
     private void initFields() {
       version_ = 0;
       logFileID_ = 0;
       checkpointPosition_ = 0L;
       checkpointWriteOrderID_ = 0L;
       encryption_ = org.apache.flume.channel.file.proto.ProtosFactory.LogFileEncryption.getDefaultInstance();
+      backupCheckpointPosition_ = 0L;
+      backupCheckpointWriteOrderID_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1425,6 +1455,12 @@ public final class ProtosFactory {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeMessage(5, encryption_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeSFixed64(6, backupCheckpointPosition_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeSFixed64(7, backupCheckpointWriteOrderID_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1454,6 +1490,14 @@ public final class ProtosFactory {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, encryption_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeSFixed64Size(6, backupCheckpointPosition_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeSFixed64Size(7, backupCheckpointWriteOrderID_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1593,6 +1637,10 @@ public final class ProtosFactory {
           encryptionBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000010);
+        backupCheckpointPosition_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        backupCheckpointWriteOrderID_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -1655,6 +1703,14 @@ public final class ProtosFactory {
         } else {
           result.encryption_ = encryptionBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.backupCheckpointPosition_ = backupCheckpointPosition_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.backupCheckpointWriteOrderID_ = backupCheckpointWriteOrderID_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1686,6 +1742,12 @@ public final class ProtosFactory {
         if (other.hasEncryption()) {
           mergeEncryption(other.getEncryption());
         }
+        if (other.hasBackupCheckpointPosition()) {
+          setBackupCheckpointPosition(other.getBackupCheckpointPosition());
+        }
+        if (other.hasBackupCheckpointWriteOrderID()) {
+          setBackupCheckpointWriteOrderID(other.getBackupCheckpointWriteOrderID());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1768,6 +1830,16 @@ public final class ProtosFactory {
               setEncryption(subBuilder.buildPartial());
               break;
             }
+            case 49: {
+              bitField0_ |= 0x00000020;
+              backupCheckpointPosition_ = input.readSFixed64();
+              break;
+            }
+            case 57: {
+              bitField0_ |= 0x00000040;
+              backupCheckpointWriteOrderID_ = input.readSFixed64();
+              break;
+            }
           }
         }
       }
@@ -1948,6 +2020,48 @@ public final class ProtosFactory {
         return encryptionBuilder_;
       }
 
+      // optional sfixed64 backupCheckpointPosition = 6;
+      private long backupCheckpointPosition_ ;
+      public boolean hasBackupCheckpointPosition() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public long getBackupCheckpointPosition() {
+        return backupCheckpointPosition_;
+      }
+      public Builder setBackupCheckpointPosition(long value) {
+        bitField0_ |= 0x00000020;
+        backupCheckpointPosition_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearBackupCheckpointPosition() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        backupCheckpointPosition_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional sfixed64 backupCheckpointWriteOrderID = 7;
+      private long backupCheckpointWriteOrderID_ ;
+      public boolean hasBackupCheckpointWriteOrderID() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      public long getBackupCheckpointWriteOrderID() {
+        return backupCheckpointWriteOrderID_;
+      }
+      public Builder setBackupCheckpointWriteOrderID(long value) {
+        bitField0_ |= 0x00000040;
+        backupCheckpointWriteOrderID_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearBackupCheckpointWriteOrderID() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        backupCheckpointWriteOrderID_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:LogFileMetaData)
     }
 
@@ -5921,23 +6035,25 @@ public final class ProtosFactory {
       "sion\030\001 \002(\017\022\024\n\014writeOrderID\030\002 \002(\020\022\021\n\tqueu" +
       "eSize\030\003 \002(\017\022\021\n\tqueueHead\030\004 \002(\017\022\036\n\nactive" +
       "Logs\030\005 \003(\0132\n.ActiveLog\"-\n\tActiveLog\022\021\n\tl" +
-      "ogFileID\030\001 \002(\017\022\r\n\005count\030\002 \002(\017\"\231\001\n\017LogFil" +
+      "ogFileID\030\001 \002(\017\022\r\n\005count\030\002 \002(\017\"\341\001\n\017LogFil" +
       "eMetaData\022\017\n\007version\030\001 \002(\017\022\021\n\tlogFileID\030" +
       "\002 \002(\017\022\032\n\022checkpointPosition\030\003 \002(\020\022\036\n\026che" +
       "ckpointWriteOrderID\030\004 \002(\020\022&\n\nencryption\030" +
-      "\005 \001(\0132\022.LogFileEncryption\"Q\n\021LogFileEncr" +
-      "yption\022\026\n\016cipherProvider\030\001 \002(\t\022\020\n\010keyAli",
-      "as\030\002 \002(\t\022\022\n\nparameters\030\003 \001(\014\"S\n\026Transact" +
-      "ionEventHeader\022\014\n\004type\030\001 \002(\017\022\025\n\rtransact" +
-      "ionID\030\002 \002(\020\022\024\n\014writeOrderID\030\003 \002(\020\"!\n\003Put" +
-      "\022\032\n\005event\030\001 \002(\0132\013.FlumeEvent\"&\n\004Take\022\016\n\006" +
-      "fileID\030\001 \002(\017\022\016\n\006offset\030\002 \002(\017\"\n\n\010Rollback" +
-      "\"\026\n\006Commit\022\014\n\004type\030\001 \002(\017\"\030\n\026TransactionE" +
-      "ventFooter\">\n\nFlumeEvent\022\"\n\007headers\030\001 \003(" +
-      "\0132\021.FlumeEventHeader\022\014\n\004body\030\002 \002(\014\".\n\020Fl" +
-      "umeEventHeader\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002" +
-      "(\tB4\n#org.apache.flume.channel.file.prot",
-      "oB\rProtosFactory"
+      "\005 \001(\0132\022.LogFileEncryption\022 \n\030backupCheck" +
+      "pointPosition\030\006 \001(\020\022$\n\034backupCheckpointW",
+      "riteOrderID\030\007 \001(\020\"Q\n\021LogFileEncryption\022\026" +
+      "\n\016cipherProvider\030\001 \002(\t\022\020\n\010keyAlias\030\002 \002(\t" +
+      "\022\022\n\nparameters\030\003 \001(\014\"S\n\026TransactionEvent" +
+      "Header\022\014\n\004type\030\001 \002(\017\022\025\n\rtransactionID\030\002 " +
+      "\002(\020\022\024\n\014writeOrderID\030\003 \002(\020\"!\n\003Put\022\032\n\005even" +
+      "t\030\001 \002(\0132\013.FlumeEvent\"&\n\004Take\022\016\n\006fileID\030\001" +
+      " \002(\017\022\016\n\006offset\030\002 \002(\017\"\n\n\010Rollback\"\026\n\006Comm" +
+      "it\022\014\n\004type\030\001 \002(\017\"\030\n\026TransactionEventFoot" +
+      "er\">\n\nFlumeEvent\022\"\n\007headers\030\001 \003(\0132\021.Flum" +
+      "eEventHeader\022\014\n\004body\030\002 \002(\014\".\n\020FlumeEvent",
+      "Header\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\tB4\n#or" +
+      "g.apache.flume.channel.file.protoB\rProto" +
+      "sFactory"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5965,7 +6081,7 @@ public final class ProtosFactory {
           internal_static_LogFileMetaData_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_LogFileMetaData_descriptor,
-              new java.lang.String[] { "Version", "LogFileID", "CheckpointPosition", "CheckpointWriteOrderID", "Encryption", },
+              new java.lang.String[] { "Version", "LogFileID", "CheckpointPosition", "CheckpointWriteOrderID", "Encryption", "BackupCheckpointPosition", "BackupCheckpointWriteOrderID", },
               org.apache.flume.channel.file.proto.ProtosFactory.LogFileMetaData.class,
               org.apache.flume.channel.file.proto.ProtosFactory.LogFileMetaData.Builder.class);
           internal_static_LogFileEncryption_descriptor =

http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
index 3a4e828..1e668d2 100644
--- a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
+++ b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
@@ -38,6 +38,8 @@ message LogFileMetaData {
   required sfixed64 checkpointPosition = 3;
   required sfixed64 checkpointWriteOrderID = 4;
   optional LogFileEncryption encryption = 5;
+  optional sfixed64 backupCheckpointPosition = 6;
+  optional sfixed64 backupCheckpointWriteOrderID = 7;
 }
 
 message LogFileEncryption {

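The two new optional fields keep the metadata file backward compatible: an old reader simply skips fields 6 and 7, while a new reader can test for their presence before trusting a backup checkpoint. A small sketch of writing and reading them through the regenerated builder (the concrete values and local variables here are illustrative):

    import org.apache.flume.channel.file.proto.ProtosFactory;

    // Build metadata carrying both the primary and the backup checkpoint markers.
    ProtosFactory.LogFileMetaData metaData = ProtosFactory.LogFileMetaData.newBuilder()
        .setVersion(3)
        .setLogFileID(1)
        .setCheckpointPosition(checkpointPosition)
        .setCheckpointWriteOrderID(checkpointWriteOrderID)
        .setBackupCheckpointPosition(backupCheckpointPosition)
        .setBackupCheckpointWriteOrderID(backupCheckpointWriteOrderID)
        .build();

    // On read, fall back to 0 when the metadata predates this change.
    long backupPosition = metaData.hasBackupCheckpointPosition()
        ? metaData.getBackupCheckpointPosition() : 0L;
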
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
index 3da09ab..1ee5320 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
@@ -37,12 +37,15 @@ public class TestFileChannelBase {
   protected File checkpointDir;
   protected File[] dataDirs;
   protected String dataDir;
+  protected File backupDir;
 
   @Before
   public void setup() throws Exception {
     baseDir = Files.createTempDir();
     checkpointDir = new File(baseDir, "chkpt");
+    backupDir = new File(baseDir, "backup");
     Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory());
+    Assert.assertTrue(backupDir.mkdirs() || backupDir.isDirectory());
     dataDirs = new File[3];
     dataDir = "";
     for (int i = 0; i < dataDirs.length; i++) {
@@ -68,7 +71,7 @@ public class TestFileChannelBase {
 
   protected Context createContext(Map<String, String> overrides) {
     return TestUtils.createFileChannelContext(checkpointDir.getAbsolutePath(),
-        dataDir, overrides);
+        dataDir, backupDir.getAbsolutePath(), overrides);
   }
 
   protected FileChannel createFileChannel() {
@@ -77,6 +80,6 @@ public class TestFileChannelBase {
 
   protected FileChannel createFileChannel(Map<String, String> overrides) {
     return TestUtils.createFileChannel(checkpointDir.getAbsolutePath(),
-        dataDir, overrides);
+        dataDir, backupDir.getAbsolutePath(), overrides);
   }
 }
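
With backupDir wired into the context, a concrete test only needs to switch the feature on through overrides. A sketch of that inside a subclass of TestFileChannelBase; the property name useDualCheckpoints is the configuration key this change is expected to introduce and should be read as an assumption here:

    import java.util.Map;
    import com.google.common.collect.Maps;

    // Inside a @Test method of a TestFileChannelBase subclass.
    Map<String, String> overrides = Maps.newHashMap();
    // createContext() already points the channel at backupDir; this just enables dual checkpoints.
    overrides.put("useDualCheckpoints", "true");
    FileChannel channel = createFileChannel(overrides);
    channel.start();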