Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/04 23:42:14 UTC

[09/50] [abbrv] hadoop git commit: HDFS-8965. Harden edit log reading code against out of memory errors (cmccabe)

HDFS-8965. Harden edit log reading code against out of memory errors (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/24f6a7c9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/24f6a7c9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/24f6a7c9

Branch: refs/heads/YARN-1197
Commit: 24f6a7c9563757234f53ca23e12f9c9208b53082
Parents: 8fa41d9
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Mon Aug 31 17:31:29 2015 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Mon Aug 31 18:06:30 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../bkjournal/BookKeeperEditLogInputStream.java |   2 +-
 .../hadoop/hdfs/protocol/LayoutVersion.java     |   2 +-
 .../namenode/EditLogBackupInputStream.java      |   2 +-
 .../server/namenode/EditLogFileInputStream.java |   2 +-
 .../hdfs/server/namenode/FSEditLogOp.java       | 354 +++++++++++++------
 .../hdfs/server/namenode/TestEditLog.java       |   2 +-
 .../namenode/TestEditLogFileInputStream.java    |  80 +++++
 8 files changed, 341 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
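
For context, the core hardening pattern in this change is to bounds-check the
untrusted length field and verify the checksum of an op frame before
constructing the op (see LengthPrefixedReader#decodeOpFrame below). A minimal
sketch of that pattern follows; the class name, method name, and size limits
here are illustrative, not the actual HDFS API:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;
    import java.util.zip.Checksum;

    public class FrameValidator {
      private static final int MAX_FRAME_SIZE = 50 * 1024 * 1024; // illustrative cap
      private static final int MIN_FRAME_SIZE = 17; // smallest plausible frame, cf. MIN_OP_LENGTH

      /** Returns the validated frame body, or throws before any unsafe allocation. */
      public static byte[] readValidatedFrame(DataInputStream in) throws IOException {
        int declaredLength = in.readInt();
        // Reject implausible lengths before allocating anything, so garbage
        // data cannot trigger a huge allocation and an OutOfMemoryError.
        if (declaredLength > MAX_FRAME_SIZE || declaredLength < MIN_FRAME_SIZE) {
          throw new IOException("Frame claims " + declaredLength +
              " bytes; expected between " + MIN_FRAME_SIZE +
              " and " + MAX_FRAME_SIZE);
        }
        byte[] body = new byte[declaredLength]; // safe: length already bounded
        in.readFully(body);
        // Verify the trailing CRC32 before handing the bytes to a parser.
        Checksum crc = new CRC32();
        crc.update(body, 0, body.length);
        int stored = in.readInt();
        if ((int) crc.getValue() != stored) {
          throw new IOException("Checksum mismatch: computed " +
              (int) crc.getValue() + ", stored " + stored);
        }
        return body;
      }
    }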


http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6584c84..57ddcb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -872,6 +872,8 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8946. Improve choosing datanode storage for block placement. (yliu)
 
+    HDFS-8965. Harden edit log reading code against out of memory errors (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
index e2098dd..86da807 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
@@ -83,7 +83,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
     tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
     DataInputStream in = new DataInputStream(tracker);
 
-    reader = new FSEditLogOp.Reader(in, tracker, logVersion);
+    reader = FSEditLogOp.Reader.create(in, tracker, logVersion);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
index c893744..1750790 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
@@ -87,7 +87,7 @@ public class LayoutVersion {
     FSIMAGE_COMPRESSION(-25, "Support for fsimage compression"),
     FSIMAGE_CHECKSUM(-26, "Support checksum for fsimage"),
     REMOVE_REL13_DISK_LAYOUT_SUPPORT(-27, "Remove support for 0.13 disk layout"),
-    EDITS_CHESKUM(-28, "Support checksum for editlog"),
+    EDITS_CHECKSUM(-28, "Support checksum for editlog"),
     UNUSED(-29, "Skipped version"),
     FSIMAGE_NAME_OPTIMIZATION(-30, "Store only last part of path in fsimage"),
     RESERVED_REL20_203(-31, -19, "Reserved for release 0.20.203", true,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
index 689cacc..81d285a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
@@ -119,7 +119,7 @@ class EditLogBackupInputStream extends EditLogInputStream {
 
     this.version = version;
 
-    reader = new FSEditLogOp.Reader(in, tracker, version);
+    reader = FSEditLogOp.Reader.create(in, tracker, version);
   }
 
   void clear() throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
index 3e21c24..73a162e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
@@ -157,7 +157,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
               "flags from log");
         }
       }
-      reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
+      reader = FSEditLogOp.Reader.create(dataIn, tracker, logVersion);
       reader.setMaxOpSize(maxOpSize);
       state = State.OPEN;
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index ab36f17..125e1cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -4518,42 +4518,46 @@ public abstract class FSEditLogOp {
   /**
    * Class for reading editlog ops from a stream
    */
-  public static class Reader {
-    private final DataInputStream in;
-    private final StreamLimiter limiter;
-    private final int logVersion;
-    private final Checksum checksum;
-    private final OpInstanceCache cache;
-    private int maxOpSize;
-    private final boolean supportEditLogLength;
+  public abstract static class Reader {
+    final DataInputStream in;
+    final StreamLimiter limiter;
+    final OpInstanceCache cache;
+    final byte[] temp = new byte[4096];
+    final int logVersion;
+    int maxOpSize;
+
+    public static Reader create(DataInputStream in, StreamLimiter limiter,
+                                int logVersion) {
+      if (logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION) {
+        // Use the LengthPrefixedReader on edit logs which are newer than what
+        // we can parse.  (Newer layout versions are represented by smaller
+        // negative integers, for historical reasons.) Even though we can't
+        // parse the Ops contained in them, we should still be able to call
+        // scanOp on them.  This is important for the JournalNode during rolling
+        // upgrade.
+        return new LengthPrefixedReader(in, limiter, logVersion);
+      } else if (NameNodeLayoutVersion.supports(
+              NameNodeLayoutVersion.Feature.EDITLOG_LENGTH, logVersion)) {
+        return new LengthPrefixedReader(in, limiter, logVersion);
+      } else if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITS_CHECKSUM, logVersion)) {
+        Checksum checksum = DataChecksum.newCrc32();
+        return new ChecksummedReader(checksum, in, limiter, logVersion);
+      } else {
+        return new LegacyReader(in, limiter, logVersion);
+      }
+    }
 
     /**
      * Construct the reader
-     * @param in The stream to read from.
-     * @param logVersion The version of the data coming from the stream.
+     * @param in            The stream to read from.
+     * @param limiter       The limiter for this stream.
+     * @param logVersion    The version of the data coming from the stream.
      */
-    public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
-      this.logVersion = logVersion;
-      if (NameNodeLayoutVersion.supports(
-          LayoutVersion.Feature.EDITS_CHESKUM, logVersion)) {
-        this.checksum = DataChecksum.newCrc32();
-      } else {
-        this.checksum = null;
-      }
-      // It is possible that the logVersion is actually a future layoutversion
-      // during the rolling upgrade (e.g., the NN gets upgraded first). We
-      // assume future layout will also support length of editlog op.
-      this.supportEditLogLength = NameNodeLayoutVersion.supports(
-          NameNodeLayoutVersion.Feature.EDITLOG_LENGTH, logVersion)
-          || logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION;
-
-      if (this.checksum != null) {
-        this.in = new DataInputStream(
-            new CheckedInputStream(in, this.checksum));
-      } else {
-        this.in = in;
-      }
+    Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
+      this.in = in;
       this.limiter = limiter;
+      this.logVersion = logVersion;
       this.cache = new OpInstanceCache();
       this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
     }
@@ -4606,26 +4610,25 @@ public abstract class FSEditLogOp {
       }
     }
 
-    private void verifyTerminator() throws IOException {
+    void verifyTerminator() throws IOException {
       /** The end of the edit log should contain only 0x00 or 0xff bytes.
        * If it contains other bytes, the log itself may be corrupt.
        * It is important to check this; if we don't, a stray OP_INVALID byte 
        * could make us stop reading the edit log halfway through, and we'd never
        * know that we had lost data.
        */
-      byte[] buf = new byte[4096];
       limiter.clearLimit();
       int numRead = -1, idx = 0;
       while (true) {
         try {
           numRead = -1;
           idx = 0;
-          numRead = in.read(buf);
+          numRead = in.read(temp);
           if (numRead == -1) {
             return;
           }
           while (idx < numRead) {
-            if ((buf[idx] != (byte)0) && (buf[idx] != (byte)-1)) {
+            if ((temp[idx] != (byte)0) && (temp[idx] != (byte)-1)) {
               throw new IOException("Read extra bytes after " +
                 "the terminator!");
             }
@@ -4638,7 +4641,7 @@ public abstract class FSEditLogOp {
           if (numRead != -1) { 
             in.reset();
             IOUtils.skipFully(in, idx);
-            in.mark(buf.length + 1);
+            in.mark(temp.length + 1);
             IOUtils.skipFully(in, 1);
           }
         }
@@ -4653,14 +4656,164 @@ public abstract class FSEditLogOp {
      * If an exception is thrown, the stream's mark will be set to the first
      * problematic byte.  This usually means the beginning of the opcode.
      */
-    private FSEditLogOp decodeOp() throws IOException {
-      limiter.setLimit(maxOpSize);
+    public abstract FSEditLogOp decodeOp() throws IOException;
+
+    /**
+     * Similar to decodeOp(), but retrieves only the transaction ID of the
+     * Op rather than decoding the whole Op.  If the edit log format
+     * supports length prefixing, this can be much faster than full decoding.
+     *
+     * @return the last txid of the segment, or INVALID_TXID on EOF.
+     */
+    public abstract long scanOp() throws IOException;
+  }
+
+  /**
+   * Reads edit logs which are prefixed with a length.  These edit logs also
+   * include a checksum and transaction ID.
+   */
+  private static class LengthPrefixedReader extends Reader {
+    /**
+     * The minimum length of a length-prefixed Op.
+     *
+     * The minimum Op has:
+     * 1-byte opcode
+     * 4-byte length
+     * 8-byte txid
+     * 0-byte body
+     * 4-byte checksum
+     */
+    private static final int MIN_OP_LENGTH = 17;
+
+    /**
+     * The op id length.
+     *
+     * Not included in the stored length.
+     */
+    private static final int OP_ID_LENGTH = 1;
+
+    /**
+     * The checksum length.
+     *
+     * Not included in the stored length.
+     */
+    private static final int CHECKSUM_LENGTH = 4;
+
+    private final Checksum checksum;
+
+    LengthPrefixedReader(DataInputStream in, StreamLimiter limiter,
+                         int logVersion) {
+      super(in, limiter, logVersion);
+      this.checksum = DataChecksum.newCrc32();
+    }
+
+    @Override
+    public FSEditLogOp decodeOp() throws IOException {
+      long txid = decodeOpFrame();
+      if (txid == HdfsServerConstants.INVALID_TXID) {
+        return null;
+      }
+      in.reset();
       in.mark(maxOpSize);
+      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(in.readByte());
+      FSEditLogOp op = cache.get(opCode);
+      if (op == null) {
+        throw new IOException("Read invalid opcode " + opCode);
+      }
+      op.setTransactionId(txid);
+      IOUtils.skipFully(in, 4 + 8); // skip length and txid
+      op.readFields(in, logVersion);
+      // skip over the checksum, which we validated above.
+      IOUtils.skipFully(in, CHECKSUM_LENGTH);
+      return op;
+    }
+
+    @Override
+    public long scanOp() throws IOException {
+      return decodeOpFrame();
+    }
 
-      if (checksum != null) {
-        checksum.reset();
+    /**
+     * Decode the opcode "frame".  This includes reading the opcode and
+     * transaction ID, and validating the checksum and length.  It does not
+     * include reading the opcode-specific fields.
+     * The input stream will be advanced to the end of the op at the end of this
+     * function.
+     *
+     * @return        the transaction ID of the Op, or
+     *                  HdfsServerConstants.INVALID_TXID if we hit EOF.
+     */
+    private long decodeOpFrame() throws IOException {
+      limiter.setLimit(maxOpSize);
+      in.mark(maxOpSize);
+      byte opCodeByte;
+      try {
+        opCodeByte = in.readByte();
+      } catch (EOFException eof) {
+        // EOF at an opcode boundary is expected.
+        return HdfsServerConstants.INVALID_TXID;
       }
+      if (opCodeByte == FSEditLogOpCodes.OP_INVALID.getOpCode()) {
+        verifyTerminator();
+        return HdfsServerConstants.INVALID_TXID;
+      }
+      // Here, we verify that the Op size makes sense and that the
+      // data matches its checksum before attempting to construct an Op.
+      // This is important because otherwise we may encounter an
+      // OutOfMemoryError which could bring down the NameNode or
+      // JournalNode when reading garbage data.
+      int opLength = in.readInt() + OP_ID_LENGTH + CHECKSUM_LENGTH;
+      if (opLength > maxOpSize) {
+        throw new IOException("Op " + (int)opCodeByte + " has size " +
+            opLength + ", but maxOpSize = " + maxOpSize);
+      } else if (opLength < MIN_OP_LENGTH) {
+        throw new IOException("Op " + (int)opCodeByte + " has size " +
+            opLength + ", but the minimum op size is " + MIN_OP_LENGTH);
+      }
+      long txid = in.readLong();
+      // Verify checksum
+      in.reset();
+      in.mark(maxOpSize);
+      checksum.reset();
+      for (int rem = opLength - CHECKSUM_LENGTH; rem > 0;) {
+        int toRead = Math.min(temp.length, rem);
+        IOUtils.readFully(in, temp, 0, toRead);
+        checksum.update(temp, 0, toRead);
+        rem -= toRead;
+      }
+      int expectedChecksum = in.readInt();
+      int calculatedChecksum = (int)checksum.getValue();
+      if (expectedChecksum != calculatedChecksum) {
+        throw new ChecksumException(
+            "Transaction is corrupt. Calculated checksum is " +
+            calculatedChecksum + " but read checksum " +
+            expectedChecksum, txid);
+      }
+      return txid;
+    }
+  }
+
+  /**
+   * Read edit logs which have a checksum and a transaction ID, but not a
+   * length.
+   */
+  private static class ChecksummedReader extends Reader {
+    private final Checksum checksum;
 
+    ChecksummedReader(Checksum checksum, DataInputStream in,
+                      StreamLimiter limiter, int logVersion) {
+      super(new DataInputStream(
+          new CheckedInputStream(in, checksum)), limiter, logVersion);
+      this.checksum = checksum;
+    }
+
+    @Override
+    public FSEditLogOp decodeOp() throws IOException {
+      limiter.setLimit(maxOpSize);
+      in.mark(maxOpSize);
+      // Reset the checksum.  Since we are using a CheckedInputStream, each
+      // subsequent read from the stream will update the checksum.
+      checksum.reset();
       byte opCodeByte;
       try {
         opCodeByte = in.readByte();
@@ -4668,88 +4821,89 @@ public abstract class FSEditLogOp {
         // EOF at an opcode boundary is expected.
         return null;
       }
-
       FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
       if (opCode == OP_INVALID) {
         verifyTerminator();
         return null;
       }
-
       FSEditLogOp op = cache.get(opCode);
       if (op == null) {
         throw new IOException("Read invalid opcode " + opCode);
       }
-
-      if (supportEditLogLength) {
-        in.readInt();
+      op.setTransactionId(in.readLong());
+      op.readFields(in, logVersion);
+      // Verify checksum
+      int calculatedChecksum = (int)checksum.getValue();
+      int expectedChecksum = in.readInt();
+      if (expectedChecksum != calculatedChecksum) {
+        throw new ChecksumException(
+            "Transaction is corrupt. Calculated checksum is " +
+                calculatedChecksum + " but read checksum " +
+                expectedChecksum, op.txid);
       }
+      return op;
+    }
 
+    @Override
+    public long scanOp() throws IOException {
+      // Edit logs of this age don't have any length prefix, so we just have
+      // to read the entire Op.
+      FSEditLogOp op = decodeOp();
+      return op == null ?
+          HdfsServerConstants.INVALID_TXID : op.getTransactionId();
+    }
+  }
+
+  /**
+   * Read older edit logs which may or may not have transaction IDs and other
+   * features.  This code is used during upgrades and to allow HDFS INotify to
+   * read older edit log files.
+   */
+  private static class LegacyReader extends Reader {
+    LegacyReader(DataInputStream in,
+                 StreamLimiter limiter, int logVersion) {
+      super(in, limiter, logVersion);
+    }
+
+    @Override
+    public FSEditLogOp decodeOp() throws IOException {
+      limiter.setLimit(maxOpSize);
+      in.mark(maxOpSize);
+      byte opCodeByte;
+      try {
+        opCodeByte = in.readByte();
+      } catch (EOFException eof) {
+        // EOF at an opcode boundary is expected.
+        return null;
+      }
+      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
+      if (opCode == OP_INVALID) {
+        verifyTerminator();
+        return null;
+      }
+      FSEditLogOp op = cache.get(opCode);
+      if (op == null) {
+        throw new IOException("Read invalid opcode " + opCode);
+      }
       if (NameNodeLayoutVersion.supports(
-          LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
-        // Read the txid
+            LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
         op.setTransactionId(in.readLong());
       } else {
         op.setTransactionId(HdfsServerConstants.INVALID_TXID);
       }
-
       op.readFields(in, logVersion);
-
-      validateChecksum(in, checksum, op.txid);
       return op;
     }
 
-    /**
-     * Similar with decodeOp(), but instead of doing the real decoding, we skip
-     * the content of the op if the length of the editlog is supported.
-     * @return the last txid of the segment, or INVALID_TXID on exception
-     */
+    @Override
     public long scanOp() throws IOException {
-      if (supportEditLogLength) {
-        limiter.setLimit(maxOpSize);
-        in.mark(maxOpSize);
-
-        final byte opCodeByte;
-        try {
-          opCodeByte = in.readByte(); // op code
-        } catch (EOFException e) {
-          return HdfsServerConstants.INVALID_TXID;
-        }
-
-        FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
-        if (opCode == OP_INVALID) {
-          verifyTerminator();
-          return HdfsServerConstants.INVALID_TXID;
-        }
-
-        int length = in.readInt(); // read the length of the op
-        long txid = in.readLong(); // read the txid
-
-        // skip the remaining content
-        IOUtils.skipFully(in, length - 8); 
-        // TODO: do we want to verify checksum for JN? For now we don't.
-        return txid;
-      } else {
-        FSEditLogOp op = decodeOp();
-        return op == null ? HdfsServerConstants.INVALID_TXID : op.getTransactionId();
-      }
-    }
-
-    /**
-     * Validate a transaction's checksum
-     */
-    private void validateChecksum(DataInputStream in,
-                                  Checksum checksum,
-                                  long txid)
-        throws IOException {
-      if (checksum != null) {
-        int calculatedChecksum = (int)checksum.getValue();
-        int readChecksum = in.readInt(); // read in checksum
-        if (readChecksum != calculatedChecksum) {
-          throw new ChecksumException(
-              "Transaction is corrupt. Calculated checksum is " +
-              calculatedChecksum + " but read checksum " + readChecksum, txid);
-        }
+      if (!NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
+        throw new IOException("Can't scan a pre-transactional edit log.");
       }
+      FSEditLogOp op = decodeOp();
+      return op == null ?
+          HdfsServerConstants.INVALID_TXID : op.getTransactionId();
     }
   }
 

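For reference, the Reader class is now abstract, so call sites obtain a reader
through the static factory, which selects the subclass matching the log's
layout version. A sketch of the resulting read loop, mirroring the call sites
updated above (stream setup as in BookKeeperEditLogInputStream; the loop body
is illustrative):

    FSEditLogLoader.PositionTrackingInputStream tracker =
        new FSEditLogLoader.PositionTrackingInputStream(bin);
    DataInputStream in = new DataInputStream(tracker);
    FSEditLogOp.Reader reader = FSEditLogOp.Reader.create(in, tracker, logVersion);
    FSEditLogOp op;
    while ((op = reader.decodeOp()) != null) { // decodeOp() returns null at EOF
      // process op, e.g. op.getTransactionId()
    }
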
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index 68d008f..e59dec4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -875,7 +875,7 @@ public class TestEditLog {
       tracker = new FSEditLogLoader.PositionTrackingInputStream(in);
       in = new DataInputStream(tracker);
             
-      reader = new FSEditLogOp.Reader(in, tracker, version);
+      reader = FSEditLogOp.Reader.create(in, tracker, version);
     }
   
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
index c0eb890..aecdc78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
@@ -25,19 +25,35 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.EnumMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.util.Holder;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 public class TestEditLogFileInputStream {
+  private static final Log LOG =
+      LogFactory.getLog(TestEditLogFileInputStream.class);
   private static final byte[] FAKE_LOG_DATA = TestEditLog.HADOOP20_SOME_EDITS;
 
+  private final static File TEST_DIR = PathUtils
+      .getTestDir(TestEditLogFileInputStream.class);
+
   @Test
   public void testReadURL() throws Exception {
     HttpURLConnection conn = mock(HttpURLConnection.class);
@@ -63,4 +79,68 @@ public class TestEditLogFileInputStream {
     assertEquals(FAKE_LOG_DATA.length, elis.length());
     elis.close();
   }
+
+  /**
+   * Regression test for HDFS-8965 which checks that
+   * EditLogFileInputStream#scanNextOp verifies Op checksums.
+   */
+  @Test(timeout=60000)
+  public void testScanCorruptEditLog() throws Exception {
+    Configuration conf = new Configuration();
+    File editLog = new File(System.getProperty(
+        "test.build.data", "/tmp"), "testCorruptEditLog");
+
+    LOG.debug("Creating test edit log file: " + editLog);
+    EditLogFileOutputStream elos = new EditLogFileOutputStream(conf,
+        editLog.getAbsoluteFile(), 8192);
+    elos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    FSEditLogOp.OpInstanceCache cache = new FSEditLogOp.OpInstanceCache();
+    FSEditLogOp.MkdirOp mkdirOp = FSEditLogOp.MkdirOp.getInstance(cache);
+    mkdirOp.reset();
+    mkdirOp.setRpcCallId(123);
+    mkdirOp.setTransactionId(1);
+    mkdirOp.setInodeId(789L);
+    mkdirOp.setPath("/mydir");
+    PermissionStatus perms = PermissionStatus.createImmutable(
+        "myuser", "mygroup", FsPermission.createImmutable((short)0777));
+    mkdirOp.setPermissionStatus(perms);
+    elos.write(mkdirOp);
+    mkdirOp.reset();
+    mkdirOp.setRpcCallId(456);
+    mkdirOp.setTransactionId(2);
+    mkdirOp.setInodeId(123L);
+    mkdirOp.setPath("/mydir2");
+    perms = PermissionStatus.createImmutable(
+        "myuser", "mygroup", FsPermission.createImmutable((short)0666));
+    mkdirOp.setPermissionStatus(perms);
+    elos.write(mkdirOp);
+    elos.setReadyToFlush();
+    elos.flushAndSync(false);
+    elos.close();
+    long fileLen = editLog.length();
+
+    LOG.debug("Corrupting last 4 bytes of edit log file " + editLog +
+        ", whose length is " + fileLen);
+    RandomAccessFile rwf = new RandomAccessFile(editLog, "rw");
+    rwf.seek(fileLen - 4);
+    int b = rwf.readInt();
+    rwf.seek(fileLen - 4);
+    rwf.writeInt(b + 1);
+    rwf.close();
+
+    EditLogFileInputStream elis = new EditLogFileInputStream(editLog);
+    Assert.assertEquals(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION,
+        elis.getVersion(true));
+    Assert.assertEquals(1, elis.scanNextOp());
+    LOG.debug("Read transaction 1 from " + editLog);
+    try {
+      elis.scanNextOp();
+      Assert.fail("Expected scanNextOp to fail when op checksum was corrupt.");
+    } catch (IOException e) {
+      LOG.debug("Caught expected checksum error when reading corrupt " +
+          "transaction 2", e);
+      GenericTestUtils.assertExceptionContains("Transaction is corrupt.", e);
+    }
+    elis.close();
+  }
 }