You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:58:51 UTC
[39/50] [abbrv] hadoop git commit: HDFS-8965. Harden edit log reading
code against out of memory errors (cmccabe)
HDFS-8965. Harden edit log reading code against out of memory errors (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/24f6a7c9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/24f6a7c9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/24f6a7c9
Branch: refs/heads/HDFS-7285
Commit: 24f6a7c9563757234f53ca23e12f9c9208b53082
Parents: 8fa41d9
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Mon Aug 31 17:31:29 2015 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Mon Aug 31 18:06:30 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../bkjournal/BookKeeperEditLogInputStream.java | 2 +-
.../hadoop/hdfs/protocol/LayoutVersion.java | 2 +-
.../namenode/EditLogBackupInputStream.java | 2 +-
.../server/namenode/EditLogFileInputStream.java | 2 +-
.../hdfs/server/namenode/FSEditLogOp.java | 354 +++++++++++++------
.../hdfs/server/namenode/TestEditLog.java | 2 +-
.../namenode/TestEditLogFileInputStream.java | 80 +++++
8 files changed, 341 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6584c84..57ddcb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -872,6 +872,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8946. Improve choosing datanode storage for block placement. (yliu)
+ HDFS-8965. Harden edit log reading code against out of memory errors (cmccabe)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
index e2098dd..86da807 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
@@ -83,7 +83,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
DataInputStream in = new DataInputStream(tracker);
- reader = new FSEditLogOp.Reader(in, tracker, logVersion);
+ reader = FSEditLogOp.Reader.create(in, tracker, logVersion);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
index c893744..1750790 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
@@ -87,7 +87,7 @@ public class LayoutVersion {
FSIMAGE_COMPRESSION(-25, "Support for fsimage compression"),
FSIMAGE_CHECKSUM(-26, "Support checksum for fsimage"),
REMOVE_REL13_DISK_LAYOUT_SUPPORT(-27, "Remove support for 0.13 disk layout"),
- EDITS_CHESKUM(-28, "Support checksum for editlog"),
+ EDITS_CHECKSUM(-28, "Support checksum for editlog"),
UNUSED(-29, "Skipped version"),
FSIMAGE_NAME_OPTIMIZATION(-30, "Store only last part of path in fsimage"),
RESERVED_REL20_203(-31, -19, "Reserved for release 0.20.203", true,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
index 689cacc..81d285a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
@@ -119,7 +119,7 @@ class EditLogBackupInputStream extends EditLogInputStream {
this.version = version;
- reader = new FSEditLogOp.Reader(in, tracker, version);
+ reader = FSEditLogOp.Reader.create(in, tracker, version);
}
void clear() throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
index 3e21c24..73a162e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
@@ -157,7 +157,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
"flags from log");
}
}
- reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
+ reader = FSEditLogOp.Reader.create(dataIn, tracker, logVersion);
reader.setMaxOpSize(maxOpSize);
state = State.OPEN;
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index ab36f17..125e1cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -4518,42 +4518,46 @@ public abstract class FSEditLogOp {
/**
* Class for reading editlog ops from a stream
*/
- public static class Reader {
- private final DataInputStream in;
- private final StreamLimiter limiter;
- private final int logVersion;
- private final Checksum checksum;
- private final OpInstanceCache cache;
- private int maxOpSize;
- private final boolean supportEditLogLength;
+ public abstract static class Reader {
+ final DataInputStream in;
+ final StreamLimiter limiter;
+ final OpInstanceCache cache;
+ final byte[] temp = new byte[4096];
+ final int logVersion;
+ int maxOpSize;
+
+ public static Reader create(DataInputStream in, StreamLimiter limiter,
+ int logVersion) {
+ if (logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION) {
+ // Use the LengthPrefixedReader on edit logs which are newer than what
+ // we can parse. (Newer layout versions are represented by smaller
+ // negative integers, for historical reasons.) Even though we can't
+ // parse the Ops contained in them, we should still be able to call
+ // scanOp on them. This is important for the JournalNode during rolling
+ // upgrade.
+ return new LengthPrefixedReader(in, limiter, logVersion);
+ } else if (NameNodeLayoutVersion.supports(
+ NameNodeLayoutVersion.Feature.EDITLOG_LENGTH, logVersion)) {
+ return new LengthPrefixedReader(in, limiter, logVersion);
+ } else if (NameNodeLayoutVersion.supports(
+ LayoutVersion.Feature.EDITS_CHECKSUM, logVersion)) {
+ Checksum checksum = DataChecksum.newCrc32();
+ return new ChecksummedReader(checksum, in, limiter, logVersion);
+ } else {
+ return new LegacyReader(in, limiter, logVersion);
+ }
+ }
/**
* Construct the reader
- * @param in The stream to read from.
- * @param logVersion The version of the data coming from the stream.
+ * @param in The stream to read from.
+ * @param limiter The limiter for this stream.
+ * @param logVersion The version of the data coming from the stream.
*/
- public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
- this.logVersion = logVersion;
- if (NameNodeLayoutVersion.supports(
- LayoutVersion.Feature.EDITS_CHESKUM, logVersion)) {
- this.checksum = DataChecksum.newCrc32();
- } else {
- this.checksum = null;
- }
- // It is possible that the logVersion is actually a future layoutversion
- // during the rolling upgrade (e.g., the NN gets upgraded first). We
- // assume future layout will also support length of editlog op.
- this.supportEditLogLength = NameNodeLayoutVersion.supports(
- NameNodeLayoutVersion.Feature.EDITLOG_LENGTH, logVersion)
- || logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION;
-
- if (this.checksum != null) {
- this.in = new DataInputStream(
- new CheckedInputStream(in, this.checksum));
- } else {
- this.in = in;
- }
+ Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
+ this.in = in;
this.limiter = limiter;
+ this.logVersion = logVersion;
this.cache = new OpInstanceCache();
this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
}
@@ -4606,26 +4610,25 @@ public abstract class FSEditLogOp {
}
}
- private void verifyTerminator() throws IOException {
+ void verifyTerminator() throws IOException {
/** The end of the edit log should contain only 0x00 or 0xff bytes.
* If it contains other bytes, the log itself may be corrupt.
* It is important to check this; if we don't, a stray OP_INVALID byte
* could make us stop reading the edit log halfway through, and we'd never
* know that we had lost data.
*/
- byte[] buf = new byte[4096];
limiter.clearLimit();
int numRead = -1, idx = 0;
while (true) {
try {
numRead = -1;
idx = 0;
- numRead = in.read(buf);
+ numRead = in.read(temp);
if (numRead == -1) {
return;
}
while (idx < numRead) {
- if ((buf[idx] != (byte)0) && (buf[idx] != (byte)-1)) {
+ if ((temp[idx] != (byte)0) && (temp[idx] != (byte)-1)) {
throw new IOException("Read extra bytes after " +
"the terminator!");
}
@@ -4638,7 +4641,7 @@ public abstract class FSEditLogOp {
if (numRead != -1) {
in.reset();
IOUtils.skipFully(in, idx);
- in.mark(buf.length + 1);
+ in.mark(temp.length + 1);
IOUtils.skipFully(in, 1);
}
}
@@ -4653,14 +4656,164 @@ public abstract class FSEditLogOp {
* If an exception is thrown, the stream's mark will be set to the first
* problematic byte. This usually means the beginning of the opcode.
*/
- private FSEditLogOp decodeOp() throws IOException {
- limiter.setLimit(maxOpSize);
+ public abstract FSEditLogOp decodeOp() throws IOException;
+
+ /**
+ * Similar to decodeOp(), but we only retrieve the transaction ID of the
+ * Op rather than decoding the entire operation. If the edit log format
+ * supports length prefixing, this can be much faster than full decoding.
+ *
+ * @return the last txid of the segment, or INVALID_TXID on EOF.
+ */
+ public abstract long scanOp() throws IOException;
+ }
+
+ /**
+ * Reads edit logs which are prefixed with a length. These edit logs also
+ * include a checksum and transaction ID.
+ */
+ private static class LengthPrefixedReader extends Reader {
+ /**
+ * The minimum length of a length-prefixed Op.
+ *
+ * The minimum Op has:
+ * 1-byte opcode
+ * 4-byte length
+ * 8-byte txid
+ * 0-byte body
+ * 4-byte checksum
+ */
+ private static final int MIN_OP_LENGTH = 17;
+
+ /**
+ * The op id length.
+ *
+ * Not included in the stored length.
+ */
+ private static final int OP_ID_LENGTH = 1;
+
+ /**
+ * The checksum length.
+ *
+ * Not included in the stored length.
+ */
+ private static final int CHECKSUM_LENGTH = 4;
+
+ private final Checksum checksum;
+
+ LengthPrefixedReader(DataInputStream in, StreamLimiter limiter,
+ int logVersion) {
+ super(in, limiter, logVersion);
+ this.checksum = DataChecksum.newCrc32();
+ }
+
+ @Override
+ public FSEditLogOp decodeOp() throws IOException {
+ long txid = decodeOpFrame();
+ if (txid == HdfsServerConstants.INVALID_TXID) {
+ return null;
+ }
+ in.reset();
in.mark(maxOpSize);
+ FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(in.readByte());
+ FSEditLogOp op = cache.get(opCode);
+ if (op == null) {
+ throw new IOException("Read invalid opcode " + opCode);
+ }
+ op.setTransactionId(txid);
+ IOUtils.skipFully(in, 4 + 8); // skip length and txid
+ op.readFields(in, logVersion);
+ // skip over the checksum, which we validated above.
+ IOUtils.skipFully(in, CHECKSUM_LENGTH);
+ return op;
+ }
+
+ @Override
+ public long scanOp() throws IOException {
+ return decodeOpFrame();
+ }
- if (checksum != null) {
- checksum.reset();
+ /**
+ * Decode the opcode "frame". This includes reading the opcode and
+ * transaction ID, and validating the checksum and length. It does not
+ * include reading the opcode-specific fields.
+ * The input stream will be advanced to the end of the op at the end of this
+ * function.
+ *
+ * @return the transaction ID of the op, or
+ * HdfsServerConstants.INVALID_TXID if we hit EOF.
+ */
+ private long decodeOpFrame() throws IOException {
+ limiter.setLimit(maxOpSize);
+ in.mark(maxOpSize);
+ byte opCodeByte;
+ try {
+ opCodeByte = in.readByte();
+ } catch (EOFException eof) {
+ // EOF at an opcode boundary is expected.
+ return HdfsServerConstants.INVALID_TXID;
}
+ if (opCodeByte == FSEditLogOpCodes.OP_INVALID.getOpCode()) {
+ verifyTerminator();
+ return HdfsServerConstants.INVALID_TXID;
+ }
+ // Here, we verify that the Op size makes sense and that the
+ // data matches its checksum before attempting to construct an Op.
+ // This is important because otherwise we may encounter an
+ // OutOfMemoryException which could bring down the NameNode or
+ // JournalNode when reading garbage data.
+ int opLength = in.readInt() + OP_ID_LENGTH + CHECKSUM_LENGTH;
+ if (opLength > maxOpSize) {
+ throw new IOException("Op " + (int)opCodeByte + " has size " +
+ opLength + ", but maxOpSize = " + maxOpSize);
+ } else if (opLength < MIN_OP_LENGTH) {
+ throw new IOException("Op " + (int)opCodeByte + " has size " +
+ opLength + ", but the minimum op size is " + MIN_OP_LENGTH);
+ }
+ long txid = in.readLong();
+ // Verify checksum
+ in.reset();
+ in.mark(maxOpSize);
+ checksum.reset();
+ for (int rem = opLength - CHECKSUM_LENGTH; rem > 0;) {
+ int toRead = Math.min(temp.length, rem);
+ IOUtils.readFully(in, temp, 0, toRead);
+ checksum.update(temp, 0, toRead);
+ rem -= toRead;
+ }
+ int expectedChecksum = in.readInt();
+ int calculatedChecksum = (int)checksum.getValue();
+ if (expectedChecksum != calculatedChecksum) {
+ throw new ChecksumException(
+ "Transaction is corrupt. Calculated checksum is " +
+ calculatedChecksum + " but read checksum " +
+ expectedChecksum, txid);
+ }
+ return txid;
+ }
+ }
+
+ /**
+ * Read edit logs which have a checksum and a transaction ID, but not a
+ * length.
+ */
+ private static class ChecksummedReader extends Reader {
+ private final Checksum checksum;
+ ChecksummedReader(Checksum checksum, DataInputStream in,
+ StreamLimiter limiter, int logVersion) {
+ super(new DataInputStream(
+ new CheckedInputStream(in, checksum)), limiter, logVersion);
+ this.checksum = checksum;
+ }
+
+ @Override
+ public FSEditLogOp decodeOp() throws IOException {
+ limiter.setLimit(maxOpSize);
+ in.mark(maxOpSize);
+ // Reset the checksum. Since we are using a CheckedInputStream, each
+ // subsequent read from the stream will update the checksum.
+ checksum.reset();
byte opCodeByte;
try {
opCodeByte = in.readByte();
@@ -4668,88 +4821,89 @@ public abstract class FSEditLogOp {
// EOF at an opcode boundary is expected.
return null;
}
-
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
if (opCode == OP_INVALID) {
verifyTerminator();
return null;
}
-
FSEditLogOp op = cache.get(opCode);
if (op == null) {
throw new IOException("Read invalid opcode " + opCode);
}
-
- if (supportEditLogLength) {
- in.readInt();
+ op.setTransactionId(in.readLong());
+ op.readFields(in, logVersion);
+ // Verify checksum
+ int calculatedChecksum = (int)checksum.getValue();
+ int expectedChecksum = in.readInt();
+ if (expectedChecksum != calculatedChecksum) {
+ throw new ChecksumException(
+ "Transaction is corrupt. Calculated checksum is " +
+ calculatedChecksum + " but read checksum " +
+ expectedChecksum, op.txid);
}
+ return op;
+ }
+ @Override
+ public long scanOp() throws IOException {
+ // Edit logs of this age don't have any length prefix, so we just have
+ // to read the entire Op.
+ FSEditLogOp op = decodeOp();
+ return op == null ?
+ HdfsServerConstants.INVALID_TXID : op.getTransactionId();
+ }
+ }
+
+ /**
+ * Read older edit logs which may or may not have transaction IDs and other
+ * features. This code is used during upgrades and to allow HDFS INotify to
+ * read older edit log files.
+ */
+ private static class LegacyReader extends Reader {
+ LegacyReader(DataInputStream in,
+ StreamLimiter limiter, int logVersion) {
+ super(in, limiter, logVersion);
+ }
+
+ @Override
+ public FSEditLogOp decodeOp() throws IOException {
+ limiter.setLimit(maxOpSize);
+ in.mark(maxOpSize);
+ byte opCodeByte;
+ try {
+ opCodeByte = in.readByte();
+ } catch (EOFException eof) {
+ // EOF at an opcode boundary is expected.
+ return null;
+ }
+ FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
+ if (opCode == OP_INVALID) {
+ verifyTerminator();
+ return null;
+ }
+ FSEditLogOp op = cache.get(opCode);
+ if (op == null) {
+ throw new IOException("Read invalid opcode " + opCode);
+ }
if (NameNodeLayoutVersion.supports(
- LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
- // Read the txid
+ LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
op.setTransactionId(in.readLong());
} else {
op.setTransactionId(HdfsServerConstants.INVALID_TXID);
}
-
op.readFields(in, logVersion);
-
- validateChecksum(in, checksum, op.txid);
return op;
}
- /**
- * Similar with decodeOp(), but instead of doing the real decoding, we skip
- * the content of the op if the length of the editlog is supported.
- * @return the last txid of the segment, or INVALID_TXID on exception
- */
+ @Override
public long scanOp() throws IOException {
- if (supportEditLogLength) {
- limiter.setLimit(maxOpSize);
- in.mark(maxOpSize);
-
- final byte opCodeByte;
- try {
- opCodeByte = in.readByte(); // op code
- } catch (EOFException e) {
- return HdfsServerConstants.INVALID_TXID;
- }
-
- FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
- if (opCode == OP_INVALID) {
- verifyTerminator();
- return HdfsServerConstants.INVALID_TXID;
- }
-
- int length = in.readInt(); // read the length of the op
- long txid = in.readLong(); // read the txid
-
- // skip the remaining content
- IOUtils.skipFully(in, length - 8);
- // TODO: do we want to verify checksum for JN? For now we don't.
- return txid;
- } else {
- FSEditLogOp op = decodeOp();
- return op == null ? HdfsServerConstants.INVALID_TXID : op.getTransactionId();
- }
- }
-
- /**
- * Validate a transaction's checksum
- */
- private void validateChecksum(DataInputStream in,
- Checksum checksum,
- long txid)
- throws IOException {
- if (checksum != null) {
- int calculatedChecksum = (int)checksum.getValue();
- int readChecksum = in.readInt(); // read in checksum
- if (readChecksum != calculatedChecksum) {
- throw new ChecksumException(
- "Transaction is corrupt. Calculated checksum is " +
- calculatedChecksum + " but read checksum " + readChecksum, txid);
- }
+ if (!NameNodeLayoutVersion.supports(
+ LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
+ throw new IOException("Can't scan a pre-transactional edit log.");
}
+ FSEditLogOp op = decodeOp();
+ return op == null ?
+ HdfsServerConstants.INVALID_TXID : op.getTransactionId();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index 68d008f..e59dec4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -875,7 +875,7 @@ public class TestEditLog {
tracker = new FSEditLogLoader.PositionTrackingInputStream(in);
in = new DataInputStream(tracker);
- reader = new FSEditLogOp.Reader(in, tracker, version);
+ reader = FSEditLogOp.Reader.create(in, tracker, version);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24f6a7c9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
index c0eb890..aecdc78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
@@ -25,19 +25,35 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.EnumMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.util.Holder;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
public class TestEditLogFileInputStream {
+ private static final Log LOG =
+ LogFactory.getLog(TestEditLogFileInputStream.class);
private static final byte[] FAKE_LOG_DATA = TestEditLog.HADOOP20_SOME_EDITS;
+ private final static File TEST_DIR = PathUtils
+ .getTestDir(TestEditLogFileInputStream.class);
+
@Test
public void testReadURL() throws Exception {
HttpURLConnection conn = mock(HttpURLConnection.class);
@@ -63,4 +79,68 @@ public class TestEditLogFileInputStream {
assertEquals(FAKE_LOG_DATA.length, elis.length());
elis.close();
}
+
+ /**
+ * Regression test for HDFS-8965 which verifies that
+ * FSEditLogFileInputStream#scanOp verifies Op checksums.
+ */
+ @Test(timeout=60000)
+ public void testScanCorruptEditLog() throws Exception {
+ Configuration conf = new Configuration();
+ File editLog = new File(System.getProperty(
+ "test.build.data", "/tmp"), "testCorruptEditLog");
+
+ LOG.debug("Creating test edit log file: " + editLog);
+ EditLogFileOutputStream elos = new EditLogFileOutputStream(conf,
+ editLog.getAbsoluteFile(), 8192);
+ elos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+ FSEditLogOp.OpInstanceCache cache = new FSEditLogOp.OpInstanceCache();
+ FSEditLogOp.MkdirOp mkdirOp = FSEditLogOp.MkdirOp.getInstance(cache);
+ mkdirOp.reset();
+ mkdirOp.setRpcCallId(123);
+ mkdirOp.setTransactionId(1);
+ mkdirOp.setInodeId(789L);
+ mkdirOp.setPath("/mydir");
+ PermissionStatus perms = PermissionStatus.createImmutable(
+ "myuser", "mygroup", FsPermission.createImmutable((short)0777));
+ mkdirOp.setPermissionStatus(perms);
+ elos.write(mkdirOp);
+ mkdirOp.reset();
+ mkdirOp.setRpcCallId(456);
+ mkdirOp.setTransactionId(2);
+ mkdirOp.setInodeId(123L);
+ mkdirOp.setPath("/mydir2");
+ perms = PermissionStatus.createImmutable(
+ "myuser", "mygroup", FsPermission.createImmutable((short)0666));
+ mkdirOp.setPermissionStatus(perms);
+ elos.write(mkdirOp);
+ elos.setReadyToFlush();
+ elos.flushAndSync(false);
+ elos.close();
+ long fileLen = editLog.length();
+
+ LOG.debug("Corrupting last 4 bytes of edit log file " + editLog +
+ ", whose length is " + fileLen);
+ RandomAccessFile rwf = new RandomAccessFile(editLog, "rw");
+ rwf.seek(fileLen - 4);
+ int b = rwf.readInt();
+ rwf.seek(fileLen - 4);
+ rwf.writeInt(b + 1);
+ rwf.close();
+
+ EditLogFileInputStream elis = new EditLogFileInputStream(editLog);
+ Assert.assertEquals(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION,
+ elis.getVersion(true));
+ Assert.assertEquals(1, elis.scanNextOp());
+ LOG.debug("Read transaction 1 from " + editLog);
+ try {
+ elis.scanNextOp();
+ Assert.fail("Expected scanNextOp to fail when op checksum was corrupt.");
+ } catch (IOException e) {
+ LOG.debug("Caught expected checksum error when reading corrupt " +
+ "transaction 2", e);
+ GenericTestUtils.assertExceptionContains("Transaction is corrupt.", e);
+ }
+ elis.close();
+ }
}