You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/21 18:46:44 UTC

svn commit: r1138085 - in /hadoop/common/branches/HDFS-1073/hdfs: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/

Author: todd
Date: Tue Jun 21 16:46:43 2011
New Revision: 1138085

URL: http://svn.apache.org/viewvc?rev=1138085&view=rev
Log:
HDFS-2074. Determine edit log validity by truly reading and validating transactions. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java

Modified: hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt Tue Jun 21 16:46:43 2011
@@ -50,3 +50,5 @@ HDFS-2048. Add upgrade tests and fix upg
            (todd)
 HDFS-2027. Image inspector should return finalized logs before unfinalized
            logs. (todd)
+HDFS-2074. Determine edit log validity by truly reading and validating
+           transactions. (todd)

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Tue Jun 21 16:46:43 2011
@@ -34,6 +34,8 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogHeader;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -356,13 +358,10 @@ public class BackupImage extends FSImage
       BufferedInputStream bin = new BufferedInputStream(edits);
       DataInputStream in = new DataInputStream(bin);
       FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
-      int logVersion = logLoader.readLogVersion(in);
-      Checksum checksum = null;
-      if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
-        checksum = FSEditLog.getChecksum();
-        in = new DataInputStream(new CheckedInputStream(bin, checksum));
-      }
-      int loaded = logLoader.loadEditRecords(logVersion, in, checksum, false,
+
+      LogHeader header = FSEditLogOp.LogHeader.read(in);
+      int loaded = logLoader.loadEditRecords(
+          header.logVersion, in, header.checksum, false,
           lastAppliedTxId + 1);
 
       lastAppliedTxId += loaded;
@@ -370,8 +369,9 @@ public class BackupImage extends FSImage
 
       // first time reached the end of spool
       jsState = JSpoolState.WAIT;
-      loaded = logLoader.loadEditRecords(logVersion, in, checksum,
-                                         true, lastAppliedTxId + 1);
+      loaded = logLoader.loadEditRecords(
+          header.logVersion, in, header.checksum,
+          true, lastAppliedTxId + 1);
       numEdits += loaded;
       lastAppliedTxId += loaded;
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Tue Jun 21 16:46:43 2011
@@ -22,9 +22,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 
-import org.apache.hadoop.io.IOUtils;
 
 /**
  * An implementation of the abstract class {@link EditLogInputStream}, which
@@ -74,50 +72,5 @@ class EditLogFileInputStream extends Edi
     // file size + size of both buffers
     return file.length();
   }
-  
-  /**
-   * Return the length of non-zero bytes in the given file.
-   * @param chunkSizeToRead chunk size for disk reads
-   * @throws IOException if the file cannot be read
-   */
-  static long getValidLength(File f) throws IOException {
-    return getValidLength(f, 1024*1024); // 1M chunks
-  }
-
-  /**
-   * Return the length of bytes in the given file after subtracting
-   * the trailer of 0xFF (OP_INVALID)s.
-   * This seeks to the end of the file and reads chunks backwards until
-   * it finds a non-0xFF byte.
-   * @param chunkSizeToRead chunk size for disk reads
-   * @throws IOException if the file cannot be read
-   */
-  static long getValidLength(File f, int chunkSizeToRead) throws IOException {    
-    FileInputStream fis = new FileInputStream(f);
-    try {
-      
-      byte buf[] = new byte[chunkSizeToRead];
-
-      FileChannel fc = fis.getChannel();
-      long size = fc.size();
-      long pos = size - (size % chunkSizeToRead);
-      
-      while (pos >= 0) {
-        fc.position(pos);
-
-        int readLen = (int) Math.min(size - pos, chunkSizeToRead);
-        IOUtils.readFully(fis, buf, 0, readLen);
-        for (int i = readLen - 1; i >= 0; i--) {
-          if (buf[i] != FSEditLogOpCodes.OP_INVALID.getOpCode()) {
-            return pos + i + 1; // + 1 since we count this byte!
-          }
-        }
-        
-        pos -= chunkSizeToRead;
-      }
-      return 0;
-    } finally {
-      fis.close();
-    }
-  }
+
 }

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Tue Jun 21 16:46:43 2011
@@ -19,7 +19,8 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
-import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -34,6 +35,8 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogHeader;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Reader;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
@@ -59,38 +62,6 @@ public class FSEditLogLoader {
     return numEdits;
   }
 
