Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/17 00:58:27 UTC

svn commit: r464710 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/fs/

Author: cutting
Date: Mon Oct 16 15:58:26 2006
New Revision: 464710

URL: http://svn.apache.org/viewvc?view=rev&rev=464710
Log:
HADOOP-498.  Re-implement DFS integrity checker to run server-side for much improved performance.  Contributed by Milind.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
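
The net effect of the patch: DFSck stops walking the namespace over client RPC and instead issues a single HTTP request to the namenode's existing info server (dfs.info.port, default 50070); the new NamenodeFsck class does the traversal inside the namenode and streams a plain-text report back over that connection. A minimal sketch of the new client-side flow follows; the host name below is a placeholder, and the query parameters (path, files, blocks, locations, move, delete) are the ones handled by the servlet added in this commit.

  import java.io.InputStreamReader;
  import java.io.Reader;
  import java.net.URL;
  import java.net.URLEncoder;

  public class FsckClientSketch {
    public static void main(String[] args) throws Exception {
      // Placeholder address: the namenode's info (HTTP) server, not its RPC port.
      String infoServer = "namenode.example.com:50070";
      String path = URLEncoder.encode("/", "UTF-8");
      URL url = new URL("http://" + infoServer + "/fsck?path=" + path + "&files&blocks");
      // The servlet writes the fsck report as text; just echo it to stdout.
      Reader in = new InputStreamReader(url.openConnection().getInputStream(), "UTF-8");
      int c;
      while ((c = in.read()) != -1) {
        System.out.print((char) c);
      }
      in.close();
    }
  }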

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=464710&r1=464709&r2=464710
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Oct 16 15:58:26 2006
@@ -11,6 +11,9 @@
     attempts from 'info' to 'debug', so they are not normally shown.
     (Konstantin Shvachko via cutting)
 
+ 3. HADOOP-498.  Re-implement DFS integrity checker to run server-side,
+    for much improved performance.  (Milind Bhandarkar via cutting)
+
 
 Release 0.7.1 - 2006-10-11
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java?view=diff&rev=464710&r1=464709&r2=464710
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java Mon Oct 16 15:58:26 2006
@@ -15,22 +15,16 @@
  */
 package org.apache.hadoop.dfs;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Random;
-import java.util.TreeSet;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLEncoder;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSOutputStream;
-import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.util.ToolBase;
 
 /**
@@ -60,361 +54,32 @@
 public class DFSck extends ToolBase {
   private static final Log LOG = LogFactory.getLog(DFSck.class.getName());
 
-  /** Don't attempt any fixing . */
-  public static final int FIXING_NONE = 0;
-  /** Move corrupted files to /lost+found . */
-  public static final int FIXING_MOVE = 1;
-  /** Delete corrupted files. */
-  public static final int FIXING_DELETE = 2;
-  
-  private DFSClient dfs;
-  private UTF8 lostFound = null;
-  private boolean lfInited = false;
-  private boolean lfInitedOk = false;
-  private boolean showFiles = false;
-  private boolean showBlocks = false;
-  private boolean showLocations = false;
-  private int fixing;
- 
-  DFSck() {
-  }
+  DFSck() {}
   
   /**
    * Filesystem checker.
    * @param conf current Configuration
-   * @param fixing one of pre-defined values
-   * @param showFiles show each file being checked
-   * @param showBlocks for each file checked show its block information
-   * @param showLocations for each block in each file show block locations
    * @throws Exception
    */
-  public DFSck(Configuration conf, int fixing, boolean showFiles, boolean showBlocks, boolean showLocations) throws Exception {
+  public DFSck(Configuration conf) throws Exception {
     setConf(conf);
-    init(fixing, showFiles, showBlocks, showLocations);
-  }
-  
-  public void init(int fixing, boolean showFiles, 
-          boolean showBlocks, boolean showLocations) throws IOException {
-      String fsName = conf.get("fs.default.name", "local");
-      if (fsName.equals("local")) {
-        throw new IOException("This tool only checks DFS, but your config uses 'local' FS.");
-      }
-      this.dfs = new DFSClient(DataNode.createSocketAddr(fsName), conf);
-      this.fixing = fixing;
-      this.showFiles = showFiles;
-      this.showBlocks = showBlocks;
-      this.showLocations = showLocations;
-  }
-  
-  /**
-   * Check files on DFS, starting from the indicated path.
-   * @param path starting point
-   * @return result of checking
-   * @throws Exception
-   */
-  public Result fsck(String path) throws Exception {
-    DFSFileInfo[] files = dfs.listPaths(new UTF8(path));
-    Result res = new Result();
-    res.setReplication(dfs.getDefaultReplication());
-    for (int i = 0; i < files.length; i++) {
-      check(files[i], res);
-    }
-    return res;
-  }
-  
-  private void check(DFSFileInfo file, Result res) throws Exception {
-    if (file.isDir()) {
-      if (showFiles)
-        System.out.println(file.getPath() + " <dir>");
-      res.totalDirs++;
-      DFSFileInfo[] files = dfs.listPaths(new UTF8(file.getPath()));
-      for (int i = 0; i < files.length; i++) {
-        check(files[i], res);
-      }
-      return;
-    }
-    res.totalFiles++;
-    res.totalSize += file.getLen();
-    LocatedBlock[] blocks = dfs.namenode.open(file.getPath());
-    res.totalBlocks += blocks.length;
-    if (showFiles) {
-      System.out.print(file.getPath() + " " + file.getLen() + ", " + blocks.length + " block(s): ");
-    } else {
-      System.out.print('.');
-      System.out.flush();
-      if (res.totalFiles % 100 == 0) System.out.println();
-    }
-    int missing = 0;
-    long missize = 0;
-    StringBuffer report = new StringBuffer();
-    for (int i = 0; i < blocks.length; i++) {
-      Block block = blocks[i].getBlock();
-      long id = block.getBlockId();
-      DatanodeInfo[] locs = blocks[i].getLocations();
-      short targetFileReplication = file.getReplication();
-      if (locs.length > targetFileReplication) res.overReplicatedBlocks += (locs.length - targetFileReplication);
-      if (locs.length < targetFileReplication && locs.length > 0) res.underReplicatedBlocks += (targetFileReplication - locs.length);
-      report.append(i + ". " + id + " len=" + block.getNumBytes());
-      if (locs == null || locs.length == 0) {
-        report.append(" MISSING!");
-        res.addMissing(block.getBlockName(), block.getNumBytes());
-        missing++;
-        missize += block.getNumBytes();
-      } else {
-        report.append(" repl=" + locs.length);
-        if (showLocations) {
-          StringBuffer sb = new StringBuffer("[");
-          for (int j = 0; j < locs.length; j++) {
-            if (j > 0) sb.append(", ");
-            sb.append(locs[j]);
-          }
-          sb.append(']');
-          report.append(" " + sb.toString());
-        }
-      }
-      report.append('\n');
-    }
-    if (missing > 0) {
-      if (!showFiles)
-        System.out.println("\nMISSING " + missing + " blocks of total size " + missize + " B");
-      res.corruptFiles++;
-      switch (fixing) {
-        case FIXING_NONE: // do nothing
-          System.err.println("\n - ignoring corrupted " + file.getPath());
-          break;
-        case FIXING_MOVE:
-          System.err.println("\n - moving to /lost+found: " + file.getPath());
-          lostFoundMove(file, blocks);
-          break;
-        case FIXING_DELETE:
-          System.err.println("\n - deleting corrupted " + file.getPath());
-          dfs.delete(new UTF8(file.getPath()));
-      }
-    }
-    if (showFiles) {
-      if (missing > 0) {
-        System.out.println(" MISSING " + missing + " blocks of total size " + missize + " B");
-      } else System.out.println(" OK");
-      if (showBlocks) System.out.println(report.toString());
-    }
-  }
-  
-  private void lostFoundMove(DFSFileInfo file, LocatedBlock[] blocks) {
-    if (!lfInited) {
-      lostFoundInit();
-    }
-    if (!lfInitedOk) {
-      return;
-    }
-    UTF8 target = new UTF8(lostFound.toString() + file.getPath());
-    String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
-    try {
-      if (!dfs.mkdirs(target)) {
-        System.err.println(errmsg);
-        return;
-      }
-      // create chains
-      int chain = 0;
-      FSOutputStream fos = null;
-      for (int i = 0; i < blocks.length; i++) {
-        LocatedBlock lblock = blocks[i];
-        DatanodeInfo[] locs = lblock.getLocations();
-        if (locs == null || locs.length == 0) {
-          if (fos != null) {
-            fos.flush();
-            fos.close();
-            fos = null;
-          }
-          continue;
-        }
-        if (fos == null) {
-          fos = dfs.create(new UTF8(target.toString() + "/" + chain), true);
-          if (fos != null) chain++;
-        }
-        if (fos == null) {
-          System.err.println(errmsg + ": could not store chain " + chain);
-          // perhaps we should bail out here...
-          // return;
-          continue;
-        }
-        
-        // copy the block. It's a pity it's not abstracted from DFSInputStream ...
-        try {
-          copyBlock(lblock, fos);
-        } catch (Exception e) {
-          e.printStackTrace();
-          // something went wrong copying this block...
-          System.err.println(" - could not copy block " + lblock.getBlock().getBlockName() + " to " + target);
-          fos.flush();
-          fos.close();
-          fos = null;
-        }
-      }
-      if (fos != null) fos.close();
-      System.err.println("\n - moved corrupted file " + file.getPath() + " to /lost+found");
-      dfs.delete(new UTF8(file.getPath()));
-    } catch (Exception e) {
-      e.printStackTrace();
-      System.err.println(errmsg + ": " + e.getMessage());
-    }
-  }
-  
-  /*
-   * XXX (ab) Bulk of this method is copied verbatim from {@link DFSClient}, which is
-   * bad. Both places should be refactored to provide a method to copy blocks
-   * around.
-   */
-  private void copyBlock(LocatedBlock lblock, FSOutputStream fos) throws Exception {
-    int failures = 0;
-    InetSocketAddress targetAddr = null;
-    TreeSet deadNodes = new TreeSet();
-    Socket s = null;
-    DataInputStream in = null;
-    DataOutputStream out = null;
-    while (s == null) {
-        DatanodeInfo chosenNode;
-
-        try {
-            chosenNode = bestNode(lblock.getLocations(), deadNodes);
-            targetAddr = DataNode.createSocketAddr(chosenNode.getName());
-        } catch (IOException ie) {
-            if (failures >= DFSClient.MAX_BLOCK_ACQUIRE_FAILURES) {
-                throw new IOException("Could not obtain block " + lblock);
-            }
-            LOG.info("Could not obtain block from any node:  " + ie);
-            try {
-                Thread.sleep(10000);
-            } catch (InterruptedException iex) {
-            }
-            deadNodes.clear();
-            failures++;
-            continue;
-        }
-        try {
-            s = new Socket();
-            s.connect(targetAddr, FSConstants.READ_TIMEOUT);
-            s.setSoTimeout(FSConstants.READ_TIMEOUT);
-
-            //
-            // Xmit header info to datanode
-            //
-            out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
-            out.write(FSConstants.OP_READSKIP_BLOCK);
-            lblock.getBlock().write(out);
-            out.writeLong(0L);
-            out.flush();
-
-            //
-            // Get bytes in block, set streams
-            //
-            in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
-            long curBlockSize = in.readLong();
-            long amtSkipped = in.readLong();
-            if (curBlockSize != lblock.getBlock().len) {
-                throw new IOException("Recorded block size is " + lblock.getBlock().len + ", but datanode reports size of " + curBlockSize);
-            }
-            if (amtSkipped != 0L) {
-                throw new IOException("Asked for offset of " + 0L + ", but only received offset of " + amtSkipped);
-            }
-        } catch (IOException ex) {
-            // Put chosen node into dead list, continue
-            LOG.info("Failed to connect to " + targetAddr + ":" + ex);
-            deadNodes.add(chosenNode);
-            if (s != null) {
-                try {
-                    s.close();
-                } catch (IOException iex) {
-                }                        
-            }
-            s = null;
-        }
-    }
-    if (in == null) {
-      throw new Exception("Could not open data stream for " + lblock.getBlock().getBlockName());
-    }
-    byte[] buf = new byte[1024];
-    int cnt = 0;
-    boolean success = true;
-    try {
-      while ((cnt = in.read(buf)) > 0) {
-        fos.write(buf, 0, cnt);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      success = false;
-    } finally {
-      try {in.close(); } catch (Exception e1) {}
-      try {out.close(); } catch (Exception e1) {}
-      try {s.close(); } catch (Exception e1) {}
-    }
-    if (!success)
-      throw new Exception("Could not copy block data for " + lblock.getBlock().getBlockName());
   }
   
-  /*
-   * XXX (ab) See comment above for copyBlock().
-   * 
-   * Pick the best node from which to stream the data.
-   * That's the local one, if available.
-   */
-  Random r = new Random();
-  private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
-      if ((nodes == null) || 
-          (nodes.length - deadNodes.size() < 1)) {
-          throw new IOException("No live nodes contain current block");
-      }
-      DatanodeInfo chosenNode = null;
-      for (int i = 0; i < nodes.length; i++) {
-          if (deadNodes.contains(nodes[i])) {
-              continue;
-          }
-          String nodename = nodes[i].getName();
-          int colon = nodename.indexOf(':');
-          if (colon >= 0) {
-              nodename = nodename.substring(0, colon);
-          }
-          if (dfs.localName.equals(nodename)) {
-              chosenNode = nodes[i];
-              break;
-          }
-      }
-      if (chosenNode == null) {
-          do {
-              chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length];
-          } while (deadNodes.contains(chosenNode));
-      }
-      return chosenNode;
-  }
-
-  private void lostFoundInit() {
-    lfInited = true;
-    try {
-      UTF8 lfName = new UTF8("/lost+found");
-      // check that /lost+found exists
-      if (!dfs.exists(lfName)) {
-        lfInitedOk = dfs.mkdirs(lfName);
-        lostFound = lfName;
-      } else if (!dfs.isDirectory(lfName)) {
-        System.err.println("Cannot use /lost+found : a regular file with this name exists.");
-        lfInitedOk = false;
-      } else { // exists and isDirectory
-        lostFound = lfName;
-        lfInitedOk = true;
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      lfInitedOk = false;
-    }
-    if (lostFound == null) {
-      System.err.println("Cannot initialize /lost+found .");
-      lfInitedOk = false;
-    }
+  private String getInfoServer() throws IOException {
+    String fsName = conf.get("fs.default.name", "local");
+    if (fsName.equals("local")) {
+      throw new IOException("This tool only checks DFS, but your config uses 'local' FS.");
+    }
+    String[] splits = fsName.split(":", 2);
+    int infoPort = conf.getInt("dfs.info.port", 50070);
+    return splits[0]+":"+infoPort;
   }
   
   /**
    * @param args
    */
   public int run(String[] args) throws Exception {
+    String fsName = getInfoServer();
     if (args.length == 0) {
       System.err.println("Usage: DFSck <path> [-move | -delete] [-files] [-blocks [-locations]]");
       System.err.println("\t<path>\tstart checking from this path");
@@ -425,26 +90,33 @@
       System.err.println("\t-locations\tprint out locations for every block");
       return -1;
     }
-    String path = args[0];
-    boolean showFiles = false;
-    boolean showBlocks = false;
-    boolean showLocations = false;
-    int fixing = FIXING_NONE;
-    for (int i = 1; i < args.length; i++) {
-      if (args[i].equals("-files")) showFiles = true;
-      if (args[i].equals("-blocks")) showBlocks = true;
-      if (args[i].equals("-locations")) showLocations = true;
-      if (args[i].equals("-move")) fixing = FIXING_MOVE;
-      if (args[i].equals("-delete")) fixing = FIXING_DELETE;
-    }
-    init(fixing, showFiles, showBlocks, showLocations);
-    Result res = fsck(path);
-    System.out.println();
-    System.out.println(res);
-    if (res.isHealthy()) {
-      System.out.println("\n\nThe filesystem under path '" + args[0] + "' is HEALTHY");
-    } else {
-      System.out.println("\n\nThe filesystem under path '" + args[0] + "' is CORRUPT");
+    StringBuffer url = new StringBuffer("http://"+fsName+"/fsck?path=");
+    String dir = "/";
+    // find top-level dir first
+    for (int idx = 0; idx < args.length; idx++) {
+      if (!args[idx].startsWith("-")) { dir = args[idx]; break; }
+    }
+    url.append(URLEncoder.encode(dir, "UTF-8"));
+    for (int idx = 1; idx < args.length; idx++) {
+      if (args[idx].equals("-move")) { url.append("&move"); }
+      if (args[idx].equals("-delete")) { url.append("&delete"); }
+      if (args[idx].equals("-files")) { url.append("&files"); }
+      if (args[idx].equals("-blocks")) { url.append("&blocks"); }
+      if (args[idx].equals("-locations")) { url.append("&locations"); }
+    }
+    URL path = new URL(url.toString());
+    URLConnection connection = path.openConnection();
+    InputStream stream = connection.getInputStream();
+    InputStreamReader input =
+        new InputStreamReader(stream, "UTF-8");
+    try {
+      int c = input.read();
+      while (c != -1) {
+        System.out.print((char) c);
+        c = input.read();
+      }
+    } finally {
+      input.close();
     }
     return 0;
   }
@@ -453,163 +125,4 @@
       int res = new DFSck().doMain(new Configuration(), args);
       System.exit(res);
   }
-
-  /**
-   * Result of checking, plus overall DFS statistics.
-   * @author Andrzej Bialecki
-   */
-  public static class Result {
-    private ArrayList missingIds = new ArrayList();
-    private long missingSize = 0L;
-    private long corruptFiles = 0L;
-    private long overReplicatedBlocks = 0L;
-    private long underReplicatedBlocks = 0L;
-    private int replication = 0;
-    private long totalBlocks = 0L;
-    private long totalFiles = 0L;
-    private long totalDirs = 0L;
-    private long totalSize = 0L;
-    
-    /**
-     * DFS is considered healthy if there are no missing blocks.
-     * @return
-     */
-    public boolean isHealthy() {
-      return missingIds.size() == 0;
-    }
-    
-    /** Add a missing block name, plus its size. */
-    public void addMissing(String id, long size) {
-      missingIds.add(id);
-      missingSize += size;
-    }
-    
-    /** Return a list of missing block names (as list of Strings). */
-    public ArrayList getMissingIds() {
-      return missingIds;
-    }
-
-    /** Return total size of missing data, in bytes. */
-    public long getMissingSize() {
-      return missingSize;
-    }
-
-    public void setMissingSize(long missingSize) {
-      this.missingSize = missingSize;
-    }
-
-    /** Return the number of over-replicsted blocks. */
-    public long getOverReplicatedBlocks() {
-      return overReplicatedBlocks;
-    }
-
-    public void setOverReplicatedBlocks(long overReplicatedBlocks) {
-      this.overReplicatedBlocks = overReplicatedBlocks;
-    }
-
-    /** Return the actual replication factor. */
-    public float getReplicationFactor() {
-      if (totalBlocks != 0)
-        return (float)(totalBlocks * replication + overReplicatedBlocks - underReplicatedBlocks) / (float)totalBlocks;
-      else return 0.0f;
-    }
-
-    /** Return the number of under-replicated blocks. Note: missing blocks are not counted here.*/
-    public long getUnderReplicatedBlocks() {
-      return underReplicatedBlocks;
-    }
-
-    public void setUnderReplicatedBlocks(long underReplicatedBlocks) {
-      this.underReplicatedBlocks = underReplicatedBlocks;
-    }
-
-    /** Return total number of directories encountered during this scan. */
-    public long getTotalDirs() {
-      return totalDirs;
-    }
-
-    public void setTotalDirs(long totalDirs) {
-      this.totalDirs = totalDirs;
-    }
-
-    /** Return total number of files encountered during this scan. */
-    public long getTotalFiles() {
-      return totalFiles;
-    }
-
-    public void setTotalFiles(long totalFiles) {
-      this.totalFiles = totalFiles;
-    }
-
-    /** Return total size of scanned data, in bytes. */
-    public long getTotalSize() {
-      return totalSize;
-    }
-
-    public void setTotalSize(long totalSize) {
-      this.totalSize = totalSize;
-    }
-
-    /** Return the intended replication factor, against which the over/under-
-     * replicated blocks are counted. Note: this values comes from the current
-     * Configuration supplied for the tool, so it may be different from the
-     * value in DFS Configuration.
-     */
-    public int getReplication() {
-      return replication;
-    }
-
-    public void setReplication(int replication) {
-      this.replication = replication;
-    }
-
-    /** Return the total number of blocks in the scanned area. */
-    public long getTotalBlocks() {
-      return totalBlocks;
-    }
-
-    public void setTotalBlocks(long totalBlocks) {
-      this.totalBlocks = totalBlocks;
-    }
-    
-    public String toString() {
-      StringBuffer res = new StringBuffer();
-      res.append("Status: " + (isHealthy() ? "HEALTHY" : "CORRUPT"));
-      res.append("\n Total size:\t" + totalSize + " B");
-      res.append("\n Total blocks:\t" + totalBlocks);
-      if (totalBlocks > 0) res.append(" (avg. block size "
-              + (totalSize / totalBlocks) + " B)");
-      res.append("\n Total dirs:\t" + totalDirs);
-      res.append("\n Total files:\t" + totalFiles);
-      if (missingSize > 0) {
-        res.append("\n  ********************************");
-        res.append("\n  CORRUPT FILES:\t" + corruptFiles);
-        res.append("\n  MISSING BLOCKS:\t" + missingIds.size());
-        res.append("\n  MISSING SIZE:\t\t" + missingSize + " B");
-        res.append("\n  ********************************");
-      }
-      res.append("\n Over-replicated blocks:\t" + overReplicatedBlocks);
-      if (totalBlocks > 0) res.append(" ("
-              + ((float)(overReplicatedBlocks * 100) / (float)totalBlocks)
-              + " %)");
-      res.append("\n Under-replicated blocks:\t" + underReplicatedBlocks);
-      if (totalBlocks > 0) res.append(" ("
-              + ((float)(underReplicatedBlocks * 100) / (float)totalBlocks)
-              + " %)");
-      res.append("\n Target replication factor:\t" + replication);
-      res.append("\n Real replication factor:\t" + getReplicationFactor());
-      return res.toString();
-    }
-
-    /** Return the number of currupted files. */
-    public long getCorruptFiles() {
-      return corruptFiles;
-    }
-
-    public void setCorruptFiles(long corruptFiles) {
-      this.corruptFiles = corruptFiles;
-    }
-  }
-
-
 }
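
For callers in Java, the rewritten tool is still driven through ToolBase.doMain(), exactly as the new TestFsck below does; the command-line flags are unchanged and are simply forwarded as query parameters on the fsck URL. A small usage sketch, placed in the org.apache.hadoop.dfs package (as the test is) so it can use the package-private constructor:

  package org.apache.hadoop.dfs;

  import org.apache.hadoop.conf.Configuration;

  public class RunFsckSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();  // must name a DFS in fs.default.name
      // Same entry point TestFsck uses; "/" is the path to start checking from.
      int rc = new DFSck().doMain(conf, new String[] {"/", "-files", "-blocks"});
      System.exit(rc);
    }
  }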

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=464710&r1=464709&r2=464710
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Oct 16 15:58:26 2006
@@ -26,6 +26,13 @@
 import java.io.*;
 import java.net.InetSocketAddress;
 import java.util.*;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskTracker;
 
 /***************************************************
  * FSNamesystem does the actual bookkeeping work for the
@@ -190,7 +197,7 @@
      * dir is where the filesystem directory state 
      * is stored
      */
-    public FSNamesystem(File dir, Configuration conf) throws IOException {
+    public FSNamesystem(File dir, NameNode nn, Configuration conf) throws IOException {
         fsNamesystemObject = this;
         InetSocketAddress addr = DataNode.createSocketAddr(conf.get("fs.default.name", "local"));
         this.maxReplication = conf.getInt("dfs.replication.max", 512);
@@ -229,6 +236,10 @@
         this.infoPort = conf.getInt("dfs.info.port", 50070);
         this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
         this.infoServer = new StatusHttpServer("dfs",infoBindAddress, infoPort, false);
+        this.infoServer.setAttribute("name.system", this);
+        this.infoServer.setAttribute("name.node", nn);
+        this.infoServer.setAttribute("name.conf", conf);
+        this.infoServer.addServlet("fsck", "/fsck", FsckServlet.class);
         this.infoServer.start();
     }
     /** Return the FSNamesystem object
@@ -2463,5 +2474,30 @@
       if( ! isInSafeMode() )
         return "";
       return safeMode.getTurnOffTip();
+    }
+    
+    /**
+     * This class is used in Namesystem's jetty to do fsck on namenode
+     * @author Milind Bhandarkar
+     */
+    public static class FsckServlet extends HttpServlet {
+      public void doGet(HttpServletRequest request,
+          HttpServletResponse response
+          ) throws ServletException, IOException {
+        Map<String,String[]> pmap = request.getParameterMap();
+        try {
+          ServletContext context = getServletContext();
+          NameNode nn = (NameNode) context.getAttribute("name.node");
+          Configuration conf = (Configuration) context.getAttribute("name.conf");
+          NamenodeFsck fscker = new NamenodeFsck(conf, nn, pmap, response);
+          fscker.fsck();
+        } catch (IOException ie) {
+          StringUtils.stringifyException(ie);
+          LOG.warn(ie);
+          String errMsg = "Fsck on path " + pmap.get("path") + " failed.";
+          response.sendError(HttpServletResponse.SC_GONE, errMsg);
+          throw ie;
+        }
+      }
     }
 }
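
The servlet wiring above follows the pattern the namesystem already uses for its info port: objects are published on the StatusHttpServer with setAttribute() and retrieved from the ServletContext inside the servlet. A hypothetical sketch of the same pattern is below; DemoServlet and the "demo"/"/demo" registration are illustrative only, while the "name.conf" attribute and the setAttribute/addServlet/getServletContext calls mirror the patch.

  import java.io.IOException;
  import javax.servlet.ServletException;
  import javax.servlet.http.HttpServlet;
  import javax.servlet.http.HttpServletRequest;
  import javax.servlet.http.HttpServletResponse;
  import org.apache.hadoop.conf.Configuration;

  public class DemoServlet extends HttpServlet {
    public void doGet(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
      // "name.conf" is the attribute name published in FSNamesystem above.
      Configuration conf =
          (Configuration) getServletContext().getAttribute("name.conf");
      response.getWriter().println("fs.default.name = "
          + conf.get("fs.default.name", "local"));
    }
  }

  // Registration, alongside the fsck servlet in FSNamesystem's constructor:
  //   this.infoServer.setAttribute("name.conf", conf);
  //   this.infoServer.addServlet("demo", "/demo", DemoServlet.class);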

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=464710&r1=464709&r2=464710
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon Oct 16 15:58:26 2006
@@ -23,8 +23,6 @@
 import org.apache.hadoop.conf.*;
 
 import java.io.*;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.Metrics;
@@ -129,7 +127,7 @@
      * Create a NameNode at the specified location and start it.
      */
     public NameNode(File dir, String bindAddress, int port, Configuration conf) throws IOException {
-        this.namesystem = new FSNamesystem(dir, conf);
+        this.namesystem = new FSNamesystem(dir, this, conf);
         this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
         this.server = RPC.getServer(this, bindAddress, port, handlerCount, false, conf);
         this.server.start();
@@ -546,7 +544,7 @@
       if( version != DFS_CURRENT_VERSION )
         throw new IncorrectVersionException( version, "data node" );
     }
-
+    
     /**
      */
     public static void main(String argv[]) throws Exception {

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java?view=auto&rev=464710
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java Mon Oct 16 15:58:26 2006
@@ -0,0 +1,593 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeSet;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.io.UTF8;
+
+
+/**
+ * This class provides rudimentary checking of DFS volumes for errors and
+ * sub-optimal conditions.
+ * <p>The tool scans all files and directories, starting from an indicated
+ *  root path. The following abnormal conditions are detected and handled:</p>
+ * <ul>
+ * <li>files with blocks that are completely missing from all datanodes.<br/>
+ * In this case the tool can perform one of the following actions:
+ *  <ul>
+ *      <li>none ({@link #FIXING_NONE})</li>
+ *      <li>move corrupted files to /lost+found directory on DFS
+ *      ({@link #FIXING_MOVE}). Remaining data blocks are saved as
+ *      block chains, representing the longest consecutive series of valid blocks.</li>
+ *      <li>delete corrupted files ({@link #FIXING_DELETE})</li>
+ *  </ul>
+ *  </li>
+ *  <li>detect files with under-replicated or over-replicated blocks</li>
+ *  </ul>
+ *  Additionally, the tool collects detailed overall DFS statistics and can
+ *  optionally print detailed statistics on block locations and replication
+ *  factors of each file.
+ *
+ * @author Andrzej Bialecki
+ */
+public class NamenodeFsck {
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.NameNode");
+  
+  /** Don't attempt any fixing . */
+  public static final int FIXING_NONE = 0;
+  /** Move corrupted files to /lost+found . */
+  public static final int FIXING_MOVE = 1;
+  /** Delete corrupted files. */
+  public static final int FIXING_DELETE = 2;
+  
+  private NameNode nn;
+  private UTF8 lostFound = null;
+  private boolean lfInited = false;
+  private boolean lfInitedOk = false;
+  private boolean showFiles = false;
+  private boolean showBlocks = false;
+  private boolean showLocations = false;
+  private int fixing = FIXING_NONE;
+  private String path = "/";
+  
+  private Configuration conf;
+  private HttpServletResponse response;
+  private PrintWriter out;
+  
+  /**
+   * Filesystem checker.
+   * @param conf current Configuration
+   * @param fixing one of pre-defined values
+   * @param showFiles show each file being checked
+   * @param showBlocks for each file checked show its block information
+   * @param showLocations for each block in each file show block locations
+   * @throws Exception
+   */
+  public NamenodeFsck(Configuration conf,
+      NameNode nn,
+      Map<String,String[]> pmap,
+      HttpServletResponse response) throws IOException {
+    this.conf = conf;
+    this.nn = nn;
+    this.response = response;
+    this.out = response.getWriter();
+    for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
+      String key = it.next();
+      if (key.equals("path")) { this.path = pmap.get("path")[0]; }
+      else if (key.equals("move")) { this.fixing = FIXING_MOVE; }
+      else if (key.equals("delete")) { this.fixing = FIXING_DELETE; }
+      else if (key.equals("files")) { this.showFiles = true; }
+      else if (key.equals("blocks")) { this.showBlocks = true; }
+      else if (key.equals("locations")) { this.showLocations = true; }
+    }
+  }
+  
+  /**
+   * Check files on DFS, starting from the indicated path.
+   * @throws Exception
+   */
+  public void fsck() throws IOException {
+    try {
+      DFSFileInfo[] files = nn.getListing(path);
+      FsckResult res = new FsckResult();
+      res.setReplication((short) conf.getInt("dfs.replication", 3));
+      if (files != null) {
+        for (int i = 0; i < files.length; i++) {
+          check(files[i], res);
+        }
+      }
+      out.println(res);
+      if (res.isHealthy()) {
+        out.println("\n\nThe filesystem under path '" + path + "' is HEALTHY");
+      }  else {
+        out.println("\n\nThe filesystem under path '" + path + "' is CORRUPT");
+      }
+    } finally {
+      out.close();
+    }
+  }
+  
+  private void check(DFSFileInfo file, FsckResult res) throws IOException {
+    if (file.isDir()) {
+      if (showFiles)
+        out.println(file.getPath() + " <dir>");
+      res.totalDirs++;
+      DFSFileInfo[] files = nn.getListing(file.getPath());
+      for (int i = 0; i < files.length; i++) {
+        check(files[i], res);
+      }
+      return;
+    }
+    res.totalFiles++;
+    res.totalSize += file.getLen();
+    LocatedBlock[] blocks = nn.open(file.getPath());
+    res.totalBlocks += blocks.length;
+    if (showFiles) {
+      out.print(file.getPath() + " " + file.getLen() + ", " + blocks.length + " block(s): ");
+    }  else {
+      out.print('.');
+      out.flush();
+      if (res.totalFiles % 100 == 0)        out.println();
+    }
+    int missing = 0;
+    long missize = 0;
+    StringBuffer report = new StringBuffer();
+    for (int i = 0; i < blocks.length; i++) {
+      Block block = blocks[i].getBlock();
+      long id = block.getBlockId();
+      DatanodeInfo[] locs = blocks[i].getLocations();
+      short targetFileReplication = file.getReplication();
+      if (locs.length > targetFileReplication) res.overReplicatedBlocks += (locs.length - targetFileReplication);
+      if (locs.length < targetFileReplication && locs.length > 0) res.underReplicatedBlocks += (targetFileReplication - locs.length);
+      report.append(i + ". " + id + " len=" + block.getNumBytes());
+      if (locs == null || locs.length == 0) {
+        report.append(" MISSING!");
+        res.addMissing(block.getBlockName(), block.getNumBytes());
+        missing++;
+        missize += block.getNumBytes();
+      } else {
+        report.append(" repl=" + locs.length);
+        if (showLocations) {
+          StringBuffer sb = new StringBuffer("[");
+          for (int j = 0; j < locs.length; j++) {
+            if (j > 0) sb.append(", ");
+            sb.append(locs[j]);
+          }
+          sb.append(']');
+          report.append(" " + sb.toString());
+        }
+      }
+      report.append('\n');
+    }
+    if (missing > 0) {
+      if (!showFiles)
+        out.println("\nMISSING " + missing + " blocks of total size " + missize + " B");
+      res.corruptFiles++;
+      switch(fixing) {
+        case FIXING_NONE:
+          break;
+        case FIXING_MOVE:
+          lostFoundMove(file, blocks);
+          break;
+        case FIXING_DELETE:
+          nn.delete(file.getPath());
+      }
+    }
+    if (showFiles) {
+      if (missing > 0) {
+        out.println(" MISSING " + missing + " blocks of total size " + missize + " B");
+      }  else        out.println(" OK");
+      if (showBlocks)        out.println(report.toString());
+    }
+  }
+  
+  private void lostFoundMove(DFSFileInfo file, LocatedBlock[] blocks)
+  throws IOException {
+    DFSClient dfs = new DFSClient(DataNode.createSocketAddr(
+        conf.get("fs.default.name", "local")), conf);
+    if (!lfInited) {
+      lostFoundInit(dfs);
+    }
+    if (!lfInitedOk) {
+      return;
+    }
+    String target = lostFound.toString() + file.getPath();
+    String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
+    try {
+      if (!nn.mkdirs(target)) {
+        LOG.warn(errmsg);
+        return;
+      }
+      // create chains
+      int chain = 0;
+      FSOutputStream fos = null;
+      for (int i = 0; i < blocks.length; i++) {
+        LocatedBlock lblock = blocks[i];
+        DatanodeInfo[] locs = lblock.getLocations();
+        if (locs == null || locs.length == 0) {
+          if (fos != null) {
+            fos.flush();
+            fos.close();
+            fos = null;
+          }
+          continue;
+        }
+        if (fos == null) {
+          fos = dfs.create(new UTF8(target.toString() + "/" + chain), true);
+          if (fos != null) chain++;
+        }
+        if (fos == null) {
+          LOG.warn(errmsg + ": could not store chain " + chain);
+          // perhaps we should bail out here...
+          // return;
+          continue;
+        }
+        
+        // copy the block. It's a pity it's not abstracted from DFSInputStream ...
+        try {
+          copyBlock(dfs, lblock, fos);
+        } catch (Exception e) {
+          e.printStackTrace();
+          // something went wrong copying this block...
+          LOG.warn(" - could not copy block " + lblock.getBlock().getBlockName() + " to " + target);
+          fos.flush();
+          fos.close();
+          fos = null;
+        }
+      }
+      if (fos != null) fos.close();
+      LOG.warn("\n - moved corrupted file " + file.getPath() + " to /lost+found");
+      dfs.delete(new UTF8(file.getPath()));
+    }  catch (Exception e) {
+      e.printStackTrace();
+      LOG.warn(errmsg + ": " + e.getMessage());
+    }
+  }
+      
+  /*
+   * XXX (ab) Bulk of this method is copied verbatim from {@link DFSClient}, which is
+   * bad. Both places should be refactored to provide a method to copy blocks
+   * around.
+   */
+      private void copyBlock(DFSClient dfs, LocatedBlock lblock,
+          FSOutputStream fos) throws Exception {
+    int failures = 0;
+    InetSocketAddress targetAddr = null;
+    TreeSet deadNodes = new TreeSet();
+    Socket s = null;
+    DataInputStream in = null;
+    DataOutputStream out = null;
+    while (s == null) {
+      DatanodeInfo chosenNode;
+      
+      try {
+        chosenNode = bestNode(dfs, lblock.getLocations(), deadNodes);
+        targetAddr = DataNode.createSocketAddr(chosenNode.getName());
+      }  catch (IOException ie) {
+        if (failures >= DFSClient.MAX_BLOCK_ACQUIRE_FAILURES) {
+          throw new IOException("Could not obtain block " + lblock);
+        }
+        LOG.info("Could not obtain block from any node:  " + ie);
+        try {
+          Thread.sleep(10000);
+        }  catch (InterruptedException iex) {
+        }
+        deadNodes.clear();
+        failures++;
+        continue;
+      }
+      try {
+        s = new Socket();
+        s.connect(targetAddr, FSConstants.READ_TIMEOUT);
+        s.setSoTimeout(FSConstants.READ_TIMEOUT);
+        
+        //
+        // Xmit header info to datanode
+        //
+        out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
+        out.write(FSConstants.OP_READSKIP_BLOCK);
+        lblock.getBlock().write(out);
+        out.writeLong(0L);
+        out.flush();
+        
+        //
+        // Get bytes in block, set streams
+        //
+        in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
+        long curBlockSize = in.readLong();
+        long amtSkipped = in.readLong();
+        if (curBlockSize != lblock.getBlock().len) {
+          throw new IOException("Recorded block size is " + lblock.getBlock().len + ", but datanode reports size of " + curBlockSize);
+        }
+        if (amtSkipped != 0L) {
+          throw new IOException("Asked for offset of " + 0L + ", but only received offset of " + amtSkipped);
+        }
+      }  catch (IOException ex) {
+        // Put chosen node into dead list, continue
+        LOG.info("Failed to connect to " + targetAddr + ":" + ex);
+        deadNodes.add(chosenNode);
+        if (s != null) {
+          try {
+            s.close();
+          } catch (IOException iex) {
+          }
+        }
+        s = null;
+      }
+    }
+    if (in == null) {
+      throw new Exception("Could not open data stream for " + lblock.getBlock().getBlockName());
+    }
+    byte[] buf = new byte[1024];
+    int cnt = 0;
+    boolean success = true;
+    try {
+      while ((cnt = in.read(buf)) > 0) {
+        fos.write(buf, 0, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      success = false;
+    } finally {
+      try {in.close(); } catch (Exception e1) {}
+      try {out.close(); } catch (Exception e1) {}
+      try {s.close(); } catch (Exception e1) {}
+    }
+    if (!success)
+      throw new Exception("Could not copy block data for " + lblock.getBlock().getBlockName());
+  }
+      
+  /*
+   * XXX (ab) See comment above for copyBlock().
+   *
+   * Pick the best node from which to stream the data.
+   * That's the local one, if available.
+   */
+      Random r = new Random();
+  private DatanodeInfo bestNode(DFSClient dfs, DatanodeInfo[] nodes,
+      TreeSet deadNodes) throws IOException {
+    if ((nodes == null) ||
+            (nodes.length - deadNodes.size() < 1)) {
+      throw new IOException("No live nodes contain current block");
+    }
+    DatanodeInfo chosenNode = null;
+    for (int i = 0; i < nodes.length; i++) {
+      if (deadNodes.contains(nodes[i])) {
+        continue;
+      }
+      String nodename = nodes[i].getName();
+      int colon = nodename.indexOf(':');
+      if (colon >= 0) {
+        nodename = nodename.substring(0, colon);
+      }
+      if (dfs.localName.equals(nodename)) {
+        chosenNode = nodes[i];
+        break;
+      }
+    }
+    if (chosenNode == null) {
+      do  {
+        chosenNode = nodes[Math.abs(r.nextInt())  % nodes.length];
+      } while (deadNodes.contains(chosenNode));
+    }
+    return chosenNode;
+  }
+  
+  private void lostFoundInit(DFSClient dfs) {
+    lfInited = true;
+    try {
+      UTF8 lfName = new UTF8("/lost+found");
+      // check that /lost+found exists
+      if (!dfs.exists(lfName)) {
+        lfInitedOk = dfs.mkdirs(lfName);
+        lostFound = lfName;
+      } else        if (!dfs.isDirectory(lfName)) {
+          LOG.warn("Cannot use /lost+found : a regular file with this name exists.");
+          lfInitedOk = false;
+        }  else { // exists and isDirectory
+          lostFound = lfName;
+          lfInitedOk = true;
+        }
+    }  catch (Exception e) {
+      e.printStackTrace();
+      lfInitedOk = false;
+    }
+    if (lostFound == null) {
+      LOG.warn("Cannot initialize /lost+found .");
+      lfInitedOk = false;
+    }
+  }
+  
+  /**
+   * @param args
+   */
+  public int run(String[] args) throws Exception {
+    
+    return 0;
+  }
+  
+  /**
+   * FsckResult of checking, plus overall DFS statistics.
+   *
+   * @author Andrzej Bialecki
+   */
+  public class FsckResult {
+    private ArrayList missingIds = new ArrayList();
+    private long missingSize = 0L;
+    private long corruptFiles = 0L;
+    private long overReplicatedBlocks = 0L;
+    private long underReplicatedBlocks = 0L;
+    private int replication = 0;
+    private long totalBlocks = 0L;
+    private long totalFiles = 0L;
+    private long totalDirs = 0L;
+    private long totalSize = 0L;
+    
+    /**
+     * DFS is considered healthy if there are no missing blocks.
+     * @return
+     */
+    public boolean isHealthy() {
+      return missingIds.size() == 0;
+    }
+    
+    /** Add a missing block name, plus its size. */
+    public void addMissing(String id, long size) {
+      missingIds.add(id);
+      missingSize += size;
+    }
+    
+    /** Return a list of missing block names (as list of Strings). */
+    public ArrayList getMissingIds() {
+      return missingIds;
+    }
+    
+    /** Return total size of missing data, in bytes. */
+    public long getMissingSize() {
+      return missingSize;
+    }
+    
+    public void setMissingSize(long missingSize) {
+      this.missingSize = missingSize;
+    }
+    
+    /** Return the number of over-replicated blocks. */
+    public long getOverReplicatedBlocks() {
+      return overReplicatedBlocks;
+    }
+    
+    public void setOverReplicatedBlocks(long overReplicatedBlocks) {
+      this.overReplicatedBlocks = overReplicatedBlocks;
+    }
+    
+    /** Return the actual replication factor. */
+    public float getReplicationFactor() {
+      if (totalBlocks != 0)
+        return (float) (totalBlocks * replication + overReplicatedBlocks - underReplicatedBlocks) / (float) totalBlocks;
+      else
+        return 0.0f;
+    }
+    
+    /** Return the number of under-replicated blocks. Note: missing blocks are not counted here.*/
+    public long getUnderReplicatedBlocks() {
+      return underReplicatedBlocks;
+    }
+    
+    public void setUnderReplicatedBlocks(long underReplicatedBlocks) {
+      this.underReplicatedBlocks = underReplicatedBlocks;
+    }
+    
+    /** Return total number of directories encountered during this scan. */
+    public long getTotalDirs() {
+      return totalDirs;
+    }
+    
+    public void setTotalDirs(long totalDirs) {
+      this.totalDirs = totalDirs;
+    }
+    
+    /** Return total number of files encountered during this scan. */
+    public long getTotalFiles() {
+      return totalFiles;
+    }
+    
+    public void setTotalFiles(long totalFiles) {
+      this.totalFiles = totalFiles;
+    }
+    
+    /** Return total size of scanned data, in bytes. */
+    public long getTotalSize() {
+      return totalSize;
+    }
+    
+    public void setTotalSize(long totalSize) {
+      this.totalSize = totalSize;
+    }
+    
+    /** Return the intended replication factor, against which the over/under-
+     * replicated blocks are counted. Note: this value comes from the current
+     * Configuration supplied for the tool, so it may be different from the
+     * value in DFS Configuration.
+     */
+    public int getReplication() {
+      return replication;
+    }
+    
+    public void setReplication(int replication) {
+      this.replication = replication;
+    }
+    
+    /** Return the total number of blocks in the scanned area. */
+    public long getTotalBlocks() {
+      return totalBlocks;
+    }
+    
+    public void setTotalBlocks(long totalBlocks) {
+      this.totalBlocks = totalBlocks;
+    }
+    
+    public String toString() {
+      StringBuffer res = new StringBuffer();
+      res.append("Status: " + (isHealthy() ? "HEALTHY" : "CORRUPT"));
+      res.append("\n Total size:\t" + totalSize + " B");
+      res.append("\n Total blocks:\t" + totalBlocks);
+      if (totalBlocks > 0) res.append(" (avg. block size "
+          + (totalSize / totalBlocks) + " B)");
+      res.append("\n Total dirs:\t" + totalDirs);
+      res.append("\n Total files:\t" + totalFiles);
+      if (missingSize > 0) {
+        res.append("\n  ********************************");
+        res.append("\n  CORRUPT FILES:\t" + corruptFiles);
+        res.append("\n  MISSING BLOCKS:\t" + missingIds.size());
+        res.append("\n  MISSING SIZE:\t\t" + missingSize + " B");
+        res.append("\n  ********************************");
+      }
+      res.append("\n Over-replicated blocks:\t" + overReplicatedBlocks);
+      if (totalBlocks > 0)        res.append(" (" + ((float) (overReplicatedBlocks * 100) / (float) totalBlocks) + " %)");
+      res.append("\n Under-replicated blocks:\t" + underReplicatedBlocks);
+      if (totalBlocks > 0)        res.append(" (" + ((float) (underReplicatedBlocks * 100) / (float) totalBlocks) + " %)");
+      res.append("\n Target replication factor:\t" + replication);
+      res.append("\n Real replication factor:\t" + getReplicationFactor());
+      return res.toString();
+    }
+    
+    /** Return the number of corrupted files. */
+    public long getCorruptFiles() {
+      return corruptFiles;
+    }
+    
+    public void setCorruptFiles(long corruptFiles) {
+      this.corruptFiles = corruptFiles;
+    }
+  }
+}
\ No newline at end of file
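
A note on the summary that FsckResult.toString() prints: overReplicatedBlocks and underReplicatedBlocks are accumulated per excess or missing replica, not per block, and the "Real replication factor" line is (totalBlocks * replication + overReplicatedBlocks - underReplicatedBlocks) / totalBlocks, with the target replication read from dfs.replication (default 3) in the Configuration handed to NamenodeFsck. With illustrative numbers only: 100 blocks, a target of 3, 4 excess replicas, and 10 missing replicas on under-replicated blocks give (100*3 + 4 - 10) / 100 = 2.94.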

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java?view=auto&rev=464710
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java Mon Oct 16 15:58:26 2006
@@ -0,0 +1,148 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.Random;
+import junit.framework.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.CopyFiles;
+
+
+/**
+ * A JUnit test for doing fsck
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestFsck extends TestCase {
+  
+  private static final int NFILES = 20;
+  private static String TEST_ROOT_DIR =
+    new Path(System.getProperty("test.build.data","/tmp"))
+    .toString().replace(' ', '+');
+
+  /** class MyFile contains enough information to recreate the contents of
+   * a single file.
+   */
+  private static class MyFile {
+    private static Random gen = new Random();
+    private static final int MAX_LEVELS = 3;
+    private static final int MAX_SIZE = 8*1024;
+    private static String[] dirNames = {
+      "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
+    };
+    private String name = "";
+    private int size;
+    private long seed;
+    
+    MyFile() {
+      int nLevels = gen.nextInt(MAX_LEVELS);
+      if(nLevels != 0) {
+        int[] levels = new int[nLevels];
+        for (int idx = 0; idx < nLevels; idx++) {
+          levels[idx] = gen.nextInt(10);
+        }
+        StringBuffer sb = new StringBuffer();
+        for (int idx = 0; idx < nLevels; idx++) {
+          sb.append(dirNames[levels[idx]]);
+          sb.append("/");
+        }
+        name = sb.toString();
+      }
+      long fidx = -1;
+      while (fidx < 0) { fidx = gen.nextLong(); }
+      name = name + Long.toString(fidx);
+      size = gen.nextInt(MAX_SIZE);
+      seed = gen.nextLong();
+    }
+    
+    String getName() { return name; }
+    int getSize() { return size; }
+    long getSeed() { return seed; }
+  }
+  
+  public TestFsck(String testName) {
+    super(testName);
+  }
+
+  
+  
+  protected void setUp() throws Exception {
+  }
+
+  protected void tearDown() throws Exception {
+  }
+  
+  /** create NFILES with random names and directory hierarchies
+   * with random (but reproducible) data in them.
+   */
+  private static MyFile[] createFiles(String fsname, String topdir)
+  throws IOException {
+    MyFile[] files = new MyFile[NFILES];
+    
+    for (int idx = 0; idx < NFILES; idx++) {
+      files[idx] = new MyFile();
+    }
+    
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    
+    for (int idx = 0; idx < NFILES; idx++) {
+      Path fPath = new Path(root, files[idx].getName());
+      fs.mkdirs(fPath.getParent());
+      FSDataOutputStream out = fs.create(fPath);
+      byte[] toWrite = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toWrite);
+      out.write(toWrite);
+      out.close();
+      toWrite = null;
+    }
+    
+    return files;
+  }
+  
+  /** delete directory and everything underneath it.*/
+  private static void deldir(String fsname, String topdir)
+  throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    fs.delete(root);
+  }
+  
+  /** do fsck */
+  public void testFsck() throws Exception {
+    String namenode = null;
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster(65314, conf, false);
+      namenode = conf.get("fs.default.name", "local");
+      if (!"local".equals(namenode)) {
+        MyFile[] files = createFiles(namenode, "/srcdat");
+        assertEquals(0, new DFSck().doMain(conf, new String[] {"/"}));
+        deldir(namenode, "/srcdat");
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+}

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?view=diff&rev=464710&r1=464709&r2=464710
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Mon Oct 16 15:58:26 2006
@@ -159,8 +159,9 @@
   /** copy files from local file system to local file system */
   public void testCopyFromLocalToLocal() throws Exception {
     MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
-    CopyFiles.main(new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
-      "file://"+TEST_ROOT_DIR+"/destdat"});
+    new CopyFiles().doMain(new Configuration(),
+        new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
+          "file://"+TEST_ROOT_DIR+"/destdat"});
     assertTrue("Source and destination directories do not match.",
         checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
     deldir("local", TEST_ROOT_DIR+"/destdat");
@@ -177,7 +178,7 @@
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
-        CopyFiles.main(new String[] {"dfs://"+namenode+"/srcdat",
+        new CopyFiles().doMain(conf, new String[] {"dfs://"+namenode+"/srcdat",
         "dfs://"+namenode+"/destdat"});
         assertTrue("Source and destination directories do not match.",
             checkFiles(namenode, "/destdat", files));
@@ -199,7 +200,7 @@
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
-        CopyFiles.main(new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
+        new CopyFiles().doMain(conf, new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
         "dfs://"+namenode+"/destdat"});
         assertTrue("Source and destination directories do not match.",
             checkFiles(namenode, "/destdat", files));
@@ -221,7 +222,7 @@
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
-        CopyFiles.main(new String[] {"dfs://"+namenode+"/srcdat",
+        new CopyFiles().doMain(conf, new String[] {"dfs://"+namenode+"/srcdat",
         "file://"+TEST_ROOT_DIR+"/destdat"});
         assertTrue("Source and destination directories do not match.",
             checkFiles("local", TEST_ROOT_DIR+"/destdat", files));