You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/06/13 02:36:34 UTC

[3/3] git commit: FLUME-1586: File Channel should support verifying integrity of individual events.

FLUME-1586: File Channel should support verifying integrity of individual events.

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c57ebd1d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c57ebd1d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c57ebd1d

Branch: refs/heads/trunk
Commit: c57ebd1d2296fd1c0c8f84eecb16799de2a73e35
Parents: 791f443
Author: Brock Noland <br...@apache.org>
Authored: Wed Jun 12 17:35:38 2013 -0700
Committer: Brock Noland <br...@apache.org>
Committed: Wed Jun 12 17:35:38 2013 -0700

----------------------------------------------------------------------
 bin/flume-ng                                    |    6 +
 .../channel/file/CorruptEventException.java     |   36 +
 .../apache/flume/channel/file/FileChannel.java  |   47 +-
 .../java/org/apache/flume/channel/file/Log.java |   12 +-
 .../org/apache/flume/channel/file/LogFile.java  |  104 +-
 .../apache/flume/channel/file/LogFileV3.java    |   18 +-
 .../flume/channel/file/NoopRecordException.java |   35 +
 .../java/org/apache/flume/channel/file/Put.java |   38 +-
 .../flume/channel/file/ReplayHandler.java       |    3 +-
 .../flume/channel/file/Serialization.java       |   15 +-
 .../channel/file/TransactionEventRecord.java    |   10 +-
 .../flume/channel/file/proto/ProtosFactory.java | 1127 +++++++++---------
 .../src/main/proto/filechannel.proto            |    1 +
 .../flume/channel/file/TestFileChannel.java     |   44 +-
 .../org/apache/flume/channel/file/TestLog.java  |   28 +-
 .../apache/flume/channel/file/TestLogFile.java  |  111 +-
 .../file/TestTransactionEventRecordV3.java      |   16 +-
 .../apache/flume/channel/file/TestUtils.java    |   55 +-
 flume-ng-dist/pom.xml                           |    4 +
 flume-ng-dist/src/main/assembly/bin.xml         |    1 +
 flume-ng-dist/src/main/assembly/src.xml         |    2 +
 flume-tools/pom.xml                             |  143 +++
 .../flume/tools/FileChannelIntegrityTool.java   |  142 +++
 .../java/org/apache/flume/tools/FlumeTool.java  |   24 +
 .../org/apache/flume/tools/FlumeToolType.java   |   40 +
 .../org/apache/flume/tools/FlumeToolsMain.java  |   68 ++
 .../tools/TestFileChannelIntegrityTool.java     |  247 ++++
 pom.xml                                         |    7 +
 28 files changed, 1765 insertions(+), 619 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/bin/flume-ng
----------------------------------------------------------------------
diff --git a/bin/flume-ng b/bin/flume-ng
index 22b95b8..65cc985 100755
--- a/bin/flume-ng
+++ b/bin/flume-ng
@@ -26,6 +26,7 @@
 FLUME_AGENT_CLASS="org.apache.flume.node.Application"
 FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
 FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
+FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"
 
 CLEAN_FLAG=1
 ################################
@@ -261,6 +262,9 @@ case "$mode" in
   avro-client)
     opt_avro_client=1
     ;;
+  tool)
+    opt_tool=1
+    ;;
   version)
    opt_version=1
    CLEAN_FLAG=0
@@ -433,6 +437,8 @@ elif [ -n "$opt_avro_client" ] ; then
   run_flume $FLUME_AVRO_CLIENT_CLASS $args
 elif [ -n "${opt_version}" ] ; then
   run_flume $FLUME_VERSION_CLASS $args
+elif [ -n "${opt_tool}" ] ; then
+  run_flume $FLUME_TOOLS_CLASS $args
 else
   error "This message should never appear" 1
 fi

