You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/10/05 01:56:04 UTC

svn commit: r582028 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/

Author: dhruba
Date: Thu Oct  4 16:56:03 2007
New Revision: 582028

URL: http://svn.apache.org/viewvc?rev=582028&view=rev
Log:
HADOOP-1955.  The Namenode tries to not pick the same source Datanode for
a replication request if the earlier replication request for the same
block and that source Datanode had failed.
(Raghu Angadi via dhruba)


Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=582028&r1=582027&r2=582028&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Oct  4 16:56:03 2007
@@ -373,6 +373,11 @@
     HADOOP-1978.  Name-node removes edits.new after a successful startup.
     (Konstantin Shvachko via dhruba)
 
+    HADOOP-1955.  The Namenode tries to not pick the same source Datanode for
+    a replication request if the earlier replication request for the same
+    block and that source Datanode had failed.
+    (Raghu Angadi via dhruba)
+
 Release 0.14.1 - 2007-09-04
 
   BUG FIXES

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=582028&r1=582027&r2=582028&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Thu Oct  4 16:56:03 2007
@@ -467,7 +467,9 @@
   private HashMap<Block,FSVolume> volumeMap = null;
   private HashMap<Block,File> blockMap = null;
   static  Random random = new Random();
-
+  
+  long blockWriteTimeout = 3600 * 1000;
+  
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -482,6 +484,8 @@
     volumes.getVolumeMap(volumeMap);
     blockMap = new HashMap<Block,File>();
     volumes.getBlockMap(blockMap);
