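You are viewing a plain text version of this content. The canonical link to it appeared as a "here" hyperlink on the original archive page and was not preserved in this plain-text rendering.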
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/21 18:46:44 UTC
svn commit: r1138085 - in /hadoop/common/branches/HDFS-1073/hdfs: ./
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Author: todd
Date: Tue Jun 21 16:46:43 2011
New Revision: 1138085
URL: http://svn.apache.org/viewvc?rev=1138085&view=rev
Log:
HDFS-2074. Determine edit log validity by truly reading and validating transactions. Contributed by Todd Lipcon.
Modified:
hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java
Modified: hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt Tue Jun 21 16:46:43 2011
@@ -50,3 +50,5 @@ HDFS-2048. Add upgrade tests and fix upg
(todd)
HDFS-2027. Image inspector should return finalized logs before unfinalized
logs. (todd)
+HDFS-2074. Determine edit log validity by truly reading and validating
+ transactions. (todd)
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Tue Jun 21 16:46:43 2011
@@ -34,6 +34,8 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogHeader;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -356,13 +358,10 @@ public class BackupImage extends FSImage
BufferedInputStream bin = new BufferedInputStream(edits);
DataInputStream in = new DataInputStream(bin);
FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
- int logVersion = logLoader.readLogVersion(in);
- Checksum checksum = null;
- if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
- checksum = FSEditLog.getChecksum();
- in = new DataInputStream(new CheckedInputStream(bin, checksum));
- }
- int loaded = logLoader.loadEditRecords(logVersion, in, checksum, false,
+
+ LogHeader header = FSEditLogOp.LogHeader.read(in);
+ int loaded = logLoader.loadEditRecords(
+ header.logVersion, in, header.checksum, false,
lastAppliedTxId + 1);
lastAppliedTxId += loaded;
@@ -370,8 +369,9 @@ public class BackupImage extends FSImage
// first time reached the end of spool
jsState = JSpoolState.WAIT;
- loaded = logLoader.loadEditRecords(logVersion, in, checksum,
- true, lastAppliedTxId + 1);
+ loaded = logLoader.loadEditRecords(
+ header.logVersion, in, header.checksum,
+ true, lastAppliedTxId + 1);
numEdits += loaded;
lastAppliedTxId += loaded;
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Tue Jun 21 16:46:43 2011
@@ -22,9 +22,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import org.apache.hadoop.io.IOUtils;
/**
* An implementation of the abstract class {@link EditLogInputStream}, which
@@ -74,50 +72,5 @@ class EditLogFileInputStream extends Edi
// file size + size of both buffers
return file.length();
}
-
- /**
- * Return the length of non-zero bytes in the given file.
- * @param chunkSizeToRead chunk size for disk reads
- * @throws IOException if the file cannot be read
- */
- static long getValidLength(File f) throws IOException {
- return getValidLength(f, 1024*1024); // 1M chunks
- }
-
- /**
- * Return the length of bytes in the given file after subtracting
- * the trailer of 0xFF (OP_INVALID)s.
- * This seeks to the end of the file and reads chunks backwards until
- * it finds a non-0xFF byte.
- * @param chunkSizeToRead chunk size for disk reads
- * @throws IOException if the file cannot be read
- */
- static long getValidLength(File f, int chunkSizeToRead) throws IOException {
- FileInputStream fis = new FileInputStream(f);
- try {
-
- byte buf[] = new byte[chunkSizeToRead];
-
- FileChannel fc = fis.getChannel();
- long size = fc.size();
- long pos = size - (size % chunkSizeToRead);
-
- while (pos >= 0) {
- fc.position(pos);
-
- int readLen = (int) Math.min(size - pos, chunkSizeToRead);
- IOUtils.readFully(fis, buf, 0, readLen);
- for (int i = readLen - 1; i >= 0; i--) {
- if (buf[i] != FSEditLogOpCodes.OP_INVALID.getOpCode()) {
- return pos + i + 1; // + 1 since we count this byte!
- }
- }
-
- pos -= chunkSizeToRead;
- }
- return 0;
- } finally {
- fis.close();
- }
- }
+
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Tue Jun 21 16:46:43 2011
@@ -19,7 +19,8 @@ package org.apache.hadoop.hdfs.server.na
import java.io.BufferedInputStream;
import java.io.DataInputStream;
-import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -34,6 +35,8 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogHeader;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Reader;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
@@ -59,38 +62,6 @@ public class FSEditLogLoader {
return numEdits;
}
- /**
- * Read the header of fsedit log
- * @param in fsedit stream
- * @return the edit log version number
- * @throws IOException if error occurs
- */
- int readLogVersion(DataInputStream in) throws IOException {
- int logVersion = 0;
- // Read log file version. Could be missing.
- in.mark(4);
- // If edits log is greater than 2G, available method will return negative
- // numbers, so we avoid having to call available
- boolean available = true;
- try {
- logVersion = in.readByte();
- } catch (EOFException e) {
- available = false;
- }
- if (available) {
- in.reset();
- logVersion = in.readInt();
- if (logVersion < FSConstants.LAYOUT_VERSION) // future version
- throw new IOException(
- "Unexpected version of the file system log file: "
- + logVersion + ". Current version = "
- + FSConstants.LAYOUT_VERSION + ".");
- }
- assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
- "Unsupported version " + logVersion;
- return logVersion;
- }
-
int loadFSEdits(EditLogInputStream edits, boolean closeOnExit,
long expectedStartingTxId)
throws IOException {
@@ -98,17 +69,11 @@ public class FSEditLogLoader {
DataInputStream in = new DataInputStream(bin);
int numEdits = 0;
- int logVersion = 0;
try {
- logVersion = readLogVersion(in);
- Checksum checksum = null;
- if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
- checksum = FSEditLog.getChecksum();
- in = new DataInputStream(new CheckedInputStream(bin, checksum));
- }
-
- numEdits = loadEditRecords(logVersion, in, checksum, false,
+ LogHeader header = LogHeader.read(in);
+ numEdits = loadEditRecords(
+ header.logVersion, in, header.checksum, false,
expectedStartingTxId);
} finally {
if(closeOnExit)
@@ -488,6 +453,61 @@ public class FSEditLogLoader {
}
/**
+ * Return the number of valid transactions in the file. If the file is
+ * truncated during the header, returns a value indicating that there are
+ * 0 valid transactions.
+ * @throws IOException if the file cannot be read due to an IO error (eg
+ * if the log does not exist)
+ */
+ static EditLogValidation validateEditLog(File f) throws IOException {
+ FileInputStream fis = new FileInputStream(f);
+ try {
+ PositionTrackingInputStream tracker = new PositionTrackingInputStream(
+ new BufferedInputStream(fis));
+ DataInputStream dis = new DataInputStream(tracker);
+ LogHeader header;
+ try {
+ header = LogHeader.read(dis);
+ } catch (Throwable t) {
+ FSImage.LOG.debug("Unable to read header from " + f +
+ " -> no valid transactions in this file.");
+ return new EditLogValidation(0, 0);
+ }
+
+ Reader reader = new FSEditLogOp.Reader(dis, header.logVersion, header.checksum);
+ long numValid = 0;
+ long lastPos = 0;
+ try {
+ while (true) {
+ lastPos = tracker.getPos();
+ if (reader.readOp() == null) {
+ break;
+ }
+ numValid++;
+ }
+ } catch (Throwable t) {
+ // Catch Throwable and not just IOE, since bad edits may generate
+ // NumberFormatExceptions, AssertionErrors, OutOfMemoryErrors, etc.
+ FSImage.LOG.debug("Caught exception after reading " + numValid +
+ " ops from " + f + " while determining its valid length.", t);
+ }
+ return new EditLogValidation(lastPos, numValid);
+ } finally {
+ fis.close();
+ }
+ }
+
+ static class EditLogValidation {
+ long validLength;
+ long numTransactions;
+
+ EditLogValidation(long validLength, long numTransactions) {
+ this.validLength = validLength;
+ this.numTransactions = numTransactions;
+ }
+ }
+
+ /**
* Stream wrapper that keeps track of the current file position.
*/
private static class PositionTrackingInputStream extends FilterInputStream {
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Tue Jun 21 16:46:43 2011
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import java.util.EnumMap;
@@ -29,9 +30,12 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.Storage;
+
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.io.BytesWritable;
@@ -650,6 +654,62 @@ public abstract class FSEditLogOp {
return longWritable.get();
}
}
+
+ /**
+ * Class to encapsulate the header at the top of a log file.
+ */
+ static class LogHeader {
+ final int logVersion;
+ final Checksum checksum;
+
+ public LogHeader(int logVersion, Checksum checksum) {
+ this.logVersion = logVersion;
+ this.checksum = checksum;
+ }
+
+ static LogHeader read(DataInputStream in) throws IOException {
+ int logVersion = 0;
+
+ logVersion = FSEditLogOp.LogHeader.readLogVersion(in);
+ Checksum checksum = null;
+ if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
+ checksum = FSEditLog.getChecksum();
+ }
+ return new LogHeader(logVersion, checksum);
+ }
+
+ /**
+ * Read the header of fsedit log
+ * @param in fsedit stream
+ * @return the edit log version number
+ * @throws IOException if error occurs
+ */
+ private static int readLogVersion(DataInputStream in) throws IOException {
+ int logVersion = 0;
+ // Read log file version. Could be missing.
+ in.mark(4);
+ // If edits log is greater than 2G, available method will return negative
+ // numbers, so we avoid having to call available
+ boolean available = true;
+ try {
+ logVersion = in.readByte();
+ } catch (EOFException e) {
+ available = false;
+ }
+ if (available) {
+ in.reset();
+ logVersion = in.readInt();
+ if (logVersion < FSConstants.LAYOUT_VERSION) // future version
+ throw new IOException(
+ "Unexpected version of the file system log file: "
+ + logVersion + ". Current version = "
+ + FSConstants.LAYOUT_VERSION + ".");
+ }
+ assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
+ "Unsupported version " + logVersion;
+ return logVersion;
+ }
+ }
/**
* Class for reading editlog ops from a stream
@@ -668,7 +728,12 @@ public abstract class FSEditLogOp {
@SuppressWarnings("deprecation")
public Reader(DataInputStream in, int logVersion,
Checksum checksum) {
- this.in = in;
+ if (checksum != null) {
+ this.in = new DataInputStream(
+ new CheckedInputStream(in, checksum));
+ } else {
+ this.in = in;
+ }
this.logVersion = logVersion;
this.checksum = checksum;
opInstances = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Tue Jun 21 16:46:43 2011
@@ -36,6 +36,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
@@ -398,17 +399,19 @@ class FSImageTransactionalStorageInspect
return;
}
- long maxValidLength = Long.MIN_VALUE;
+ long maxValidTxnCount = Long.MIN_VALUE;
for (FoundEditLog log : logs) {
- long validLength = log.getValidLength();
- LOG.warn(" Log " + log.getFile() + " valid length=" + validLength);
- maxValidLength = Math.max(maxValidLength, validLength);
+ long validTxnCount = log.validateLog().numTransactions;
+ LOG.warn(" Log " + log.getFile() + " valid txns=" + validTxnCount);
+ maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount);
}
for (FoundEditLog log : logs) {
- if (log.getValidLength() < maxValidLength) {
+ long txns = log.validateLog().numTransactions;
+ if (txns < maxValidTxnCount) {
LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
- "it is shorter than " + maxValidLength + " bytes");
+ "it is has only " + txns + " valid txns whereas another " +
+ "log has " + maxValidTxnCount);
log.markCorrupt();
}
}
@@ -475,7 +478,7 @@ class FSImageTransactionalStorageInspect
final long startTxId;
final long lastTxId;
- private long cachedValidLength = -1;
+ private EditLogValidation cachedValidation = null;
private boolean isCorrupt = false;
static final long UNKNOWN_END = -1;
@@ -500,11 +503,11 @@ class FSImageTransactionalStorageInspect
return lastTxId;
}
- long getValidLength() throws IOException {
- if (cachedValidLength == -1) {
- cachedValidLength = EditLogFileInputStream.getValidLength(file);
+ EditLogValidation validateLog() throws IOException {
+ if (cachedValidation == null) {
+ cachedValidation = FSEditLogLoader.validateEditLog(file);
}
- return cachedValidLength;
+ return cachedValidation;
}
boolean isInProgress() {
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java Tue Jun 21 16:46:43 2011
@@ -47,6 +47,7 @@ import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.io.Files;
import static org.junit.Assert.*;
import static org.mockito.Mockito.doReturn;
@@ -95,6 +96,23 @@ public abstract class FSImageTestUtil {
return inspector;
}
+
+ /**
+ * Return a standalone instance of FSEditLog that will log into the given
+ * log directory. The returned instance is not yet opened.
+ */
+ public static FSEditLog createStandaloneEditLog(File logDir)
+ throws IOException {
+ assertTrue(logDir.mkdirs() || logDir.exists());
+ Files.deleteDirectoryContents(logDir);
+ NNStorage storage = Mockito.mock(NNStorage.class);
+ List<StorageDirectory> sds = Lists.newArrayList(
+ FSImageTestUtil.mockStorageDirectory(logDir, NameNodeDirType.EDITS));
+ Mockito.doReturn(sds).when(storage).dirIterable(NameNodeDirType.EDITS);
+
+ return new FSEditLog(storage);
+ }
+
/**
* Assert that all of the given directories have the same newest filename
* for fsimage that they hold the same data.
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Tue Jun 21 16:46:43 2011
@@ -605,54 +605,6 @@ public class TestEditLog extends TestCas
}
}
-
- public void testGetValidLength() throws Exception {
- assertTrue(TEST_DIR.mkdirs() || TEST_DIR.exists());
-
- // For each of a combination of valid bytes followed by invalid,
- // make sure we determine the proper non-trailer size of the file.
- // Sizes designed to find off-by-one errors around 1024 byte chunk size.
- final int VALID_SIZES[] = new int[] {0, 1, 789, 1023, 1024, 1025};
- final int TRAILER_SIZES[] = new int[] {0, 1, 1024-789-1, 1024-789,
- 1024-789+1, 1023, 1024, 1025};
-
- for (int validPart : VALID_SIZES) {
- for (int trailerPart : TRAILER_SIZES) {
- doValidLengthTest(validPart, trailerPart);
- doValidLengthTest(validPart*3, trailerPart*3);
- }
- }
- }
-
- private void doValidLengthTest(int validPart, int trailerPart) throws Exception {
- File file = new File(TEST_DIR, "validLengthTest");
- FileOutputStream fos = new FileOutputStream(file);
- try {
- try {
- byte[] valid = new byte[validPart];
- for (int i = 0; i < validPart; i++) {
- valid[i] = (byte)(i & 0xFF);
- }
- // The valid data shouldn't end in a trailer byte, or else we'd think it was one shorter
- // than actual length
- if (validPart > 0) valid[validPart - 1] = 1;
- fos.write(valid);
-
- byte[] trailer = new byte[trailerPart];
- Arrays.fill(trailer, TRAILER_BYTE);
- fos.write(trailer);
- } finally {
- fos.close();
- }
-
- long computedValid = EditLogFileInputStream.getValidLength(file, 1024);
- assertEquals("Testing valid=" + validPart + " trailer=" + trailerPart,
- validPart, computedValid);
- } finally {
- file.delete();
- }
- }
-
private static class EditLogByteInputStream extends EditLogInputStream {
private InputStream input;
private long len;
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java Tue Jun 21 16:46:43 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;
@@ -48,8 +49,10 @@ public class TestEditLogFileOutputStream
.getStorage().getStorageDir(0);
File editLog = NNStorage.getInProgressEditsFile(sd, 1);
+ EditLogValidation validation = FSEditLogLoader.validateEditLog(editLog);
assertEquals("Edit log should contain a header as valid length",
- HEADER_LEN, EditLogFileInputStream.getValidLength(editLog));
+ HEADER_LEN, validation.validLength);
+ assertEquals(1, validation.numTransactions);
assertEquals("Edit log should have 1MB of bytes allocated",
1024*1024, editLog.length());
@@ -57,9 +60,11 @@ public class TestEditLogFileOutputStream
cluster.getFileSystem().mkdirs(new Path("/tmp"),
new FsPermission((short)777));
+ validation = FSEditLogLoader.validateEditLog(editLog);
assertEquals("Edit log should have more valid data after writing a txn",
MKDIR_LEN + HEADER_LEN,
- EditLogFileInputStream.getValidLength(editLog));
+ validation.validLength);
+ assertEquals(2, validation.numTransactions);
assertEquals("Edit log should be 1MB long",
1024 * 1024, editLog.length());
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java Tue Jun 21 16:46:43 2011
@@ -20,9 +20,14 @@ package org.apache.hadoop.hdfs.server.na
import static org.junit.Assert.*;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.SortedMap;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
@@ -32,12 +37,24 @@ import org.apache.hadoop.hdfs.DFSTestUti
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Level;
import org.junit.Test;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+
public class TestFSEditLogLoader {
+ static {
+ ((Log4JLogger)FSImage.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ private static final File TEST_DIR = new File(
+ System.getProperty("test.build.data","build/test/data"));
+
private static final int NUM_DATA_NODES = 0;
@Test
@@ -128,4 +145,168 @@ public class TestFSEditLogLoader {
}
}
}
+
+ /**
+ * Test that the valid number of transactions can be counted from a file.
+ * @throws IOException
+ */
+ @Test
+ public void testCountValidTransactions() throws IOException {
+ File testDir = new File(TEST_DIR, "testCountValidTransactions");
+ File logFile = new File(testDir,
+ NNStorage.getInProgressEditsFileName(1));
+
+ // Create a log file, and return the offsets at which each
+ // transaction starts.
+ FSEditLog fsel = null;
+ final int NUM_TXNS = 30;
+ SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
+ try {
+ fsel = FSImageTestUtil.createStandaloneEditLog(testDir);
+ fsel.open();
+ assertTrue("should exist: " + logFile, logFile.exists());
+
+ for (int i = 0; i < NUM_TXNS; i++) {
+ long trueOffset = getNonTrailerLength(logFile);
+ long thisTxId = fsel.getLastWrittenTxId() + 1;
+ offsetToTxId.put(trueOffset, thisTxId);
+ System.err.println("txid " + thisTxId + " at offset " + trueOffset);
+ fsel.logDelete("path" + i, i);
+ fsel.logSync();
+ }
+ } finally {
+ if (fsel != null) {
+ fsel.close();
+ }
+ }
+
+ // The file got renamed when the log was closed.
+ logFile = testDir.listFiles()[0];
+ long validLength = getNonTrailerLength(logFile);
+
+ // Make sure that uncorrupted log has the expected length and number
+ // of transactions.
+ EditLogValidation validation = FSEditLogLoader.validateEditLog(logFile);
+ assertEquals(NUM_TXNS + 2, validation.numTransactions);
+ assertEquals(validLength, validation.validLength);
+
+ // Back up the uncorrupted log
+ File logFileBak = new File(testDir, logFile.getName() + ".bak");
+ Files.copy(logFile, logFileBak);
+
+ // Corrupt the log file in various ways for each txn
+ for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) {
+ long txOffset = entry.getKey();
+ long txid = entry.getValue();
+
+ // Restore backup, truncate the file exactly before the txn
+ Files.copy(logFileBak, logFile);
+ truncateFile(logFile, txOffset);
+ validation = FSEditLogLoader.validateEditLog(logFile);
+ assertEquals("Failed when truncating to length " + txOffset,
+ txid - 1, validation.numTransactions);
+ assertEquals(txOffset, validation.validLength);
+
+ // Restore backup, truncate the file with one byte in the txn,
+ // also isn't valid
+ Files.copy(logFileBak, logFile);
+ truncateFile(logFile, txOffset + 1);
+ validation = FSEditLogLoader.validateEditLog(logFile);
+ assertEquals("Failed when truncating to length " + (txOffset + 1),
+ txid - 1, validation.numTransactions);
+ assertEquals(txOffset, validation.validLength);
+
+ // Restore backup, corrupt the txn opcode
+ Files.copy(logFileBak, logFile);
+ corruptByteInFile(logFile, txOffset);
+ validation = FSEditLogLoader.validateEditLog(logFile);
+ assertEquals("Failed when corrupting txn opcode at " + txOffset,
+ txid - 1, validation.numTransactions);
+ assertEquals(txOffset, validation.validLength);
+
+ // Restore backup, corrupt a byte a few bytes into the txn
+ Files.copy(logFileBak, logFile);
+ corruptByteInFile(logFile, txOffset+5);
+ validation = FSEditLogLoader.validateEditLog(logFile);
+ assertEquals("Failed when corrupting txn data at " + (txOffset+5),
+ txid - 1, validation.numTransactions);
+ assertEquals(txOffset, validation.validLength);
+ }
+
+ // Corrupt the log at every offset to make sure that validation itself
+ // never throws an exception, and that the calculated lengths are monotonically
+ // increasing
+ long prevNumValid = 0;
+ for (long offset = 0; offset < validLength; offset++) {
+ Files.copy(logFileBak, logFile);
+ corruptByteInFile(logFile, offset);
+ EditLogValidation val = FSEditLogLoader.validateEditLog(logFile);
+ assertTrue(val.numTransactions >= prevNumValid);
+ prevNumValid = val.numTransactions;
+ }
+ }
+
+ /**
+ * Corrupt the byte at the given offset in the given file,
+ * by subtracting 1 from it.
+ */
+ private void corruptByteInFile(File file, long offset)
+ throws IOException {
+ RandomAccessFile raf = new RandomAccessFile(file, "rw");
+ try {
+ raf.seek(offset);
+ int origByte = raf.read();
+ raf.seek(offset);
+ raf.writeByte(origByte - 1);
+ } finally {
+ IOUtils.closeStream(raf);
+ }
+ }
+
+ /**
+ * Truncate the given file to the given length
+ */
+ private void truncateFile(File logFile, long newLength)
+ throws IOException {
+ RandomAccessFile raf = new RandomAccessFile(logFile, "rw");
+ raf.setLength(newLength);
+ raf.close();
+ }
+
+ /**
+ * Return the length of bytes in the given file after subtracting
+ * the trailer of 0xFF (OP_INVALID)s.
+ * This seeks to the end of the file and reads chunks backwards until
+ * it finds a non-0xFF byte.
+ * @throws IOException if the file cannot be read
+ */
+ private static long getNonTrailerLength(File f) throws IOException {
+ final int chunkSizeToRead = 256*1024;
+ FileInputStream fis = new FileInputStream(f);
+ try {
+
+ byte buf[] = new byte[chunkSizeToRead];
+
+ FileChannel fc = fis.getChannel();
+ long size = fc.size();
+ long pos = size - (size % chunkSizeToRead);
+
+ while (pos >= 0) {
+ fc.position(pos);
+
+ int readLen = (int) Math.min(size - pos, chunkSizeToRead);
+ IOUtils.readFully(fis, buf, 0, readLen);
+ for (int i = readLen - 1; i >= 0; i--) {
+ if (buf[i] != FSEditLogOpCodes.OP_INVALID.getOpCode()) {
+ return pos + i + 1; // + 1 since we count this byte!
+ }
+ }
+
+ pos -= chunkSizeToRead;
+ }
+ return 0;
+ } finally {
+ fis.close();
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java Tue Jun 21 16:46:43 2011
@@ -249,14 +249,15 @@ public class TestFSImageStorageInspector
LogGroup lg = inspector.logGroups.get(123L);
assertEquals(3, lg.logs.size());
- // Inject spies to return the lengths we would like to see
- long validLengths[] = new long[] { 2000, 2000, 1000 };
+ // Inject spies to return the valid counts we would like to see
+ long validTxnCounts[] = new long[] { 2000, 2000, 1000 };
for (int i = 0; i < 3; i++) {
FoundEditLog inProgressLog = lg.logs.get(i);
assertTrue(inProgressLog.isInProgress());
inProgressLog = spy(inProgressLog);
- doReturn(validLengths[i]).when(inProgressLog).getValidLength();
+ doReturn(new FSEditLogLoader.EditLogValidation(-1, validTxnCounts[i]))
+ .when(inProgressLog).validateLog();
lg.logs.set(i, inProgressLog);
}