http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java
new file mode 100644
index 0000000..691d291
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+
+public class CorruptEventException extends Exception {
+
+  private static final long serialVersionUID = -2986946303540798416L;
+  public CorruptEventException() {
+    super();
+  }
+
+  public CorruptEventException(String msg) {
+    super(msg);
+  }
+
+  public CorruptEventException(String msg, Throwable th) {
+    super(msg, th);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index cc0d38a..36f150b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -100,6 +100,7 @@ public class FileChannel extends BasicChannelSemantics {
   private String encryptionActiveKey;
   private String encryptionCipherProvider;
   private boolean useDualCheckpoints;
+  private boolean isTest = false;
 
   @Override
   public synchronized void setName(String name) {
@@ -451,6 +452,7 @@ public class FileChannel extends BasicChannelSemantics {
     private String getStateAsString() {
       return String.valueOf(getState());
     }
+
     @Override
     protected void doPut(Event event) throws InterruptedException {
       channelCounter.incrementEventPutAttemptCount();
@@ -511,23 +513,40 @@ public class FileChannel extends BasicChannelSemantics {
             + "log. Try increasing the log write timeout value. " +
             channelNameDescriptor);
       }
+
+      /*
+       * 1. Take an event which is in the queue.
+       * 2. If getting that event does not throw NoopRecordException,
+       *    then return it.
+       * 3. Else try to retrieve the next event from the queue
+       * 4. Repeat 2 and 3 until queue is empty or an event is returned.
+       */
+
       try {
-        FlumeEventPointer ptr = queue.removeHead(transactionID);
-        if(ptr != null) {
-          try {
-            // first add to takeList so that if write to disk
-            // fails rollback actually does it's work
-            Preconditions.checkState(takeList.offer(ptr), "takeList offer failed "
-                 + channelNameDescriptor);
-            log.take(transactionID, ptr); // write take to disk
-            Event event = log.get(ptr);
-            return event;
-          } catch (IOException e) {
-            throw new ChannelException("Take failed due to IO error "
-                    + channelNameDescriptor, e);
+        while (true) {
+          FlumeEventPointer ptr = queue.removeHead(transactionID);
+          if (ptr == null) {
+            return null;
+          } else {
+            try {
+              // first add to takeList so that if write to disk
+              // fails rollback actually does its work
+              Preconditions.checkState(takeList.offer(ptr),
+                "takeList offer failed "
+                  + channelNameDescriptor);
+              log.take(transactionID, ptr); // write take to disk
+              Event event = log.get(ptr);
+              return event;
+            } catch (IOException e) {
+              throw new ChannelException("Take failed due to IO error "
+                + channelNameDescriptor, e);
+            } catch (NoopRecordException e) {
+              LOG.warn("Corrupt record replaced by File Channel Integrity " +
+                "tool found. Will retrieve next event", e);
+              takeList.remove(ptr);
+            }
           }
         }
-        return null;
       } finally {
         log.unlockShared();
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 1918baa..8dc0ff8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -73,7 +73,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-class Log {
+public class Log {
   public static final String PREFIX = "log-";
   private static final Logger LOGGER = LoggerFactory.getLogger(Log.class);
   private static final int MIN_NUM_LOGS = 2;
@@ -571,12 +571,18 @@ class Log {
    * @throws InterruptedException
    */
   FlumeEvent get(FlumeEventPointer pointer) throws IOException,
-  InterruptedException {
+    InterruptedException, NoopRecordException {
     Preconditions.checkState(open, "Log is closed");
     int id = pointer.getFileID();
     LogFile.RandomReader logFile = idLogFileMap.get(id);
     Preconditions.checkNotNull(logFile, "LogFile is null for id " + id);
-    return logFile.get(pointer.getOffset());
+    try {
+      return logFile.get(pointer.getOffset());
+    } catch (CorruptEventException ex) {
+      open = false;
+      throw new IOException("Corrupt event found. Please run File Channel " +
+        "Integrity tool.", ex);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index d3db896..bb8ce1a 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -22,6 +22,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.flume.ChannelException;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.channel.file.encryption.CipherProvider;
 import org.apache.flume.channel.file.encryption.KeyProvider;
 import org.apache.flume.tools.DirectMemoryUtils;
@@ -31,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import java.io.EOFException;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
@@ -40,7 +44,9 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-abstract class LogFile {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class LogFile {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(LogFile.class);
@@ -54,8 +60,9 @@ abstract class LogFile {
   private static final ByteBuffer FILL = DirectMemoryUtils.
       allocate(1024 * 1024); // preallocation, 1MB
 
-  protected static final byte OP_RECORD = Byte.MAX_VALUE;
-  protected static final byte OP_EOF = Byte.MIN_VALUE;
+  public static final byte OP_RECORD = Byte.MAX_VALUE;
+  public static final byte OP_NOOP = (Byte.MAX_VALUE + Byte.MIN_VALUE)/2;
+  public static final byte OP_EOF = Byte.MIN_VALUE;
 
   static {
     for (int i = 0; i < FILL.capacity(); i++) {
@@ -63,6 +70,13 @@ abstract class LogFile {
     }
   }
 
+  protected static void skipRecord(RandomAccessFile fileHandle,
+    int offset) throws IOException {
+    fileHandle.seek(offset);
+    int length = fileHandle.readInt();
+    fileHandle.skipBytes(length);
+  }
+
   abstract static class MetaDataWriter {
     private final File file;
     private final int logFileID;
@@ -296,6 +310,48 @@ abstract class LogFile {
     }
   }
 
+  /**
+   * This is a class meant to be an internal Flume API,
+   * and can change at any time. Intended to be used only from the File Channel Integrity
+   * test tool. Not to be used for any other purpose.
+   */
+  public static class OperationRecordUpdater {
+    private final RandomAccessFile fileHandle;
+    private final File file;
+
+    public OperationRecordUpdater(File file) throws FileNotFoundException {
+      Preconditions.checkState(file.exists(), "File to update, " +
+        file.toString() + " does not exist.");
+      this.file = file;
+      fileHandle = new RandomAccessFile(file, "rw");
+    }
+
+    public void markRecordAsNoop(long offset) throws IOException {
+      // First ensure that the offset actually is an OP_RECORD. There is a
+      // small possibility that it still is OP_RECORD,
+      // but is not actually the beginning of a record. Is there anything we
+      // can do about it?
+      fileHandle.seek(offset);
+      byte byteRead = fileHandle.readByte();
+      Preconditions.checkState(byteRead == OP_RECORD || byteRead == OP_NOOP,
+        "Expected to read a record but the byte read indicates EOF");
+      fileHandle.seek(offset);
+      LOG.info("Marking event as " + OP_NOOP + " at " + offset + " for file " +
+        file.toString());
+      fileHandle.writeByte(OP_NOOP);
+    }
+
+    public void close() {
+      try {
+        fileHandle.getFD().sync();
+        fileHandle.close();
+      } catch (IOException e) {
+        LOG.error("Could not close file handle to file " +
+          fileHandle.toString(), e);
+      }
+    }
+  }
+
   static abstract class RandomReader {
     private final File file;
     private final BlockingQueue<RandomAccessFile> readFileHandles =
@@ -311,7 +367,7 @@ abstract class LogFile {
     }
 
     protected abstract TransactionEventRecord doGet(RandomAccessFile fileHandle)
-        throws IOException;
+        throws IOException, CorruptEventException;
 
     abstract int getVersion();
 
@@ -323,13 +379,18 @@ abstract class LogFile {
       return encryptionKeyProvider;
     }
 
-    FlumeEvent get(int offset) throws IOException, InterruptedException {
+    FlumeEvent get(int offset) throws IOException, InterruptedException,
+      CorruptEventException, NoopRecordException {
       Preconditions.checkState(open, "File closed");
       RandomAccessFile fileHandle = checkOut();
       boolean error = true;
       try {
         fileHandle.seek(offset);
         byte operation = fileHandle.readByte();
+        if(operation == OP_NOOP) {
+          throw new NoopRecordException("No op record found. Corrupt record " +
+            "may have been repaired by File Channel Integrity tool");
+        }
         Preconditions.checkState(operation == OP_RECORD,
             Integer.toHexString(operation));
         TransactionEventRecord record = doGet(fileHandle);
@@ -408,7 +469,7 @@ abstract class LogFile {
     }
   }
 
-  static abstract class SequentialReader {
+  public static abstract class SequentialReader {
 
     private final RandomAccessFile fileHandle;
     private final FileChannel fileChannel;
@@ -434,7 +495,7 @@ abstract class LogFile {
       fileHandle = new RandomAccessFile(file, "r");
       fileChannel = fileHandle.getChannel();
     }
-    abstract LogRecord doNext(int offset) throws IOException;
+    abstract LogRecord doNext(int offset) throws IOException, CorruptEventException;
 
     abstract int getVersion();
 
@@ -488,7 +549,7 @@ abstract class LogFile {
       }
     }
 
-    LogRecord next() throws IOException {
+    public LogRecord next() throws IOException, CorruptEventException {
       int offset = -1;
       try {
         long position = fileChannel.position();
@@ -499,14 +560,26 @@ abstract class LogFile {
         }
         offset = (int) position;
         Preconditions.checkState(offset >= 0);
-        byte operation = fileHandle.readByte();
-        if(operation != OP_RECORD) {
-          if(operation == OP_EOF) {
+        while (offset < fileHandle.length()) {
+          byte operation = fileHandle.readByte();
+          if (operation == OP_RECORD) {
+            break;
+          } else if (operation == OP_EOF) {
             LOG.info("Encountered EOF at " + offset + " in " + file);
+            return null;
+          } else if (operation == OP_NOOP) {
+            LOG.info("No op event found in file: " + file.toString() +
+              " at " + offset + ". Skipping event.");
+            skipRecord(fileHandle, offset + 1);
+            offset = (int) fileHandle.getFilePointer();
+            continue;
           } else {
             LOG.error("Encountered non op-record at " + offset + " " +
-                Integer.toHexString(operation) + " in " + file);
+              Integer.toHexString(operation) + " in " + file);
+            return null;
           }
+        }
+        if(offset >= fileHandle.length()) {
           return null;
         }
         return doNext(offset);
@@ -518,7 +591,10 @@ abstract class LogFile {
       }
     }
 
-    void close() {
+    public long getPosition() throws IOException {
+      return fileChannel.position();
+    }
+    public void close() {
       if(fileHandle != null) {
         try {
           fileHandle.close();
@@ -540,7 +616,7 @@ abstract class LogFile {
     return buffer;
   }
 
-  public static void main(String[] args) throws EOFException, IOException {
+  public static void main(String[] args) throws EOFException, IOException, CorruptEventException {
     File file = new File(args[0]);
     LogFile.SequentialReader reader = null;
     try {

http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index d9a2a9b..38f6ecb 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -21,6 +21,8 @@ package org.apache.flume.channel.file;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.GeneratedMessage;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.channel.file.encryption.CipherProvider;
 import org.apache.flume.channel.file.encryption.CipherProviderFactory;
 import org.apache.flume.channel.file.encryption.KeyProvider;
@@ -43,7 +45,9 @@ import java.util.concurrent.LinkedBlockingDeque;
  * Represents a single data file on disk. Has methods to write,
  * read sequentially (replay), and read randomly (channel takes).
  */
-class LogFileV3 extends LogFile {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class LogFileV3 extends LogFile {
   protected static final Logger LOGGER =
       LoggerFactory.getLogger(LogFileV3.class);
 
@@ -267,7 +271,7 @@ class LogFileV3 extends LogFile {
     }
     @Override
     protected TransactionEventRecord doGet(RandomAccessFile fileHandle)
-        throws IOException {
+        throws IOException, CorruptEventException {
       // readers are opened right when the file is created and thus
       // empty. As such we wait to initialize until there is some
       // data before we we initialize
@@ -297,10 +301,11 @@ class LogFileV3 extends LogFile {
     }
   }
 
-  static class SequentialReader extends LogFile.SequentialReader {
+  public static class SequentialReader extends LogFile.SequentialReader {
     private CipherProvider.Decryptor decryptor;
-    SequentialReader(File file, @Nullable KeyProvider encryptionKeyProvider)
-        throws EOFException, IOException {
+
+    public SequentialReader(File file, @Nullable KeyProvider
+      encryptionKeyProvider) throws EOFException, IOException {
       super(file, encryptionKeyProvider);
       File metaDataFile = Serialization.getMetaDataFile(file);
       FileInputStream inputStream = new FileInputStream(metaDataFile);
@@ -344,8 +349,9 @@ class LogFileV3 extends LogFile {
     public int getVersion() {
       return Serialization.VERSION_3;
     }
+
     @Override
-    LogRecord doNext(int offset) throws IOException {
+    LogRecord doNext(int offset) throws IOException, CorruptEventException {
       byte[] buffer = readDelimitedBuffer(getFileHandle());
       if(decryptor != null) {
         buffer = decryptor.decrypt(buffer);

http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/NoopRecordException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/NoopRecordException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/NoopRecordException.java
new file mode 100644
index 0000000..5f446b8
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/NoopRecordException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+public class NoopRecordException extends Exception {
+  private static final long serialVersionUID = -7394180633208889738L;
+
+  public NoopRecordException() {
+    super();
+  }
+
+  public NoopRecordException(String msg) {
+    super(msg);
+  }
+
+  public NoopRecordException(String msg, Throwable th) {
+    super(msg, th);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
index 4235a79..f08f024 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
@@ -24,7 +24,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flume.channel.file.proto.ProtosFactory;
 
 import com.google.common.base.Preconditions;
@@ -36,13 +39,19 @@ import com.google.protobuf.ByteString;
  */
 class Put extends TransactionEventRecord {
   private FlumeEvent event;
+  // Should we move this to a higher level to not make multiple instances?
+  // Doing that might cause performance issues, since access to this would
+  // need to be synchronized (the whole reset-update-getValue cycle would
+  // need to be).
+  private final Checksum checksum = new CRC32();
 
+  @VisibleForTesting
   Put(Long transactionID, Long logWriteOrderID) {
-    super(transactionID, logWriteOrderID);
+    this(transactionID, logWriteOrderID, null);
   }
 
   Put(Long transactionID, Long logWriteOrderID, FlumeEvent event) {
-    this(transactionID, logWriteOrderID);
+    super(transactionID, logWriteOrderID);
     this.event = event;
   }
 
@@ -78,11 +87,14 @@ class Put extends TransactionEventRecord {
       }
     }
     eventBuilder.setBody(ByteString.copyFrom(event.getBody()));
-    putBuilder.setEvent(eventBuilder.build());
+    ProtosFactory.FlumeEvent protoEvent = eventBuilder.build();
+    putBuilder.setEvent(protoEvent);
+    putBuilder.setChecksum(calculateChecksum(event.getBody()));
     putBuilder.build().writeDelimitedTo(out);
   }
   @Override
-  void readProtos(InputStream in) throws IOException {
+  void readProtos(InputStream in) throws IOException,
+    CorruptEventException {
     ProtosFactory.Put put = Preconditions.checkNotNull(ProtosFactory.
         Put.parseDelimitedFrom(in), "Put cannot be null");
     Map<String, String> headers = Maps.newHashMap();
@@ -90,9 +102,25 @@ class Put extends TransactionEventRecord {
     for(ProtosFactory.FlumeEventHeader header : protosEvent.getHeadersList()) {
       headers.put(header.getKey(), header.getValue());
     }
+    byte[] eventBody = protosEvent.getBody().toByteArray();
+
+    if (put.hasChecksum()) {
+      long eventBodyChecksum = calculateChecksum(eventBody);
+      if (eventBodyChecksum != put.getChecksum()) {
+        throw new CorruptEventException("Expected checksum for event was " +
+          eventBodyChecksum + " but the checksum of the event is " + put.getChecksum());
+      }
+    }
     // TODO when we remove v2, remove FlumeEvent and use EventBuilder here
-    event = new FlumeEvent(headers, protosEvent.getBody().toByteArray());
+    event = new FlumeEvent(headers, eventBody);
+  }
+
+  protected long calculateChecksum(byte[] body) {
+    checksum.reset();
+    checksum.update(body, 0, body.length);
+    return checksum.getValue();
   }
+
   @Override
   public short getRecordType() {
     return Type.PUT.get();

http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
index fc47b23..c8f5fdd 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -26,6 +26,7 @@ import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.MultiMap;
 import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.flume.ChannelException;
 import org.apache.flume.channel.file.encryption.KeyProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -373,7 +374,7 @@ class ReplayHandler {
       }
     }
   }
-  private LogRecord next() throws IOException {
+  private LogRecord next() throws IOException, CorruptEventException {
     LogRecord resultLogRecord = logRecordBuffer.poll();
     if(resultLogRecord != null) {
       // there is more log records to read

http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
index d6897e1..f8160d9 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -21,6 +21,8 @@ package org.apache.flume.channel.file;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import org.apache.commons.io.FileUtils;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,7 +35,9 @@ import java.io.RandomAccessFile;
 import java.util.Collections;
 import java.util.Set;
 
-class Serialization {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Serialization {
   private Serialization() {}
 
   static final long SIZE_OF_INT = 4;
@@ -43,9 +47,10 @@ class Serialization {
   static final int VERSION_2 = 2;
   static final int VERSION_3 = 3;
 
-  static final String METADATA_FILENAME = ".meta";
-  static final String METADATA_TMP_FILENAME = ".tmp";
-  static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old";
+  public static final String METADATA_FILENAME = ".meta";
+  public static final String METADATA_TMP_FILENAME = ".tmp";
+  public static final String OLD_METADATA_FILENAME = METADATA_FILENAME +
+    ".old";
 
   // 64 K buffer to copy files.
   private static final int FILE_COPY_BUFFER_SIZE = 64 * 1024;
@@ -121,7 +126,7 @@ class Serialization {
    * @param to Destination file - this file should not exist
    * @return true if the copy was successful
    */
-  static boolean copyFile(File from, File to) throws IOException {
+  public static boolean copyFile(File from, File to) throws IOException {
     Preconditions.checkNotNull(from, "Source file is null, file copy failed.");
     Preconditions.checkNotNull(to, "Destination file is null, " +
       "file copy failed.");

http://git-wip-us.apache.org/repos/asf/flume/blob/c57ebd1d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
index 073042f..dda9b3f 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
@@ -29,6 +29,8 @@ import java.io.OutputStream;
 import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
 
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.channel.file.proto.ProtosFactory;
 import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
@@ -41,7 +43,9 @@ import com.google.common.collect.ImmutableMap;
 /**
  * Base class for records in data file: Put, Take, Rollback, Commit
  */
-abstract class TransactionEventRecord implements Writable {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TransactionEventRecord implements Writable {
   private static final Logger LOG = LoggerFactory
       .getLogger(TransactionEventRecord.class);
   private final long transactionID;
@@ -63,7 +67,7 @@ abstract class TransactionEventRecord implements Writable {
 
   abstract void writeProtos(OutputStream out) throws IOException;
 
-  abstract void readProtos(InputStream in) throws IOException;
+  abstract void readProtos(InputStream in) throws IOException, CorruptEventException;
 
   long getLogWriteOrderID() {
     return logWriteOrderID;
@@ -187,7 +191,7 @@ abstract class TransactionEventRecord implements Writable {
 
 
   static TransactionEventRecord fromByteArray(byte[] buffer)
-      throws IOException {
+      throws IOException, CorruptEventException {
     ByteArrayInputStream in = new ByteArrayInputStream(buffer);
     try {
       ProtosFactory.TransactionEventHeader header = Preconditions.