-  /**
-   * Read the header of fsedit log
-   * @param in fsedit stream
-   * @return the edit log version number
-   * @throws IOException if error occurs
-   */
-  int readLogVersion(DataInputStream in) throws IOException {
-    int logVersion = 0;
-    // Read log file version. Could be missing.
-    in.mark(4);
-    // If edits log is greater than 2G, available method will return negative
-    // numbers, so we avoid having to call available
-    boolean available = true;
-    try {
-      logVersion = in.readByte();
-    } catch (EOFException e) {
-      available = false;
-    }
-    if (available) {
-      in.reset();
-      logVersion = in.readInt();
-      if (logVersion < FSConstants.LAYOUT_VERSION) // future version
-        throw new IOException(
-            "Unexpected version of the file system log file: "
-            + logVersion + ". Current version = "
-            + FSConstants.LAYOUT_VERSION + ".");
-    }
-    assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
-      "Unsupported version " + logVersion;
-    return logVersion;
-  }
-  
   int loadFSEdits(EditLogInputStream edits, boolean closeOnExit,
       long expectedStartingTxId)
   throws IOException {
@@ -98,17 +69,11 @@ public class FSEditLogLoader {
     DataInputStream in = new DataInputStream(bin);
 
     int numEdits = 0;
-    int logVersion = 0;
 
     try {
-      logVersion = readLogVersion(in);
-      Checksum checksum = null;
-      if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
-        checksum = FSEditLog.getChecksum();
-        in = new DataInputStream(new CheckedInputStream(bin, checksum));
-      }
-
-      numEdits = loadEditRecords(logVersion, in, checksum, false,
+      LogHeader header = LogHeader.read(in);
+      numEdits = loadEditRecords(
+          header.logVersion, in, header.checksum, false,
           expectedStartingTxId);
     } finally {
       if(closeOnExit)
@@ -488,6 +453,61 @@ public class FSEditLogLoader {
   }
   
   /**
+   * Return the number of valid transactions in the file. If the file is
+   * truncated during the header, returns a value indicating that there are
+   * 0 valid transactions.
+   * @throws IOException if the file cannot be read due to an IO error (eg
+   *                     if the log does not exist)
+   */
+  static EditLogValidation validateEditLog(File f) throws IOException {
+    FileInputStream fis = new FileInputStream(f);
+    try {
+      PositionTrackingInputStream tracker = new PositionTrackingInputStream(
+          new BufferedInputStream(fis));
+      DataInputStream dis = new DataInputStream(tracker);
+      LogHeader header; 
+      try {
+        header = LogHeader.read(dis);
+      } catch (Throwable t) {
+        FSImage.LOG.debug("Unable to read header from " + f +
+            " -> no valid transactions in this file.");
+        return new EditLogValidation(0, 0);
+      }
+      
+      Reader reader = new FSEditLogOp.Reader(dis, header.logVersion, header.checksum);
+      long numValid = 0;
+      long lastPos = 0;
+      try {
+        while (true) {
+          lastPos = tracker.getPos();
+          if (reader.readOp() == null) {
+            break;
+          }
+          numValid++;
+        }
+      } catch (Throwable t) {
+        // Catch Throwable and not just IOE, since bad edits may generate
+        // NumberFormatExceptions, AssertionErrors, OutOfMemoryErrors, etc.
+        FSImage.LOG.debug("Caught exception after reading " + numValid +
+            " ops from " + f + " while determining its valid length.", t);
+      }
+      return new EditLogValidation(lastPos, numValid);
+    } finally {
+      fis.close();
+    }
+  }
+  
+  static class EditLogValidation {
+    long validLength;
+    long numTransactions;
+    
+    EditLogValidation(long validLength, long numTransactions) {
+      this.validLength = validLength;
+      this.numTransactions = numTransactions;
+    }
+  }
+
+  /**
    * Stream wrapper that keeps track of the current file position.
    */
   private static class PositionTrackingInputStream extends FilterInputStream {

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Tue Jun 21 16:46:43 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 import java.util.EnumMap;
 
@@ -29,9 +30,12 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.Storage;
+
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.io.BytesWritable;
@@ -650,6 +654,62 @@ public abstract class FSEditLogOp {
       return longWritable.get();
     }
   }
+  
+  /**
+   * Class to encapsulate the header at the top of a log file.
+   */
+  static class LogHeader {
+    final int logVersion;
+    final Checksum checksum;
+
+    public LogHeader(int logVersion, Checksum checksum) {
+      this.logVersion = logVersion;
+      this.checksum = checksum;
+    }
+
+    static LogHeader read(DataInputStream in) throws IOException {
+      int logVersion = 0;
+
+      logVersion = FSEditLogOp.LogHeader.readLogVersion(in);
+      Checksum checksum = null;
+      if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
+        checksum = FSEditLog.getChecksum();
+      }
+      return new LogHeader(logVersion, checksum);
+    }
+    
+    /**
+     * Read the header of fsedit log
+     * @param in fsedit stream
+     * @return the edit log version number
+     * @throws IOException if error occurs
+     */
+    private static int readLogVersion(DataInputStream in) throws IOException {
+      int logVersion = 0;
+      // Read log file version. Could be missing.
+      in.mark(4);
+      // If edits log is greater than 2G, available method will return negative
+      // numbers, so we avoid having to call available
+      boolean available = true;
+      try {
+        logVersion = in.readByte();
+      } catch (EOFException e) {
+        available = false;
+      }
+      if (available) {
+        in.reset();
+        logVersion = in.readInt();
+        if (logVersion < FSConstants.LAYOUT_VERSION) // future version
+          throw new IOException(
+              "Unexpected version of the file system log file: "
+              + logVersion + ". Current version = "
+              + FSConstants.LAYOUT_VERSION + ".");
+      }
+      assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
+        "Unsupported version " + logVersion;
+      return logVersion;
+    }
+  }
 
   /**
    * Class for reading editlog ops from a stream
@@ -668,7 +728,12 @@ public abstract class FSEditLogOp {
     @SuppressWarnings("deprecation")
     public Reader(DataInputStream in, int logVersion,
                   Checksum checksum) {
-      this.in = in;
+      if (checksum != null) {
+        this.in = new DataInputStream(
+            new CheckedInputStream(in, checksum));
+      } else {
+        this.in = in;
+      }
       this.logVersion = logVersion;
       this.checksum = checksum;
       opInstances = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Tue Jun 21 16:46:43 2011
@@ -36,6 +36,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
@@ -398,17 +399,19 @@ class FSImageTransactionalStorageInspect
         return;
       }
 
-      long maxValidLength = Long.MIN_VALUE;
+      long maxValidTxnCount = Long.MIN_VALUE;
       for (FoundEditLog log : logs) {
-        long validLength = log.getValidLength();
-        LOG.warn("  Log " + log.getFile() + " valid length=" + validLength);
-        maxValidLength = Math.max(maxValidLength, validLength);
+        long validTxnCount = log.validateLog().numTransactions;
+        LOG.warn("  Log " + log.getFile() + " valid txns=" + validTxnCount);
+        maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount);
       }        
 
       for (FoundEditLog log : logs) {
-        if (log.getValidLength() < maxValidLength) {
+        long txns = log.validateLog().numTransactions;
+        if (txns < maxValidTxnCount) {
           LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
-              "it is shorter than " + maxValidLength + " bytes");
+                   "it is has only " + txns + " valid txns whereas another " +
+                   "log has " + maxValidTxnCount);
           log.markCorrupt();
         }
       }
@@ -475,7 +478,7 @@ class FSImageTransactionalStorageInspect
     final long startTxId;
     final long lastTxId;
     
-    private long cachedValidLength = -1;
+    private EditLogValidation cachedValidation = null;
     private boolean isCorrupt = false;
     
     static final long UNKNOWN_END = -1;
@@ -500,11 +503,11 @@ class FSImageTransactionalStorageInspect
       return lastTxId;
     }
 
-    long getValidLength() throws IOException {
-      if (cachedValidLength == -1) {
-        cachedValidLength = EditLogFileInputStream.getValidLength(file);
+    EditLogValidation validateLog() throws IOException {
+      if (cachedValidation == null) {
+        cachedValidation = FSEditLogLoader.validateEditLog(file);
       }
-      return cachedValidLength;
+      return cachedValidation;
     }
 
     boolean isInProgress() {

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java Tue Jun 21 16:46:43 2011
@@ -47,6 +47,7 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.io.Files;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.doReturn;
@@ -95,6 +96,23 @@ public abstract class FSImageTestUtil {
     return inspector;
   }
 
+  
+  /**
+   * Return a standalone instance of FSEditLog that will log into the given
+   * log directory. The returned instance is not yet opened.
+   */
+  public static FSEditLog createStandaloneEditLog(File logDir)
+      throws IOException {
+    assertTrue(logDir.mkdirs() || logDir.exists());
+    Files.deleteDirectoryContents(logDir);
+    NNStorage storage = Mockito.mock(NNStorage.class);
+    List<StorageDirectory> sds = Lists.newArrayList(
+        FSImageTestUtil.mockStorageDirectory(logDir, NameNodeDirType.EDITS));
+    Mockito.doReturn(sds).when(storage).dirIterable(NameNodeDirType.EDITS);
+
+    return new FSEditLog(storage);
+  }
+  
   /**
    * Assert that all of the given directories have the same newest filename
    * for fsimage that they hold the same data.

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Tue Jun 21 16:46:43 2011
@@ -605,54 +605,6 @@ public class TestEditLog extends TestCas
     }
   }
   
-
-  public void testGetValidLength() throws Exception {
-    assertTrue(TEST_DIR.mkdirs() || TEST_DIR.exists());
-
-    // For each of a combination of valid bytes followed by invalid,
-    // make sure we determine the proper non-trailer size of the file.
-    // Sizes designed to find off-by-one errors around 1024 byte chunk size.
-    final int VALID_SIZES[] = new int[] {0, 1, 789, 1023, 1024, 1025};
-    final int TRAILER_SIZES[] = new int[] {0, 1, 1024-789-1, 1024-789,
-        1024-789+1, 1023, 1024, 1025};
-    
-    for (int validPart : VALID_SIZES) {
-      for (int trailerPart : TRAILER_SIZES) {
-        doValidLengthTest(validPart, trailerPart);
-        doValidLengthTest(validPart*3, trailerPart*3);
-      }
-    }
-  }
-
-  private void doValidLengthTest(int validPart, int trailerPart) throws Exception {
-    File file = new File(TEST_DIR, "validLengthTest");
-    FileOutputStream fos = new FileOutputStream(file);
-    try {
-      try {
-        byte[] valid = new byte[validPart];
-        for (int i = 0; i < validPart; i++) {
-          valid[i] = (byte)(i & 0xFF);
-        }
-        // The valid data shouldn't end in a trailer byte, or else we'd think it was one shorter
-        // than actual length
-        if (validPart > 0) valid[validPart - 1] = 1;
-        fos.write(valid);
-        
-        byte[] trailer = new byte[trailerPart];
-        Arrays.fill(trailer, TRAILER_BYTE);
-        fos.write(trailer);
-      } finally {
-        fos.close();
-      }
-      
-      long computedValid = EditLogFileInputStream.getValidLength(file, 1024);
-      assertEquals("Testing valid=" + validPart + " trailer=" + trailerPart,
-          validPart, computedValid);
-    } finally {
-      file.delete();
-    }
-  }
-
   private static class EditLogByteInputStream extends EditLogInputStream {
     private InputStream input;
     private long len;

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java Tue Jun 21 16:46:43 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.Test;
@@ -48,8 +49,10 @@ public class TestEditLogFileOutputStream
       .getStorage().getStorageDir(0);
     File editLog = NNStorage.getInProgressEditsFile(sd, 1);
 
+    EditLogValidation validation = FSEditLogLoader.validateEditLog(editLog);
     assertEquals("Edit log should contain a header as valid length",
-        HEADER_LEN, EditLogFileInputStream.getValidLength(editLog));
+        HEADER_LEN, validation.validLength);
+    assertEquals(1, validation.numTransactions);
     assertEquals("Edit log should have 1MB of bytes allocated",
         1024*1024, editLog.length());
     
@@ -57,9 +60,11 @@ public class TestEditLogFileOutputStream
     cluster.getFileSystem().mkdirs(new Path("/tmp"),
         new FsPermission((short)777));
 
+    validation = FSEditLogLoader.validateEditLog(editLog);
     assertEquals("Edit log should have more valid data after writing a txn",
         MKDIR_LEN + HEADER_LEN,
-        EditLogFileInputStream.getValidLength(editLog));
+        validation.validLength);
+    assertEquals(2, validation.numTransactions);
 
     assertEquals("Edit log should be 1MB long",
         1024 * 1024, editLog.length());

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java Tue Jun 21 16:46:43 2011
@@ -20,9 +20,14 @@ package org.apache.hadoop.hdfs.server.na
 import static org.junit.Assert.*;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.SortedMap;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,12 +37,24 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Level;
 import org.junit.Test;
 
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+
 public class TestFSEditLogLoader {
   
+  static {
+    ((Log4JLogger)FSImage.LOG).getLogger().setLevel(Level.ALL);
+  }
+  
+  private static final File TEST_DIR = new File(
+      System.getProperty("test.build.data","build/test/data"));
+
   private static final int NUM_DATA_NODES = 0;
   
   @Test
@@ -128,4 +145,168 @@ public class TestFSEditLogLoader {
       }
     }
   }
+  
+  /**
+   * Test that the valid number of transactions can be counted from a file.
+   * @throws IOException 
+   */
+  @Test
+  public void testCountValidTransactions() throws IOException {
+    File testDir = new File(TEST_DIR, "testCountValidTransactions");
+    File logFile = new File(testDir,
+        NNStorage.getInProgressEditsFileName(1));
+    
+    // Create a log file, and return the offsets at which each
+    // transaction starts.
+    FSEditLog fsel = null;
+    final int NUM_TXNS = 30;
+    SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
+    try {
+      fsel = FSImageTestUtil.createStandaloneEditLog(testDir);
+      fsel.open();
+      assertTrue("should exist: " + logFile, logFile.exists());
+      
+      for (int i = 0; i < NUM_TXNS; i++) {
+        long trueOffset = getNonTrailerLength(logFile);
+        long thisTxId = fsel.getLastWrittenTxId() + 1;
+        offsetToTxId.put(trueOffset, thisTxId);
+        System.err.println("txid " + thisTxId + " at offset " + trueOffset);
+        fsel.logDelete("path" + i, i);
+        fsel.logSync();
+      }
+    } finally {
+      if (fsel != null) {
+        fsel.close();
+      }
+    }
+
+    // The file got renamed when the log was closed.
+    logFile = testDir.listFiles()[0];
+    long validLength = getNonTrailerLength(logFile);
+
+    // Make sure that uncorrupted log has the expected length and number
+    // of transactions.
+    EditLogValidation validation = FSEditLogLoader.validateEditLog(logFile);
+    assertEquals(NUM_TXNS + 2, validation.numTransactions);
+    assertEquals(validLength, validation.validLength);
+    
+    // Back up the uncorrupted log
+    File logFileBak = new File(testDir, logFile.getName() + ".bak");
+    Files.copy(logFile, logFileBak);
+
+    // Corrupt the log file in various ways for each txn
+    for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) {
+      long txOffset = entry.getKey();
+      long txid = entry.getValue();
+      
+      // Restore backup, truncate the file exactly before the txn
+      Files.copy(logFileBak, logFile);
+      truncateFile(logFile, txOffset);
+      validation = FSEditLogLoader.validateEditLog(logFile);
+      assertEquals("Failed when truncating to length " + txOffset,
+          txid - 1, validation.numTransactions);
+      assertEquals(txOffset, validation.validLength);
+
+      // Restore backup, then truncate the file one byte into the txn;
+      // a partially-written txn also isn't valid
+      Files.copy(logFileBak, logFile);
+      truncateFile(logFile, txOffset + 1);
+      validation = FSEditLogLoader.validateEditLog(logFile);
+      assertEquals("Failed when truncating to length " + (txOffset + 1),
+          txid - 1, validation.numTransactions);
+      assertEquals(txOffset, validation.validLength);
+
+      // Restore backup, corrupt the txn opcode
+      Files.copy(logFileBak, logFile);
+      corruptByteInFile(logFile, txOffset);
+      validation = FSEditLogLoader.validateEditLog(logFile);
+      assertEquals("Failed when corrupting txn opcode at " + txOffset,
+          txid - 1, validation.numTransactions);
+      assertEquals(txOffset, validation.validLength);
+
+      // Restore backup, corrupt a byte a few bytes into the txn
+      Files.copy(logFileBak, logFile);
+      corruptByteInFile(logFile, txOffset+5);
+      validation = FSEditLogLoader.validateEditLog(logFile);
+      assertEquals("Failed when corrupting txn data at " + (txOffset+5),
+          txid - 1, validation.numTransactions);
+      assertEquals(txOffset, validation.validLength);
+    }
+    
+    // Corrupt the log at every offset to make sure that validation itself
+    // never throws an exception, and that the number of valid transactions
+    // counted never decreases as the corruption offset increases
+    long prevNumValid = 0;
+    for (long offset = 0; offset < validLength; offset++) {
+      Files.copy(logFileBak, logFile);
+      corruptByteInFile(logFile, offset);
+      EditLogValidation val = FSEditLogLoader.validateEditLog(logFile);
+      assertTrue(val.numTransactions >= prevNumValid);
+      prevNumValid = val.numTransactions;
+    }
+  }
+
+  /**
+   * Corrupt the byte at the given offset in the given file,
+   * by subtracting 1 from it.
+   */
+  private void corruptByteInFile(File file, long offset)
+      throws IOException {
+    RandomAccessFile raf = new RandomAccessFile(file, "rw");
+    try {
+      raf.seek(offset);
+      int origByte = raf.read();
+      raf.seek(offset);
+      raf.writeByte(origByte - 1);
+    } finally {
+      IOUtils.closeStream(raf);
+    }
+  }
+
+  /**
+   * Truncate the given file to the given length
+   */
+  private void truncateFile(File logFile, long newLength)
+      throws IOException {
+    RandomAccessFile raf = new RandomAccessFile(logFile, "rw");
+    raf.setLength(newLength);
+    raf.close();
+  }
+
+  /**
+   * Return the length of bytes in the given file after subtracting
+   * the trailer of 0xFF (OP_INVALID)s.
+   * This seeks to the end of the file and reads chunks backwards until
+   * it finds a non-0xFF byte.
+   * @throws IOException if the file cannot be read
+   */
+  private static long getNonTrailerLength(File f) throws IOException {
+    final int chunkSizeToRead = 256*1024;
+    FileInputStream fis = new FileInputStream(f);
+    try {
+      
+      byte buf[] = new byte[chunkSizeToRead];
+  
+      FileChannel fc = fis.getChannel();
+      long size = fc.size();
+      long pos = size - (size % chunkSizeToRead);
+      
+      while (pos >= 0) {
+        fc.position(pos);
+  
+        int readLen = (int) Math.min(size - pos, chunkSizeToRead);
+        IOUtils.readFully(fis, buf, 0, readLen);
+        for (int i = readLen - 1; i >= 0; i--) {
+          if (buf[i] != FSEditLogOpCodes.OP_INVALID.getOpCode()) {
+            return pos + i + 1; // + 1 since we count this byte!
+          }
+        }
+        
+        pos -= chunkSizeToRead;
+      }
+      return 0;
+    } finally {
+      fis.close();
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java?rev=1138085&r1=1138084&r2=1138085&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java Tue Jun 21 16:46:43 2011
@@ -249,14 +249,15 @@ public class TestFSImageStorageInspector
     LogGroup lg = inspector.logGroups.get(123L);
     assertEquals(3, lg.logs.size());
     
-    // Inject spies to return the lengths we would like to see
-    long validLengths[] = new long[] { 2000, 2000, 1000 };
+    // Inject spies to return the valid counts we would like to see
+    long validTxnCounts[] = new long[] { 2000, 2000, 1000 };
     for (int i = 0; i < 3; i++) {
       FoundEditLog inProgressLog = lg.logs.get(i);
       assertTrue(inProgressLog.isInProgress());
       
       inProgressLog = spy(inProgressLog);
-      doReturn(validLengths[i]).when(inProgressLog).getValidLength();
+      doReturn(new FSEditLogLoader.EditLogValidation(-1, validTxnCounts[i]))
+        .when(inProgressLog).validateLog();
       lg.logs.set(i, inProgressLog);      
     }