Posted to hdfs-commits@hadoop.apache.org by sh...@apache.org on 2009/09/27 00:02:11 UTC

svn commit: r819226 - in /hadoop/hdfs/branches/HDFS-265: CHANGES.txt src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java

Author: shv
Date: Sat Sep 26 22:02:10 2009
New Revision: 819226

URL: http://svn.apache.org/viewvc?rev=819226&view=rev
Log:
HDFS-627. Support replica update in datanode. Contributed by Tsz Wo (Nicholas), SZE and Hairong Kuang.
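
For context, the end-to-end flow these changes enable is roughly the
following (a minimal sketch pieced together from the new FSDataset methods
and the test below; names such as fsdataset, block, recoveryId, and
newLength stand for caller-side state and are not part of this patch):

    // Mark the replica as under recovery (state RUR) for this recovery id.
    FSDataset.initReplicaRecovery(fsdataset.volumeMap, block, recoveryId);

    // After the call above, the replica in the volume map is an RUR.
    ReplicaUnderRecovery rur =
        (ReplicaUnderRecovery) fsdataset.getReplica(block.getBlockId());

    // Bump the generation stamp to recoveryId, truncate the block and meta
    // files if the replica is longer than newLength (growing it throws an
    // IOException), then finalize the replica.
    FinalizedReplica finalized =
        fsdataset.updateReplicaUnderRecovery(rur, recoveryId, newLength);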

Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=819226&r1=819225&r2=819226&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Sat Sep 26 22:02:10 2009
@@ -40,6 +40,9 @@
     HDFS-624. Support a new algorithm for pipeline recovery and pipeline setup 
     for append. (hairong)
 
+   HDFS-627. Support replica update in data-node.
+   (Tsz Wo (Nicholas), SZE and Hairong Kuang via shv)
+
   IMPROVEMENTS
 
     HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=819226&r1=819225&r2=819226&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Sep 26 22:02:10 2009
@@ -21,6 +21,7 @@
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -383,14 +384,6 @@
     }
 
     /**
-     * Returns the name of the temporary file for this block.
-     */
-    File getTmpFile(Block b) throws IOException {
-      File f = new File(tmpDir, b.getBlockName());
-      return f;
-    }
-
-    /**
      * Files used for copy-on-write. They need recovery when datanode
      * restarts.
      */
@@ -1036,6 +1029,11 @@
 
   static private void truncateBlock(File blockFile, File metaFile,
       long oldlen, long newlen) throws IOException {
+    DataNode.LOG.info("truncateBlock: blockFile=" + blockFile
+        + ", metaFile=" + metaFile
+        + ", oldlen=" + oldlen
+        + ", newlen=" + newlen);
+
     if (newlen == oldlen) {
       return;
     }
@@ -1419,11 +1417,17 @@
       // been opened for append but never modified
       return;
     }
-    ReplicaInfo newReplicaInfo = null;
+    finalizeReplica(replicaInfo);
+  }
+  
+  private synchronized FinalizedReplica finalizeReplica(ReplicaInfo replicaInfo)
+  throws IOException {
+    FinalizedReplica newReplicaInfo = null;
     if (replicaInfo.getState() == ReplicaState.RUR &&
        ((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() == 
          ReplicaState.FINALIZED) {
-      newReplicaInfo = ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
+      newReplicaInfo = (FinalizedReplica)
+             ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
     } else {
       FSVolume v = replicaInfo.getVolume();
       File f = replicaInfo.getBlockFile();
@@ -1436,6 +1440,7 @@
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
     }
     volumeMap.add(newReplicaInfo);
+    return newReplicaInfo;
   }
 
   /**
@@ -1578,46 +1583,25 @@
 
   /** {@inheritDoc} */
   public void validateBlockMetadata(Block b) throws IOException {
-    ReplicaInfo info = getReplicaInfo(b);
-    FSVolume v = info.getVolume();
-    File tmp = v.getTmpFile(b);
-    File f = getFile(b);
-    if (f == null) {
-      f = tmp;
+    checkReplicaFiles(getReplicaInfo(b));
+  }
+
+  /** Check the files of a replica. */
+  static void checkReplicaFiles(final ReplicaInfo r) throws IOException {
+    final File f = r.getBlockFile();
+    if (!f.exists()) {
+      throw new FileNotFoundException("File " + f + " not found, r=" + r);
     }
-    if (f == null) {
-      throw new IOException("Block " + b + " does not exist on disk.");
+    if (r.getNumBytes() != f.length()) {
+      throw new IOException("File length mismatched."
+          + f + " length is " + f.length() + " but r=" + r);
+    }
+    final File metafile = getMetaFile(f, r);
+    if (!metafile.exists()) {
+      throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
     }
-    if (!f.exists()) {
-      throw new IOException("Block " + b + 
-                            " block file " + f +
-                            " does not exist on disk.");
-    }
-    if (b.getNumBytes() != f.length()) {
-      throw new IOException("Block " + b + 
-                            " length is " + b.getNumBytes()  +
-                            " does not match block file length " +
-                            f.length());
-    }
-    File meta = getMetaFile(f, b);
-    if (meta == null) {
-      throw new IOException("Block " + b + 
-                            " metafile does not exist.");
-    }
-    if (!meta.exists()) {
-      throw new IOException("Block " + b + 
-                            " metafile " + meta +
-                            " does not exist on disk.");
-    }
-    if (meta.length() == 0) {
-      throw new IOException("Block " + b + " metafile " + meta + " is empty.");
-    }
-    long stamp = parseGenerationStamp(f, meta);
-    if (stamp != b.getGenerationStamp()) {
-      throw new IOException("Block " + b + 
-                            " genstamp is " + b.getGenerationStamp()  +
-                            " does not match meta file stamp " +
-                            stamp);
+    if (metafile.length() == 0) {
+      throw new IOException("Metafile " + metafile + " is empty, r=" + r);
     }
   }
 
@@ -2045,4 +2029,70 @@
     }
     return rur.createInfo();
   }
+
+  /** Update a replica of a block. */
+  synchronized void updateReplica(final Block block, final long recoveryId,
+      final long newlength) throws IOException {
+    //get replica
+    final ReplicaInfo replica = volumeMap.get(block.getBlockId());
+    DataNode.LOG.info("updateReplica: block=" + block
+        + ", recoveryId=" + recoveryId
+        + ", length=" + newlength
+        + ", replica=" + replica);
+
+    //check replica
+    if (replica == null) {
+      throw new ReplicaNotFoundException(block);
+    }
+
+    //check replica state
+    if (replica.getState() != ReplicaState.RUR) {
+      throw new IOException("replica.getState() != " + ReplicaState.RUR
+          + ", replica=" + replica);
+    }
+
+    //check replica files before update
+    checkReplicaFiles(replica);
+
+    //update replica
+    final FinalizedReplica finalized = updateReplicaUnderRecovery(
+        (ReplicaUnderRecovery)replica, recoveryId, newlength);
+
+    //check replica files after update
+    checkReplicaFiles(finalized);
+  }
+
+  /** Update a ReplicaUnderRecovery to a FinalizedReplica. */
+  FinalizedReplica updateReplicaUnderRecovery(
+      final ReplicaUnderRecovery rur, final long recoveryId,
+      final long newlength) throws IOException {
+    DataNode.LOG.info("updateReplicaUnderRecovery: recoveryId=" + recoveryId
+        + ", newlength=" + newlength
+        + ", rur=" + rur);
+
+    //check recovery id
+    if (rur.getRecoveryID() != recoveryId) {
+      throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId
+          + ", rur=" + rur);
+    }
+
+    // bump rur's GS to be recovery id
+    bumpReplicaGS(rur, recoveryId);
+
+    //update length
+    final File replicafile = rur.getBlockFile();
+    if (rur.getNumBytes() < newlength) {
+      throw new IOException("rur.getNumBytes() < newlength = " + newlength
+          + ", rur=" + rur);
+    }
+    if (rur.getNumBytes() > newlength) {
+      rur.detachBlock(1);
+      truncateBlock(replicafile, rur.getMetaFile(), rur.getNumBytes(), newlength);
+      // update RUR with the new length
+      rur.setNumBytes(newlength);
+   }
+
+    // finalize the block
+    return finalizeReplica(rur);
+  }
 }
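
The length handling in updateReplicaUnderRecovery above deserves a worked
example (illustration only, not part of the patch). Suppose the replica was
recorded at 1024 bytes, as in the test below:

    // newlength == 1025 -> IOException: a recovered replica may never grow.
    // newlength == 1024 -> no truncation; the replica is finalized as-is.
    // newlength == 1023 -> detachBlock(1), which as we read it breaks any
    //     hard links left over from an upgrade snapshot (copy-on-write),
    //     then truncateBlock() shortens the block and meta files, and
    //     setNumBytes(1023) records the new length before finalizeReplica()
    //     runs.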

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=819226&r1=819225&r2=819226&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Sat Sep 26 22:02:10 2009
@@ -196,4 +196,61 @@
     }
 
   }
+
+  /** Test {@link FSDataset#updateReplicaUnderRecovery(ReplicaUnderRecovery, long, long)} */
+  @Test
+  public void testUpdateReplicaUnderRecovery() throws IOException {
+    final Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster(conf, 3, true, null);
+      cluster.waitActive();
+
+      //create a file
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      String filestr = "/foo";
+      Path filepath = new Path(filestr);
+      DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
+
+      //get block info
+      final LocatedBlock locatedblock = getLastLocatedBlock(
+          dfs.getClient().getNamenode(), filestr);
+      final DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
+      Assert.assertTrue(datanodeinfo.length > 0);
+
+      //get DataNode and FSDataset objects
+      final DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
+      Assert.assertTrue(datanode != null);
+      Assert.assertTrue(datanode.data instanceof FSDataset);
+      final FSDataset fsdataset = (FSDataset)datanode.data;
+
+      //initReplicaRecovery
+      final Block b = locatedblock.getBlock();
+      final long recoveryid = b.getGenerationStamp() + 1;
+      final long newlength = b.getNumBytes() - 1;
+      FSDataset.initReplicaRecovery(fsdataset.volumeMap, b, recoveryid);
+
+      //check replica
+      final ReplicaInfo replica = fsdataset.getReplica(b.getBlockId());
+      Assert.assertTrue(replica instanceof ReplicaUnderRecovery);
+      final ReplicaUnderRecovery rur = (ReplicaUnderRecovery)replica;
+
+      //check meta data before update
+      FSDataset.checkReplicaFiles(rur);
+
+      //update
+      final FinalizedReplica finalized = fsdataset.updateReplicaUnderRecovery(
+          rur, recoveryid, newlength);
+
+      //check meta data after update
+      FSDataset.checkReplicaFiles(finalized);
+      Assert.assertEquals(b.getBlockId(), finalized.getBlockId());
+      Assert.assertEquals(recoveryid, finalized.getGenerationStamp());
+      Assert.assertEquals(newlength, finalized.getNumBytes());
+
+    } finally {
+      if (cluster != null) cluster.shutdown();
+    }
+  }
 }
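
The new test drives the whole path: it marks the last block of /foo as under
recovery, updates it to a shorter length, and verifies the finalized replica's
block id, generation stamp, and length. It should be runnable on its own with
the usual single-test ant invocation, e.g.
"ant test -Dtestcase=TestInterDatanodeProtocol" (invocation assumed, not
verified against this branch).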