You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/06 00:17:17 UTC

svn commit: r440508 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/dfs/

Author: cutting
Date: Tue Sep  5 15:17:14 2006
New Revision: 440508

URL: http://svn.apache.org/viewvc?view=rev&rev=440508
Log:
Manage multiple volumes with a single DataNode.  Contributed by Milind.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Sep  5 15:17:14 2006
@@ -114,6 +114,12 @@
     replacing them with Text for better performance.
     (Hairong Kuang via cutting)
 
+29. HADOOP-64.  Manage multiple volumes with a single DataNode.
+    Previously DataNode would create a separate daemon per configured
+    volume, each with its own connection to the NameNode.  Now all
+    volumes are handled by a single DataNode daemon, reducing the load
+    on the NameNode.  (Milind Bhandarkar via cutting)
+
 
 Release 0.5.0 - 2006-08-04
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Tue Sep  5 15:17:14 2006
@@ -140,6 +140,14 @@
 </property>
 
 <property>
+  <name>dfs.client.buffer.dir</name>
+  <value>${hadoop.tmp.dir}/dfs/tmp</value>
+  <description>Determines where on the local filesystem a DFS client
+  should store its blocks before it sends them to the datanode.
+  </description>
+</property>
+
+<property>
   <name>dfs.data.dir</name>
   <value>${hadoop.tmp.dir}/dfs/data</value>
   <description>Determines where on the local filesystem an DFS data node

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Sep  5 15:17:14 2006
@@ -776,7 +776,7 @@
         }
 
         private File newBackupFile() throws IOException {
-          File result = conf.getFile("dfs.data.dir",
+          File result = conf.getFile("dfs.client.buffer.dir",
                                      "tmp"+File.separator+
                                      "client-"+Math.abs(r.nextLong()));
           result.deleteOnExit();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Sep  5 15:17:14 2006
@@ -86,7 +86,6 @@
         return new InetSocketAddress(host, port);
     }
 
-    private static Map subDataNodeList = null;
     DatanodeProtocol namenode;
     FSDataset data;
     DatanodeRegistration dnRegistration;
@@ -148,12 +147,12 @@
     DataNodeMetrics myMetrics = new DataNodeMetrics();
 
     /**
-     * Create the DataNode given a configuration and a dataDir.
-     * 'dataDir' is where the blocks are stored.
+     * Create the DataNode given a configuration and an array of dataDirs.
+     * 'dataDirs' is where the blocks are stored.
      */
-    public DataNode(Configuration conf, String datadir) throws IOException {
+    DataNode(Configuration conf, String[] dataDirs) throws IOException {
         this(InetAddress.getLocalHost().getHostName(), 
-             new File(datadir),
+             dataDirs,
              createSocketAddr(conf.get("fs.default.name", "local")), conf);
         int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
         this.infoServer = new StatusHttpServer("datanode", infoServerPort, true);
@@ -176,11 +175,22 @@
      * @see DataStorage
      */
     private DataNode(String machineName, 
-                    File datadir, 
+                    String[] dataDirs, 
                     InetSocketAddress nameNodeAddr, 
                     Configuration conf ) throws IOException {
-      // get storage info and lock the data dir
-      storage = new DataStorage( datadir );
+      File[] volumes = new File[dataDirs.length];
+      for (int idx = 0; idx < dataDirs.length; idx++) {
+        volumes[idx] = new File(dataDirs[idx]);
+      }
+      // get storage info and lock the data dirs
+      storage = new DataStorage( volumes );
+      int numDirs = storage.getNumLocked();
+      if (numDirs == 0) { // all data dirs are in use
+        throw new IOException("Cannot start multiple Datanode instances "
+                              + "sharing the same data directories.\n"
+                              + StringUtils.arrayToString(dataDirs) + " are locked. ");
+      }
+      volumes = storage.getLockedDirs();
       // connect to name node
       this.namenode = (DatanodeProtocol) 
           RPC.waitForProxy(DatanodeProtocol.class,
@@ -207,7 +217,7 @@
                                         -1,
                                         "" );
       // initialize data node internal structure
-      this.data = new FSDataset(datadir, conf);
+      this.data = new FSDataset(volumes, conf);
       this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
 
       long blockReportIntervalBasis =
@@ -251,7 +261,7 @@
       dnRegistration = namenode.register( dnRegistration );
       if( storage.getStorageID().equals("") ) {
         storage.setStorageID( dnRegistration.getStorageID());
-        storage.write();
+        storage.writeAll();
       }
     }
 
@@ -267,24 +277,11 @@
         this.shouldRun = false;
         ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
         try {
-          this.storage.close();
+          this.storage.closeAll();
         } catch (IOException ie) {
         }
     }
 
-    /**
-     * Shut down all datanodes that where started via the run(conf) method.
-     * Returns only after shutdown is complete.
-     */
-    public static void shutdownAll(){
-      if(subDataNodeList != null && !subDataNodeList.isEmpty()){
-        for (Iterator iterator = subDataNodeList.keySet().iterator(); iterator.hasNext();) {
-          DataNode dataNode = (DataNode) iterator.next();
-          dataNode.shutdown();
-        }
-      }
-    }
-
     void handleDiskError( String errMsgr ) {
         LOG.warn( "DataNode is shutting down.\n" + errMsgr );
         try {
@@ -940,7 +937,7 @@
      * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
      */
     public void run() {
-        LOG.info("Starting DataNode in: "+data.data);
+        LOG.info("Starting DataNode in: "+data);
         
         // start dataXceiveServer
         dataXceiveServer.start();
@@ -966,40 +963,50 @@
         } catch (InterruptedException ie) {
         }
         
-        LOG.info("Finishing DataNode in: "+data.data);
+        LOG.info("Finishing DataNode in: "+data);
     }
 
-    /** Start datanode daemons.
-     * Start a datanode daemon for each comma separated data directory
-     * specified in property dfs.data.dir
+    private static ArrayList dataNodeList = new ArrayList();
+    private static ArrayList dataNodeThreadList = new ArrayList();
+    
+    /** Start datanode daemon.
      */
     public static void run(Configuration conf) throws IOException {
         String[] dataDirs = conf.getStrings("dfs.data.dir");
-        subDataNodeList = new HashMap(dataDirs.length);
-        for (int i = 0; i < dataDirs.length; i++) {
-          DataNode dn = makeInstanceForDir(dataDirs[i], conf);
-          if (dn != null) {
-            Thread t = new Thread(dn, "DataNode: "+dataDirs[i]);
-            t.setDaemon(true); // needed for JUnit testing
-            t.start();
-            subDataNodeList.put(dn,t);
-          }
+        DataNode dn = makeInstance(dataDirs, conf);
+        dataNodeList.add(dn);
+        if (dn != null) {
+          Thread t = new Thread(dn, "DataNode: [" +
+              StringUtils.arrayToString(dataDirs) + "]");
+          t.setDaemon(true); // needed for JUnit testing
+          t.start();
+          dataNodeThreadList.add(t);
         }
     }
+    
+    /**
+     * Shut down all datanodes that were started via the run(conf) method.
+     * Returns only after shutdown is complete.
+     */
+    public static void shutdownAll(){
+      if(!dataNodeList.isEmpty()){
+        for (Iterator iterator = dataNodeList.iterator(); iterator.hasNext();) {
+          DataNode dataNode = (DataNode) iterator.next();
+          dataNode.shutdown();
+        }
+      }
+    }
+
 
-  /** Start datanode daemons.
-   * Start a datanode daemon for each comma separated data directory
-   * specified in property dfs.data.dir and wait for them to finish.
-   * If this thread is specifically interrupted, it will stop waiting.
+  /** Start a single datanode daemon and wait for it to finish.
+   *  If this thread is specifically interrupted, it will stop waiting.
    */
   private static void runAndWait(Configuration conf) throws IOException {
     run(conf);
-
-    //  Wait for sub threads to exit
-    for (Iterator iterator = subDataNodeList.entrySet().iterator(); iterator.hasNext();) {
-      Thread threadDataNode = (Thread) ((Map.Entry) iterator.next()).getValue();
+    if (dataNodeThreadList.size() > 0) {
+      Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1);
       try {
-        threadDataNode.join();
+        t.join();
       } catch (InterruptedException e) {
         if (Thread.currentThread().isInterrupted()) {
           // did someone knock?
@@ -1010,25 +1017,29 @@
   }
 
   /**
-   * Make an instance of DataNode after ensuring that given data directory
-   * (and parent directories, if necessary) can be created.
-   * @param dataDir where the new DataNode instance should keep its files.
+   * Make an instance of DataNode after ensuring that at least one of the
+   * given data directories (and their parent directories, if necessary)
+   * can be created.
+   * @param dataDirs List of directories, where the new DataNode instance should
+   * keep its files.
    * @param conf Configuration instance to use.
-   * @return DataNode instance for given data dir and conf, or null if directory
-   * cannot be created.
+   * @return DataNode instance for given list of data dirs and conf, or null if
+   * no directory from this directory list can be created.
    * @throws IOException
    */
-  static DataNode makeInstanceForDir(String dataDir, Configuration conf) throws IOException {
-    DataNode dn = null;
-    File data = new File(dataDir);
-    try {
+  static DataNode makeInstance(String[] dataDirs, Configuration conf)
+  throws IOException {
+    ArrayList dirs = new ArrayList();
+    for (int i = 0; i < dataDirs.length; i++) {
+      File data = new File(dataDirs[i]);
+      try {
         DiskChecker.checkDir( data );
-        dn = new DataNode(conf, dataDir);
-        return dn;
-    } catch( DiskErrorException e ) {
-        LOG.warn("Can't start DataNode because " + e.getMessage() );
-        return null;
+        dirs.add(dataDirs[i]);
+      } catch( DiskErrorException e ) {
+        LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage() );
+      }
     }
+    return ((dirs.size() > 0) ? new DataNode(conf, dataDirs) : null);
   }
 
   public String toString() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java Tue Sep  5 15:17:14 2006
@@ -4,6 +4,10 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.io.UTF8;
 
@@ -11,33 +15,39 @@
  * Data storage information file.
  * <p>
  * During startup the datanode reads its data storage file.
- * The data storage file is stored in the dfs.data.dir directory.
+ * The data storage file is stored in all the dfs.data.dir directories.
  * It contains version and storageID.
- * Datanode holds a lock on the dataStorage file while it runs so that other 
+ * Datanode holds a lock on all the dataStorage files while it runs so that other 
  * datanodes were not able to start working with the same data storage.
- * The lock is released when the datanode stops (normally or abnormally).
+ * The locks are released when the datanode stops (normally or abnormally).
  * 
  * @author Konstantin Shvachko
  */
 class DataStorage {
   public static final String STORAGE_INFO_FILE_NAME = "storage";
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataStorage");
 
   // persistent fields
   private int version = 0;  /// stored version
   private String storageID; /// unique per cluster storageID
   
   // non persistent fields
-  private RandomAccessFile storageFile = null;
-  private FileLock storageLock = null;
+  private ArrayList storageFiles = new ArrayList();
+  private ArrayList storageLocks = new ArrayList();
+  
+  // cache away the names of locked dirs
+  private File[] dirs = null;
+  
+  private int numLocked = 0;
   
   /**
    * Create DataStorage and verify its version.
    * 
-   * @param datadir data storage directory
+   * @param dataDirs array of data storage directories
    * @throws IOException
    */
-  public DataStorage( File datadir ) throws IOException {
-    this( DataNode.DFS_CURRENT_VERSION, datadir );
+  public DataStorage( File[] dataDirs ) throws IOException {
+    this( DataNode.DFS_CURRENT_VERSION, dataDirs );
     
     if( version < FSConstants.DFS_CURRENT_VERSION ) // future version
       throw new IncorrectVersionException( version, "data storage" );
@@ -46,29 +56,43 @@
   /**
    * Create DataStorage.
    * 
-   * Read data storage file if exists or create it if not.
-   * Lock the file.
+   * Read data storage files if they exist or create them if not.
+   * Lock the files.
    * 
    * @param curVersion can be used to read file saved with a previous version.
-   * @param datadir data storage directory
+   * @param dataDirs Array of data storage directories
    * @throws IOException
    */
-  public DataStorage( int curVersion, File datadir ) throws IOException {
+  public DataStorage( int curVersion, File[] dataDirs ) throws IOException {
     this.version = curVersion;
-    storageFile = new RandomAccessFile( 
-                        new File(datadir, STORAGE_INFO_FILE_NAME ), 
-                        "rws" );
-    lock();
-    boolean needToSave;
-    try {
-      needToSave = read();
-    } catch( java.io.EOFException e ) {
-      storageID = "";
-      needToSave = true;
-    }
+    for (int idx = 0; idx < dataDirs.length; idx++) {
+      storageFiles.add(idx, new RandomAccessFile( 
+                          new File(dataDirs[idx], STORAGE_INFO_FILE_NAME ), 
+                          "rws" ));
+      lock(idx);
+      boolean needToSave;
+      try {
+        needToSave = read(idx);
+      } catch( java.io.EOFException e ) {
+        storageID = "";
+        needToSave = true;
+      }
     
-    if( needToSave )
-      write();
+      if( needToSave ) { write(idx); }
+      
+      RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
+      if (file != null) { numLocked++; }
+    }
+    if (numLocked > 0) {
+      this.dirs = new File[numLocked];
+      int curidx = 0;
+      for (int idx = 0; idx < dataDirs.length; idx++) {
+        if (storageFiles.get(idx) != null) {
+          dirs[curidx] = dataDirs[idx];
+          curidx++;
+        }
+      }
+    }
   }
   
   public int getVersion() {
@@ -79,6 +103,14 @@
     return storageID;
   }
   
+  public int getNumLocked() {
+    return numLocked;
+  }
+  
+  public File[] getLockedDirs() {
+    return dirs;
+  }
+  
   public void setStorageID( String newStorageID ) {
     this.storageID = newStorageID;
   }
@@ -88,58 +120,93 @@
   }
   
   /**
-   * Lock datastoarge file.
+   * Lock datastorage file.
    * 
    * @throws IOException
    */
-  public void lock() throws IOException {
-    storageLock = storageFile.getChannel().tryLock();
-    if( storageLock == null )
-      throw new IOException( "Cannot start multiple Datanode instances "
-                              + "sharing the same data directory.\n" 
-                              + STORAGE_INFO_FILE_NAME + " is locked. ");
+  private void lock(int idx) throws IOException {
+    RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
+    FileLock lock = file.getChannel().tryLock();
+    if (lock == null) {
+      // log a warning
+      LOG.warn("Cannot lock storage file in directory "+dirs[idx].getName());
+      // remove the file from fileList, and close it
+      storageFiles.add(idx, null);
+      file.close();
+    }
+    storageLocks.add(idx, lock);
   }
   
   /**
-   * Unlock datastoarge file.
+   * Unlock datastorage file.
+   * @param idx File index
    * 
    * @throws IOException
    */
-  public void unlock() throws IOException {
-    storageLock.release();
+  private void unlock(int idx) throws IOException {
+    FileLock lock = (FileLock) storageLocks.get(idx);
+    if (lock != null) { lock.release(); }
   }
   
   /**
-   * Close datastoarge file.
-   * 
+   * Close a datastorage file.
+   * @param idx file index
    * @throws IOException
    */
-  public void close() throws IOException {
-    storageLock.release();
-    storageFile.close();
+  private void close(int idx) throws IOException {
+    FileLock lock = (FileLock) storageLocks.get(idx);
+    if (lock == null) { return; }
+    lock.release();
+    RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
+    file.close();
+  }
+  
+  /**
+   * Close all datastorage files.
+   * @throws IOException
+   */
+  public void closeAll() throws IOException {
+    for (int idx = 0; idx < dirs.length; idx++) {
+      close(idx);
+    }
   }
   
   /**
    * Read data storage file.
-   * 
+   * @param idx File index
    * @return whether the data storage file need to be updated.
    * @throws IOException
    */
-  public boolean read() throws IOException {
-    storageFile.seek(0);
-    this.version = storageFile.readInt();
-    this.storageID = UTF8.readString( storageFile );
+  private boolean read(int idx) throws IOException {
+    RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
+    if (file == null) { return false; }
+    file.seek(0);
+    this.version = file.readInt();
+    this.storageID = UTF8.readString( file );
     return false;
   }
 
   /**
    * Write data storage file.
-   * 
+   * @param idx File index
    * @throws IOException
    */
-  public void write() throws IOException {
-    storageFile.seek(0);
-    storageFile.writeInt( this.version );
-    UTF8.writeString( storageFile, this.storageID );
+  private void write(int idx) throws IOException {
+    RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
+    if (file == null) { return; }
+    file.seek(0);
+    file.writeInt( this.version );
+    UTF8.writeString( file, this.storageID );
   }
+  
+  /**
+   * Write all data storage files.
+   * @throws IOException
+   */
+  public void writeAll() throws IOException {
+    for (int idx = 0; idx < dirs.length; idx++) {
+      write(idx);
+    }
+  }
+  
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Tue Sep  5 15:17:14 2006
@@ -32,7 +32,6 @@
  ***************************************************/
 class FSDataset implements FSConstants {
 
-		static final double USABLE_DISK_PCT_DEFAULT = 0.98f; 
 	
   /**
      * A node type that can be built into a tree reflecting the
@@ -40,92 +39,72 @@
      */
     class FSDir {
         File dir;
+        int numBlocks = 0;
+        int myIdx = 0;
         FSDir children[];
+        FSDir siblings[];
 
         /**
          */
-        public FSDir(File dir) {
+        public FSDir(File dir, int myIdx, FSDir[] siblings) {
             this.dir = dir;
+            this.myIdx = myIdx;
+            this.siblings = siblings;
             this.children = null;
+            if (! dir.exists()) {
+              dir.mkdirs();
+            } else {
+              File[] files = dir.listFiles();
+              int numChildren = 0;
+              for (int idx = 0; idx < files.length; idx++) {
+                if (files[idx].isDirectory()) {
+                  numChildren++;
+                } else if (Block.isBlockFilename(files[idx])) {
+                  numBlocks++;
+                }
+              }
+              if (numChildren > 0) {
+                children = new FSDir[numChildren];
+                int curdir = 0;
+                for (int idx = 0; idx < files.length; idx++) {
+                  if (files[idx].isDirectory()) {
+                    children[curdir] = new FSDir(files[idx], curdir, children);
+                    curdir++;
+                  }
+                }
+              }
+            }
         }
 
         /**
          */
-        public File getDirName() {
-            return dir;
-        }
-
-        /**
-         */
-        public FSDir[] getChildren() {
-            return children;
-        }
-
-        /**
-         */
-        public void addBlock(Block b, File src) {
-            addBlock(b, src, b.getBlockId(), 0);
-        }
-
-        /**
-         */
-        void addBlock(Block b, File src, long blkid, int depth) {
-            //
-            // Add to the local dir, if no child dirs
-            //
-            if (children == null) {
-                src.renameTo(new File(dir, b.getBlockName()));
-
-                //
-                // Test whether this dir's contents should be busted 
-                // up into subdirs.
-                //
-
-                // REMIND - mjc - sometime soon, we'll want this code
-                // working.  It prevents the datablocks from all going
-                // into a single huge directory.
-                /**
-                File localFiles[] = dir.listFiles();
-                if (localFiles.length == 16) {
-                    //
-                    // Create all the necessary subdirs
-                    //
-                    this.children = new FSDir[16];
-                    for (int i = 0; i < children.length; i++) {
-                        String str = Integer.toBinaryString(i);
-                        try {
-                            File subdir = new File(dir, "dir_" + str);
-                            subdir.mkdir();
-                            children[i] = new FSDir(subdir);
-                        } catch (StringIndexOutOfBoundsException excep) {
-                            excep.printStackTrace();
-                            System.out.println("Ran into problem when i == " + i + " an str = " + str);
-                        }
-                    }
-
-                    //
-                    // Move existing files into new dirs
-                    //
-                    for (int i = 0; i < localFiles.length; i++) {
-                        Block srcB = new Block(localFiles[i]);
-                        File dst = getBlockFilename(srcB, blkid, depth);
-                        if (!src.renameTo(dst)) {
-                            System.out.println("Unexpected problem in renaming " + src);
-                        }
-                    }
-                }
-                **/
+        public File addBlock(Block b, File src) {
+            if (numBlocks < maxBlocksPerDir) {
+              File dest = new File(dir, b.getBlockName());
+              src.renameTo(dest);
+              numBlocks += 1;
+              return dest;
             } else {
-                // Find subdir
-                children[getHalfByte(blkid, depth)].addBlock(b, src, blkid, depth+1);
+              if (siblings != null && myIdx != (siblings.length-1)) {
+                File dest = siblings[myIdx+1].addBlock(b, src);
+                if (dest != null) { return dest; }
+              }
+              if (children == null) {
+                children = new FSDir[maxBlocksPerDir];
+                for (int idx = 0; idx < maxBlocksPerDir; idx++) {
+                  children[idx] = new FSDir(
+                      new File(dir, "subdir"+idx), idx, children);
+                }
+              }
+              return children[0].addBlock(b, src);
             }
         }
 
         /**
-         * Fill in the given blockSet with any child blocks
+         * Populate the given blockSet with any child blocks
          * found at this node.
          */
-        public void getBlockInfo(TreeSet blockSet) {
+        public void getBlockInfo(TreeSet<Block> blockSet) {
             if (children != null) {
                 for (int i = 0; i < children.length; i++) {
                     children[i].getBlockInfo(blockSet);
@@ -140,38 +119,36 @@
             }
         }
 
-        /**
-         * Find the file that corresponds to the given Block
-         */
-        public File getBlockFilename(Block b) {
-            return getBlockFilename(b, b.getBlockId(), 0);
-        }
 
-        /**
-         * Helper method to find file for a Block
-         */         
-        private File getBlockFilename(Block b, long blkid, int depth) {
-            if (children == null) {
-                return new File(dir, b.getBlockName());
-            } else {
-                // 
-                // Lift the 4 bits starting at depth, going left->right.
-                // That means there are 2^4 possible children, or 16.
-                // The max depth is thus ((len(long) / 4) == 16).
-                //
-                return children[getHalfByte(blkid, depth)].getBlockFilename(b, blkid, depth+1);
+        void getVolumeMap(HashMap<Block, FSVolume> volumeMap, FSVolume volume) {
+          if (children != null) {
+                for (int i = 0; i < children.length; i++) {
+                    children[i].getVolumeMap(volumeMap, volume);
+                }
             }
-        }
 
-        /**
-         * Returns a number 0-15, inclusive.  Pulls out the right
-         * half-byte from the indicated long.
-         */
-        private int getHalfByte(long blkid, int halfByteIndex) {
-            blkid = blkid >> ((15 - halfByteIndex) * 4);
-            return (int) ((0x000000000000000F) & blkid);
+            File blockFiles[] = dir.listFiles();
+            for (int i = 0; i < blockFiles.length; i++) {
+                if (Block.isBlockFilename(blockFiles[i])) {
+                    volumeMap.put(new Block(blockFiles[i], blockFiles[i].length()), volume);
+                }
+            }
         }
         
+        void getBlockMap(HashMap<Block, File> blockMap) {
+          if (children != null) {
+                for (int i = 0; i < children.length; i++) {
+                    children[i].getBlockMap(blockMap);
+                }
+            }
+
+            File blockFiles[] = dir.listFiles();
+            for (int i = 0; i < blockFiles.length; i++) {
+                if (Block.isBlockFilename(blockFiles[i])) {
+                    blockMap.put(new Block(blockFiles[i], blockFiles[i].length()), blockFiles[i]);
+                }
+            }
+        }
         /**
          * check if a data diretory is healthy
          * @throws DiskErrorException
@@ -195,50 +172,210 @@
         }
     }
 
+    /** One storage directory: finalized blocks go under "data", files being written under "tmp". */
+    class FSVolume {
+      static final double USABLE_DISK_PCT_DEFAULT = 0.98f;
+
+      private File dir;        // root directory of this volume
+      private FSDir dataDir;   // block directory tree rooted at dir/data
+      private File tmpDir;     // dir/tmp, where createTmpFile() places new files
+      private DF usage;        // source of the capacity, available-space and mount figures
+      private long reserved;   // bytes subtracted from the space reported by getAvailable()
+      private double usableDiskPct = USABLE_DISK_PCT_DEFAULT;  // factor applied to free space
+
+      /** Reads the space settings from conf and recreates an empty tmp directory under dir. */
+      FSVolume(File dir, Configuration conf) throws IOException {
+        this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
+        this.usableDiskPct = conf.getFloat("dfs.datanode.du.pct", (float) USABLE_DISK_PCT_DEFAULT);
+        this.dir = dir;
+        this.dataDir = new FSDir(new File(dir, "data"), 0, null);
+        this.tmpDir = new File(dir, "tmp");
+        if (tmpDir.exists()) {
+          FileUtil.fullyDelete(tmpDir);  // drop whatever an earlier run left behind
+        }
+        tmpDir.mkdirs();
+        this.usage = new DF(dir, conf);
+      }
+
+      long getCapacity() throws IOException { return usage.getCapacity(); }
+
+      /** Returns usableDiskPct times the available space reported by DF, minus reserved. */
+      long getAvailable() throws IOException {
+        return ((long) Math.round(usableDiskPct * usage.getAvailable()) - reserved);
+      }
+
+      String getMount() throws IOException { return usage.getMount(); }
+
+      /**
+       * Creates the zero-length temporary file for block b in the tmp directory.
+       * @throws IOException if the file is already present or cannot be created
+       */
+      File createTmpFile(Block b) throws IOException {
+        File f = new File(tmpDir, b.getBlockName());
+        try {
+          if (f.exists()) {
+            throw new IOException("Unexpected problem in creating temporary file for "+
+                b + ".  File " + f + " should not be present, but is.");
+          }
+          if (!f.createNewFile()) {
+            throw new IOException("Unexpected problem in creating temporary file for "+
+                b + ".  File " + f + " should be creatable, but is already present.");
+          }
+        } catch (IOException ie) {
+          DataNode.LOG.warn("Exception!  " + ie);  // use the DataNode log, not stdout
+          throw ie;
+        }
+        // NOTE(review): lowering 'reserved' raises getAvailable(); confirm this is intended.
+        reserved -= b.getNumBytes();
+        return f;
+      }
+
+      /** Passes the file f holding block b to the data directory tree and returns its result. */
+      File addBlock(Block b, File f) {
+        return dataDir.addBlock(b, f);
+      }
+
+      /** Checks the data directory tree and the tmp directory. */
+      void checkDirs() throws DiskErrorException {
+        dataDir.checkDirTree();
+        DiskChecker.checkDir(tmpDir);
+      }
+
+      void getBlockInfo(TreeSet<Block> blockSet) {
+        dataDir.getBlockInfo(blockSet);
+      }
+
+      void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
+        dataDir.getVolumeMap(volumeMap, this);
+      }
+
+      void getBlockMap(HashMap<Block, File> blockMap) {
+        dataDir.getBlockMap(blockMap);
+      }
+
+      public String toString() {
+        return dir.getAbsolutePath();
+      }
+    }
+    
+    /** The volumes of one dataset; new blocks are placed on them in round-robin order. */
+    class FSVolumeSet {
+      FSVolume[] volumes = null;
+      int curVolume = 0;   // index of the volume getNextVolume() tries first
+      HashMap<String,Long> mountMap = new HashMap<String,Long>();  // mount -> latest figure
+
+      FSVolumeSet(FSVolume[] volumes) {
+        this.volumes = volumes;
+      }
+
+      /**
+       * Returns the next volume, in round-robin order, with at least blockSize bytes available.
+       * @throws DiskOutOfSpaceException if every volume was tried and none has enough space
+       */
+      FSVolume getNextVolume(long blockSize) throws IOException {
+        int startVolume = curVolume;
+        while (true) {
+          FSVolume volume = volumes[curVolume];
+          curVolume = (curVolume + 1) % volumes.length;
+          if (volume.getAvailable() >= blockSize) { return volume; }
+          if (curVolume == startVolume) {
+            throw new DiskOutOfSpaceException("Insufficient space for an additional block");
+          }
+        }
+      }
+
+      /** Sums the capacity of the volumes, counting volumes that report the same mount once. */
+      synchronized long getCapacity() throws IOException {
+        for (int idx = 0; idx < volumes.length; idx++) {
+          mountMap.put(volumes[idx].getMount(), new Long(volumes[idx].getCapacity()));
+        }
+        return sumMountMap();
+      }
+
+      /**
+       * Sums the available space of the volumes, counting volumes that report the same mount
+       * once.  Reads getAvailable(); reading getCapacity() here would report capacity as free.
+       */
+      synchronized long getRemaining() throws IOException {
+        for (int idx = 0; idx < volumes.length; idx++) {
+          mountMap.put(volumes[idx].getMount(), new Long(volumes[idx].getAvailable()));
+        }
+        return sumMountMap();
+      }
+
+      /** Adds up the values currently stored in mountMap. */
+      private long sumMountMap() {
+        long total = 0L;
+        for (Iterator<Long> iter = mountMap.values().iterator(); iter.hasNext();) {
+          total += iter.next().longValue();
+        }
+        return total;
+      }
+
+      void getBlockInfo(TreeSet<Block> blockSet) {
+        for (FSVolume v : volumes) { v.getBlockInfo(blockSet); }
+      }
+
+      void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
+        for (FSVolume v : volumes) { v.getVolumeMap(volumeMap); }
+      }
+
+      void getBlockMap(HashMap<Block, File> blockMap) {
+        for (FSVolume v : volumes) { v.getBlockMap(blockMap); }
+      }
+
+      void checkDirs() throws DiskErrorException {
+        for (FSVolume v : volumes) { v.checkDirs(); }
+      }
+
+      /** Returns the volumes' toString() values joined with commas. */
+      public String toString() {
+        StringBuffer sb = new StringBuffer();
+        for (int idx = 0; idx < volumes.length; idx++) {
+          sb.append(volumes[idx].toString());
+          if (idx != volumes.length - 1) { sb.append(","); }
+        }
+        return sb.toString();
+      }
+    }
     //////////////////////////////////////////////////////
     //
     // FSDataSet
     //
     //////////////////////////////////////////////////////
 
-    DF diskUsage;
-    File data = null, tmp = null;
-    long reserved = 0;
-    double usableDiskPct = USABLE_DISK_PCT_DEFAULT;
-    FSDir dirTree;
-    TreeSet ongoingCreates = new TreeSet();
+    FSVolumeSet volumes;
+    private HashMap<Block,File> ongoingCreates = new HashMap<Block,File>();
+    private int maxBlocksPerDir = 0;
+    private HashMap<Block,FSVolume> volumeMap = null;
+    private HashMap<Block,File> blockMap = null;
 
     /**
      * An FSDataset has a directory where it loads its data files.
      */
-    public FSDataset(File dir, Configuration conf) throws IOException {
-    		this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
-    		this.usableDiskPct = conf.getFloat("dfs.datanode.du.pct", (float) USABLE_DISK_PCT_DEFAULT);
-        diskUsage = new DF( dir, conf); 
-        this.data = new File(dir, "data");
-        if (! data.exists()) {
-            data.mkdirs();
-        }
-        this.tmp = new File(dir, "tmp");
-        if (tmp.exists()) {
-            FileUtil.fullyDelete(tmp);
-        }
-        this.tmp.mkdirs();
-        this.dirTree = new FSDir(data);
+    public FSDataset(File[] dirs, Configuration conf) throws IOException {
+    	this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
+        FSVolume[] volArray = new FSVolume[dirs.length];
+        for (int idx = 0; idx < dirs.length; idx++) {
+          volArray[idx] = new FSVolume(dirs[idx], conf);
+        }
+        volumes = new FSVolumeSet(volArray);
+        volumeMap = new HashMap<Block,FSVolume>();
+        volumes.getVolumeMap(volumeMap);
+        blockMap = new HashMap<Block,File>();
+        volumes.getBlockMap(blockMap);
     }
 
     /**
      * Return total capacity, used and unused
      */
     public long getCapacity() throws IOException {
-        return diskUsage.getCapacity();
+        return volumes.getCapacity();
     }
 
     /**
      * Return how many bytes can still be stored in the FSDataset
      */
     public long getRemaining() throws IOException {
-        return ((long) Math.round(usableDiskPct * diskUsage.getAvailable())) - reserved;
+        return volumes.getRemaining();
     }
 
     /**
@@ -263,19 +400,6 @@
     }
 
     /**
-     * A Block b will be coming soon!
-     */
-    public boolean startBlock(Block b) throws IOException {
-        //
-        // Make sure the block isn't 'valid'
-        //
-        if (isValidBlock(b)) {
-            throw new IOException("Block " + b + " is valid, and cannot be created.");
-        }
-        return true;
-    }
-
-    /**
      * Start writing to a block file
      */
     public OutputStream writeToBlock(Block b) throws IOException {
@@ -295,41 +419,17 @@
             //
             // Is it already in the create process?
             //
-            if (ongoingCreates.contains(b)) {
-                throw new IOException("Block " + b + " has already been started (though not completed), and thus cannot be created.");
+            if (ongoingCreates.containsKey(b)) {
+                throw new IOException("Block " + b +
+                    " has already been started (though not completed), and thus cannot be created.");
             }
 
-            //
-            // Check if we have too little space
-            //
-            if (getRemaining() < blockSize) {
-                throw new DiskOutOfSpaceException("Insufficient space for an additional block");
-            }
-
-            //
-            // OK, all's well.  Register the create, adjust 
-            // 'reserved' size, & create file
-            //
-            ongoingCreates.add(b);
-            reserved += blockSize;
-            f = getTmpFile(b);
-	    try {
-		if (f.exists()) {
-		    throw new IOException("Unexpected problem in startBlock() for " + b + ".  File " + f + " should not be present, but is.");
-		}
-
-		//
-		// Create the zero-length temp file
-		//
-		if (!f.createNewFile()) {
-		    throw new IOException("Unexpected problem in startBlock() for " + b + ".  File " + f + " should be creatable, but is already present.");
-		}
-	    } catch (IOException ie) {
-                System.out.println("Exception!  " + ie);
-		ongoingCreates.remove(b);		
-		reserved -= blockSize;
-                throw ie;
-	    }
+            FSVolume v = volumes.getNextVolume(blockSize);
+            
+            // create temporary file to hold block in the designated volume
+            f = v.createTmpFile(b);
+            ongoingCreates.put(b, f);
+            volumeMap.put(b, v);
         }
 
         //
@@ -352,49 +452,31 @@
      * Complete the block write!
      */
     public void finalizeBlock(Block b) throws IOException {
-        File f = getTmpFile(b);
-        if (! f.exists()) {
-            throw new IOException("No temporary file " + f + " for block " + b);
-        }
+      synchronized (ongoingCreates) {
+        File f = ongoingCreates.get(b);
+        if (f == null || ! f.exists()) {
+          throw new IOException("No temporary file " + f + " for block " + b);
+        }
+        long finalLen = f.length();
+        b.setNumBytes(finalLen);
+        FSVolume v = volumeMap.get(b);
         
-        synchronized (ongoingCreates) {
-            //
-            // Make sure still registered as ongoing
-            //
-            if (! ongoingCreates.contains(b)) {
-                throw new IOException("Tried to finalize block " + b + ", but not in ongoingCreates table");
-            }
-
-            long finalLen = f.length();
-            b.setNumBytes(finalLen);
-
-            //
-            // Move the file
-            // (REMIND - mjc - shame to move the file within a synch
-            // section!  Maybe remove this?)
-            //
-            dirTree.addBlock(b, f);
-
-            //
-            // Done, so deregister from ongoingCreates
-            //
-            if (! ongoingCreates.remove(b)) {
-                throw new IOException("Tried to finalize block " + b + ", but could not find it in ongoingCreates after file-move!");
-            } 
-            reserved -= b.getNumBytes();
-        }
+        File dest = v.addBlock(b, f);
+        blockMap.put(b, dest);
+        ongoingCreates.remove(b);
+      }
     }
 
     /**
      * Return a table of block data
      */
     public Block[] getBlockReport() {
-        TreeSet blockSet = new TreeSet();
-        dirTree.getBlockInfo(blockSet);
+        TreeSet<Block> blockSet = new TreeSet<Block>();
+        volumes.getBlockInfo(blockSet);
         Block blockTable[] = new Block[blockSet.size()];
         int i = 0;
-        for (Iterator it = blockSet.iterator(); it.hasNext(); i++) {
-            blockTable[i] = (Block) it.next();
+        for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
+            blockTable[i] = it.next();
         }
         return blockTable;
     }
@@ -404,11 +486,7 @@
      */
     public boolean isValidBlock(Block b) {
         File f = getFile(b);
-        if (f.exists()) {
-            return true;
-        } else {
-            return false;
-        }
+        return (f!= null && f.exists());
     }
 
     /**
@@ -417,31 +495,22 @@
      * just get rid of it.
      */
     public void invalidate(Block invalidBlks[]) throws IOException {
-        for (int i = 0; i < invalidBlks.length; i++) {
-            File f = getFile(invalidBlks[i]);
-
-            // long len = f.length();
-            if (!f.delete()) {
-                throw new IOException("Unexpected error trying to delete block " + invalidBlks[i] + " at file " + f);
-            }
-            DataNode.LOG.info("Deleting block " + invalidBlks[i]);
-        }
+      for (int i = 0; i < invalidBlks.length; i++) {
+        File f = getFile(invalidBlks[i]);
+        if (f == null || !f.delete()) {  // f is null when the block is not in blockMap
+          throw new IOException("Unexpected error trying to delete block "
+              + invalidBlks[i] + " at file " + f);
+        }
+        blockMap.remove(invalidBlks[i]);
+        DataNode.LOG.info("Deleting block " + invalidBlks[i]);
+      }
     }
 
     /**
      * Turn the block identifier into a filename.
      */
     File getFile(Block b) {
-        // REMIND - mjc - should cache this result for performance
-        return dirTree.getBlockFilename(b);
-    }
-
-    /**
-     * Get the temp file, if this block is still being created.
-     */
-    File getTmpFile(Block b) {
-        // REMIND - mjc - should cache this result for performance
-        return new File(tmp, b.getBlockName());
+      return blockMap.get(b);
     }
 
     /**
@@ -450,15 +519,12 @@
      * @author hairong
      */
     void checkDataDir() throws DiskErrorException {
-        dirTree.checkDirTree();
-        DiskChecker.checkDir( tmp );
+        volumes.checkDirs();
     }
     
 
     public String toString() {
-      return "FSDataset{" +
-        "dirpath='" + diskUsage.getDirPath() + "'" +
-        "}";
+      return "FSDataset{dirpath='"+volumes+"'}";
     }
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java Tue Sep  5 15:17:14 2006
@@ -79,4 +79,22 @@
     }
     return numFormat.format(result) + suffix;
   }
+  
+  /**
+   * Joins the elements of an array of strings with commas.
+   * @param strs Array of strings
+   * @return Empty string if strs.length is 0, otherwise the elements in their
+   * original order with a single comma between neighbours
+   */
+  public static String arrayToString(String[] strs) {
+    StringBuffer joined = new StringBuffer();
+    for (int pos = 0; pos < strs.length; pos++) {
+      // every element except the first is preceded by the separator
+      if (pos > 0) {
+        joined.append(",");
+      }
+      joined.append(strs[pos]);
+    }
+    return joined.toString();
+  }
 }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java Tue Sep  5 15:17:14 2006
@@ -227,12 +227,13 @@
       conf.set("fs.default.name", nameNodeSocketAddr);
       for (int i = 0; i < initialDNcount; i++) {
         // uniquely config real fs path for data storage for this datanode
-        String dataDir = baseDirSpecified + "/datanode" + i;
-        conf.set("dfs.data.dir", dataDir);
-        DataNode dn = DataNode.makeInstanceForDir(dataDir, conf);
+        String dataDirs[] = new String[1];
+        dataDirs[0] = baseDirSpecified + "/datanode" + i;
+        conf.set("dfs.data.dir", dataDirs[0]);
+        DataNode dn = DataNode.makeInstance(dataDirs, conf);
         if (dn != null) {
           listOfDataNodeDaemons.add(dn);
-          (new Thread(dn, "DataNode" + i + ": " + dataDir)).start();
+          (new Thread(dn, "DataNode" + i + ": " + dataDirs[0])).start();
         }
       }
       try {
@@ -365,8 +366,7 @@
           if (i != iDatanodeClosed) {
             try {
               if (checkDataDirsEmpty) {
-                File dataDir = new File(dataNode.data.diskUsage.getDirPath());
-                assertNoBlocks(dataDir);
+                assertNoBlocks(dataNode);
 
               }
               dataNode.shutdown();
@@ -408,18 +408,13 @@
     msg(summarizeThreadGroup());
   }
 
-  private void assertNoBlocks(File datanodeDir) {
-    File datanodeDataDir = new File(datanodeDir, "data");
-    String[] blockFilenames =
-        datanodeDataDir.list(
-            new FilenameFilter() {
-              public boolean accept(File dir, String name){
-                return Block.isBlockFilename(new File(dir, name));}});
+  private void assertNoBlocks(DataNode dn) {
+    Block[] blocks = dn.data.getBlockReport();
     // if this fails, the delete did not propagate because either
     //   awaitQuiescence() returned before the disk images were removed
     //   or a real failure was detected.
-    assertTrue(" data dir not empty: " + datanodeDataDir,
-               blockFilenames.length==0);
+    assertTrue(" data dir not empty: " + dn.data.volumes,
+               blocks.length==0);
   }
 
   /**

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java Tue Sep  5 15:17:14 2006
@@ -357,12 +357,13 @@
       //
       for (int i = 0; i < dataNodeNum; i++) {
         // uniquely config real fs path for data storage for this datanode
-        String dataDir = baseDirSpecified + "/datanode" + i;
-        conf.set("dfs.data.dir", dataDir);
-        DataNode dn = DataNode.makeInstanceForDir(dataDir, conf);
+        String dataDir[] = new String[1];
+        dataDir[0] = baseDirSpecified + "/datanode" + i;
+        conf.set("dfs.data.dir", dataDir[0]);
+        DataNode dn = DataNode.makeInstance(dataDir, conf);
         if (dn != null) {
           dataNodeDaemons.add(dn);
-          (new Thread(dn, "DataNode" + i + ": " + dataDir)).start();
+          (new Thread(dn, "DataNode" + i + ": " + dataDir[0])).start();
         }
       }
 	         

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Tue Sep  5 15:17:14 2006
@@ -71,9 +71,12 @@
      */
     public void run() {
       try {
-        File dataDir = new File(conf.get("dfs.data.dir"));
-        dataDir.mkdirs();
-        node = new DataNode(conf, dataDir.getPath());
+        String[] dirs = conf.getStrings("dfs.data.dir");
+        for (int idx = 0; idx < dirs.length; idx++) {
+          File dataDir = new File(dirs[idx]);
+          dataDir.mkdirs();
+        }
+        node = new DataNode(conf, dirs);
         node.run();
       } catch (Throwable e) {
         node = null;
@@ -105,7 +108,8 @@
     File base_dir = new File(System.getProperty("test.build.data"),
                              "dfs/");
     conf.set("dfs.name.dir", new File(base_dir, "name").getPath());
-    conf.set("dfs.data.dir", new File(base_dir, "data").getPath());
+    conf.set("dfs.data.dir", new File(base_dir, "data1").getPath()+","+
+        new File(base_dir, "data2").getPath());
     conf.setInt("dfs.replication", 1);
     // this timeout seems to control the minimum time for the test, so
     // decrease it considerably.