+    blockWriteTimeout = Math.max(
+         conf.getInt("dfs.datanode.block.write.timeout.sec", 3600), 1) * 1000;
   }
 
   /**
@@ -559,7 +563,8 @@
       if (ongoingCreates.containsKey(b)) {
         // check how old is the temp file - wait 1 hour
         File tmp = ongoingCreates.get(b);
-        if ((System.currentTimeMillis() - tmp.lastModified()) < 3600 * 1000) {
+        if ((System.currentTimeMillis() - tmp.lastModified()) < 
+            blockWriteTimeout) {
           throw new IOException("Block " + b +
                                 " has already been started (though not completed), and thus cannot be created.");
         } else {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=582028&r1=582027&r2=582028&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Oct  4 16:56:03 2007
@@ -180,7 +180,7 @@
   private long decommissionRecheckInterval;
   // default block size of a file
   private long defaultBlockSize = 0;
-  static int replIndex = 0; // last datanode used for replication work
+  private int replIndex = 0; // last datanode used for replication work
   static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
 
   public static FSNamesystem fsNamesystemObject;
@@ -221,7 +221,9 @@
     this.dir.loadFSImage(getNamespaceDirs(conf), startOpt);
     this.safeMode = new SafeModeInfo(conf);
     setBlockTotal();
-    pendingReplications = new PendingReplicationBlocks();
+    pendingReplications = new PendingReplicationBlocks(
+                            conf.getInt("dfs.replication.pending.timeout.sec", 
+                                        -1) * 1000);
     this.hbthread = new Daemon(new HeartbeatMonitor());
     this.lmthread = new Daemon(new LeaseMonitor());
     this.replthread = new Daemon(new ReplicationMonitor());
@@ -1923,6 +1925,7 @@
     int numiter = 0;
     int foundwork = 0;
     int hsize = 0;
+    int lastReplIndex = -1;
 
     while (true) {
       DatanodeDescriptor node = null;
@@ -1934,6 +1937,11 @@
       synchronized (heartbeats) {
         hsize = heartbeats.size();
         if (numiter++ >= hsize) {
+          // no change in replIndex.
+          if (lastReplIndex >= 0) {
+            //next time, start after where the last replication was scheduled
+            replIndex = lastReplIndex;
+          }
           break;
         }
         if (replIndex >= hsize) {
@@ -1959,6 +1967,7 @@
           doReplication = true;
           addBlocksToBeReplicated(node, (Block[])replsets[0], 
                                   (DatanodeDescriptor[][])replsets[1]);
+          lastReplIndex = replIndex;
         }
       }
       if (!doReplication) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java?rev=582028&r1=582027&r2=582028&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java Thu Oct  4 16:56:03 2007
@@ -47,7 +47,9 @@
   private long defaultRecheckInterval = 5 * 60 * 1000;
 
   PendingReplicationBlocks(long timeoutPeriod) {
-    this.timeout = timeoutPeriod;
+    if ( timeoutPeriod > 0 ) {
+      this.timeout = timeoutPeriod;
+    }
     init();
   }
 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=582028&r1=582027&r2=582028&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Thu Oct  4 16:56:03 2007
@@ -117,7 +117,9 @@
       conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
                new File(base_dir, "name2").getPath());
     }
-    conf.setInt("dfs.replication", Math.min(3, numDataNodes));
+    
+    int replication = conf.getInt("dfs.replication", 3);
+    conf.setInt("dfs.replication", Math.min(replication, numDataNodes));
     conf.setInt("dfs.safemode.extension", 0);
     conf.setInt("dfs.namenode.decommission.interval", 3 * 1000); // 3 second
     

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java?rev=582028&r1=582027&r2=582028&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java Thu Oct  4 16:56:03 2007
@@ -19,6 +19,7 @@
 
 import junit.framework.TestCase;
 import java.io.*;
+import java.util.Iterator;
 import java.util.Random;
 import java.net.*;
 
@@ -167,4 +168,152 @@
       cluster.shutdown();
     }
   }
+  
+  // Waits for all of the blocks to have expected replication
+  private void waitForBlockReplication(String filename, 
+                                       ClientProtocol namenode,
+                                       int expected, long maxWaitSec) 
+                                       throws IOException {
+    long start = System.currentTimeMillis();
+    
+    //wait for all the blocks to be replicated;
+    System.out.println("Checking for block replication for " + filename);
+    int iters = 0;
+    while (true) {
+      boolean replOk = true;
+      LocatedBlocks blocks = namenode.getBlockLocations(filename, 0, 
+                                                        Long.MAX_VALUE);
+      
+      for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
+           iter.hasNext();) {
+        LocatedBlock block = iter.next();
+        int actual = block.getLocations().length;
+        if ( actual < expected ) {
+          if (true || iters > 0) {
+            System.out.println("Not enough replicas for " + block.getBlock() +
+                               " yet. Expecting " + expected + ", got " + 
+                               actual + ".");
+          }
+          replOk = false;
+          break;
+        }
+      }
+      
+      if (replOk) {
+        return;
+      }
+      
+      iters++;
+      
+      if (maxWaitSec > 0 && 
+          (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
+        throw new IOException("Timedout while waiting for all blocks to " +
+                              " be replicated for " + filename);
+      }
+      
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ignored) {}
+    }
+  }
+  
+  /* This test makes sure that the NameNode retries all the available 
+   * replicas as sources for under-replicated blocks. 
+   * 
+   * It creates a file with one block and replication of 4. It corrupts 
+   * two of the replicas and removes a third. Expected behaviour is
+   * that the missing replica will be copied from the one valid source.
+   */
+  public void testPendingReplicationRetry() throws IOException {
+    
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 4;
+    String testFile = "/replication-test-file";
+    Path testPath = new Path(testFile);
+    
+    byte buffer[] = new byte[1024];
+    for (int i=0; i<buffer.length; i++) {
+      buffer[i] = '1';
+    }
+    
+    try {
+      Configuration conf = new Configuration();
+      conf.set("dfs.replication", Integer.toString(numDataNodes));
+      //first time format
+      cluster = new MiniDFSCluster(0, conf, numDataNodes, true,
+                                   true, null, null);
+      cluster.waitActive();
+      DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                                            cluster.getNameNodePort()),
+                                            conf);
+      
+      OutputStream out = cluster.getFileSystem().create(testPath);
+      out.write(buffer);
+      out.close();
+      
+      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
+
+      // get first block of the file.
+      String block = dfsClient.namenode.
+                       getBlockLocations(testFile, 0, Long.MAX_VALUE).
+                       get(0).getBlock().toString();
+      
+      cluster.shutdown();
+      cluster = null;
+      
+      //Now mess up some of the replicas.
+      //Delete the first and corrupt the next two.
+      File baseDir = new File(System.getProperty("test.build.data"), 
+                                                 "dfs/data");
+      for (int i=0; i<25; i++) {
+        buffer[i] = '0';
+      }
+      
+      int fileCount = 0;
+      for (int i=0; i<6; i++) {
+        File blockFile = new File(baseDir, "data" + (i+1) + "/current/" + block);
+        System.out.println("Checking for file " + blockFile);
+        
+        if (blockFile.exists()) {
+          if (fileCount == 0) {
+            assertTrue(blockFile.delete());
+          } else {
+            // corrupt it.
+            long len = blockFile.length();
+            assertTrue(len > 50);
+            RandomAccessFile blockOut = new RandomAccessFile(blockFile, "rw");
+            blockOut.seek(len/3);
+            blockOut.write(buffer, 0, 25);
+          }
+          fileCount++;
+        }
+      }
+      assertEquals(3, fileCount);
+      
+      /* Start the MiniDFSCluster with more datanodes since once a writeBlock
+       * to a datanode fails, the same block can not be written to it
+       * immediately. In our case some replication attempts will fail.
+       */
+      conf = new Configuration();
+      conf.set("dfs.replication", Integer.toString(numDataNodes));
+      conf.set("dfs.replication.pending.timeout.sec", Integer.toString(2));
+      conf.set("dfs.datanode.block.write.timeout.sec", Integer.toString(5));
+      conf.set("dfs.safemode.threshold.pct", "0.75f"); // only 3 copies exist
+      
+      cluster = new MiniDFSCluster(0, conf, numDataNodes*2, false,
+                                   true, null, null);
+      cluster.waitActive();
+      
+      dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                                  cluster.getNameNodePort()),
+                                  conf);
+      
+      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
+      
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }  
 }