You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2008/06/30 22:59:29 UTC

svn commit: r672918 - /hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/DataBlockScanner.java

Author: hairong
Date: Mon Jun 30 13:59:29 2008
New Revision: 672918

URL: http://svn.apache.org/viewvc?rev=672918&view=rev
Log:
HADOOP-3635. Uncaught exception in DataBlockScanner. Contributed by Tsz Wo (Nicholas), SZE.

Modified:
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/DataBlockScanner.java

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/DataBlockScanner.java?rev=672918&r1=672917&r2=672918&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/DataBlockScanner.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/DataBlockScanner.java Mon Jun 30 13:59:29 2008
@@ -22,6 +22,7 @@
 import java.io.Closeable;
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
@@ -64,7 +65,7 @@
   static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
   private static final long ONE_DAY = 24*3600*1000L;
   
-  static DateFormat dateFormat = 
+  static final DateFormat dateFormat = 
                     new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
   
   static final String verificationLogFile = "dncp_block_verification.log";
@@ -177,6 +178,19 @@
     }
   }
   
+  /** Update blockMap by the given LogEntry */
+  private synchronized void updateBlockInfo(LogEntry e) {
+    BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
+    
+    if(info != null && e.verificationTime > 0 && 
+        info.lastScanTime < e.verificationTime) {
+      delBlockInfo(info);
+      info.lastScanTime = e.verificationTime;
+      info.lastScanType = ScanType.VERIFICATION_SCAN;
+      addBlockInfo(info);
+    }
+  }
+
   private void init() {
     
     // get the list of blocks and arrange them in random order
@@ -353,12 +367,17 @@
         String name = matcher.group(1);
         String value = matcher.group(2);
         
-        if (name.equals("id")) {
-          entry.blockId = Long.valueOf(value);
-        } else if (name.equals("time")) {
-          entry.verificationTime = Long.valueOf(value);
-        } else if (name.equals("genstamp")) {
-          entry.genStamp = Long.valueOf(value);
+        try {
+          if (name.equals("id")) {
+            entry.blockId = Long.valueOf(value);
+          } else if (name.equals("time")) {
+            entry.verificationTime = Long.valueOf(value);
+          } else if (name.equals("genstamp")) {
+            entry.genStamp = Long.valueOf(value);
+          }
+        } catch(NumberFormatException nfe) {
+          LOG.warn("Cannot parse line: " + line, nfe);
+          return null;
         }
       }
       
@@ -456,11 +475,10 @@
     }
   }
   
+  /** returns false if the process was interrupted
+   * because the thread is marked to exit.
+   */
   private boolean assignInitialVerificationTimes() {
-    /* returns false if the process was interrupted
-     * because the thread is marked to exit.
-     */
-    
     int numBlocks = 1;
     synchronized (this) {
       numBlocks = Math.max(blockMap.size(), 1);
@@ -470,7 +488,7 @@
     LogFileHandler.Reader logReader = null;
     try {
       if (verificationLog != null) {
-        logReader = verificationLog.newReader();
+        logReader = verificationLog.new Reader(false);
       }
     } catch (IOException e) {
       LOG.warn("Could not read previous verification times : " +
@@ -482,24 +500,13 @@
     }
     
     // update verification times from the verificationLog.
-    Block tmpBlock = new Block();
     while (logReader != null && logReader.hasNext()) {
       if (!datanode.shouldRun || Thread.interrupted()) {
         return false;
       }
-      String line = logReader.next();
-      LogEntry entry = LogEntry.parseEntry(line);
-      synchronized (this) {
-        tmpBlock.set(entry.blockId, 0, entry.genStamp);
-        BlockScanInfo info = blockMap.get(tmpBlock);
-        
-        if(info != null && entry.verificationTime > 0 && 
-            info.lastScanTime < entry.verificationTime) {
-          delBlockInfo(info);
-          info.lastScanTime = entry.verificationTime;
-          info.lastScanType = ScanType.VERIFICATION_SCAN;
-          addBlockInfo(info);
-        }
+      LogEntry entry = LogEntry.parseEntry(logReader.next());
+      if (entry != null) {
+        updateBlockInfo(entry);
       }
     }
     
@@ -669,7 +676,7 @@
     private static final String curFileSuffix = ".curr";
     private static final String prevFileSuffix = ".prev";
     
-    /// Don't roll files more aften than this
+    // Don't roll files more often than this
     private static final long minRollingPeriod = 6 * 3600 * 1000L; // 6 hours
     private static final long minWarnPeriod = minRollingPeriod;
     private static final int minLineLimit = 1000;
@@ -687,14 +694,14 @@
     
     long lastWarningTime = 0;
     
-    PrintStream out;
+    private PrintStream out;
     
     int numReaders = 0;
         
     /**
      * Opens the log file for appending.
      * Note that rolling will happen only after "updateLineCount()" is 
-     * called. This is so that line count could be updated in a seprate
+     * called. This is so that line count could be updated in a separate
      * thread without delaying start up.
      * 
      * @param dir where the logs files are located.
@@ -717,7 +724,7 @@
     }
     
     /**
-     * This appends "line\n". Note "\n".
+     * Append "\n" + line.
      * If the log file need to be rolled, it will done after 
      * appending the text.
      * This does not throw IOException when there is an error while 
@@ -726,7 +733,8 @@
      * return true if append was successful.
      */
     synchronized boolean appendLine(String line) {
-      out.println(line);
+      out.println();
+      out.print(line);
       curNumLines += (curNumLines < 0) ? -1 : 1;
       try {
         rollIfRequired();
@@ -746,10 +754,8 @@
       }
     }
     
-    private void openCurFile() throws IOException {
-      if (out != null) {
-        out.close();
-      }
+    private synchronized void openCurFile() throws FileNotFoundException {
+      close();
       out = new PrintStream(new FileOutputStream(curFile, true));
     }
     
@@ -783,7 +789,7 @@
         throw new IOException("Could not delete " + prevFile);
       }
       
-      out.close();
+      close();
 
       if (!curFile.renameTo(prevFile)) {
         openCurFile();
@@ -802,10 +808,6 @@
       }
     }
     
-    Reader newReader() throws IOException {
-      return new Reader(false);
-    }
-    
     /**
      * This is used to read the lines in order.
      * If the data is not read completely (i.e, untill hasNext() returns