Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2009/09/17 03:34:47 UTC

svn commit: r816025 - in /hadoop/hdfs/branches/HDFS-265: ./ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/

Author: szetszwo
Date: Thu Sep 17 01:34:46 2009
New Revision: 816025

URL: http://svn.apache.org/viewvc?rev=816025&view=rev
Log:
HDFS-619. Support replica recovery initialization in datanode for the new append design.
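
In short, the datanode gains an initReplicaRecovery entry point that looks up the replica for a block, stops any active writer, validates generation stamps against the requested recovery id, and converts the replica to the under-recovery (RUR) state. A minimal sketch of the expected call pattern, assuming a live FSDataset instance named "dataset" (the setup values here are illustrative, not from this patch):

    Block block = new Block(blockId, numBytes, generationStamp);
    long recoveryId = generationStamp + 1;  // must be newer than the replica's GS
    try {
      ReplicaUnderRecovery.Info info = dataset.initReplicaRecovery(block, recoveryId);
      // info carries the block id, generation stamp, on-disk length,
      // and the replica's state before it became RUR
    } catch (RecoveryInProgressException e) {
      // a recovery with an equal or newer id already owns this replica
    } catch (ReplicaNotFoundException e) {
      // no replica with this block id exists on this datanode
    }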

Added:
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=816025&r1=816024&r2=816025&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Thu Sep 17 01:34:46 2009
@@ -27,6 +27,9 @@
 
     HDFS-604. Block report processing for append. (shv)
 
+    HDFS-619. Support replica recovery initialization in datanode for the new
+    append design.  (szetszwo)
+
   IMPROVEMENTS
 
     HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=816025&r1=816024&r2=816025&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Thu Sep 17 01:34:46 2009
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.common;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 /************************************
  * Some handy internal HDFS constants
  *
@@ -109,6 +113,16 @@
     public static ReplicaState getState(int v) {
       return ReplicaState.values()[v];
     }
+
+    /** Read from in */
+    public static ReplicaState read(DataInput in) throws IOException {
+      return values()[in.readByte()];
+    }
+
+    /** Write to out */
+    public void write(DataOutput out) throws IOException {
+      out.writeByte(ordinal());
+    }
   }
 
   /**
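
The new read/write pair serializes a ReplicaState as a single byte, its ordinal, so reordering the enum constants would break wire compatibility. A round-trip sketch, assuming the standard java.io stream classes (the buffer setup is illustrative):

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    ReplicaState.RUR.write(new DataOutputStream(buf));  // one byte: the ordinal

    DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    ReplicaState state = ReplicaState.read(in);         // byte back to enum constant
    assert state == ReplicaState.RUR;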

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=816025&r1=816024&r2=816025&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Sep 17 01:34:46 2009
@@ -94,7 +94,7 @@
       synchronized(datanode.data) { 
         this.replica = datanode.data.getReplica(block.getBlockId());
         if (replica == null) {
-          throw new IOException("Replica not found for " + block);
+          throw new ReplicaNotFoundException(block);
         }
         this.replicaVisibleLength = replica.getVisibleLength();
       }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=816025&r1=816024&r2=816025&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Sep 17 01:34:46 2009
@@ -1836,4 +1836,66 @@
     assert(Thread.holdsLock(this));
     return volumeMap.get(blockId);
   }
+
+  /** Initialize a replica recovery. */
+  synchronized ReplicaUnderRecovery.Info initReplicaRecovery(
+      Block block, long recoveryId) throws IOException {
+    return initReplicaRecovery(volumeMap, block, recoveryId);
+  }
+
+  /** static version of {@link #initReplicaRecovery(Block, long)}. */
+  static ReplicaUnderRecovery.Info initReplicaRecovery(
+      ReplicasMap map, Block block, long recoveryId) throws IOException {
+    final ReplicaInfo replica = map.get(block.getBlockId());
+    DataNode.LOG.info("initReplicaRecovery: block=" + block
+        + ", recoveryId=" + recoveryId
+        + ", replica=" + replica);
+
+    //check replica
+    if (replica == null) {
+      throw new ReplicaNotFoundException(block);
+    }
+
+    //stop writer if there is any
+    if (replica instanceof ReplicaInPipeline) {
+      ((ReplicaInPipeline)replica).stopWriter();
+    }
+
+    //check generation stamp
+    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+      throw new IOException(
+          "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+          + block + ", replica=" + replica);
+    }
+
+    //check recovery id
+    if (replica.getGenerationStamp() >= recoveryId) {
+      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+          + " replica.getGenerationStamp() >= recoveryId = " + recoveryId
+          + ", block=" + block + ", replica=" + replica);
+    }
+
+    //check RUR
+    final ReplicaUnderRecovery rur;
+    if (replica.getState() == ReplicaState.RUR) {
+      rur = (ReplicaUnderRecovery)replica;
+      if (rur.getRecoveryID() >= recoveryId) {
+        throw new RecoveryInProgressException(
+            "rur.getRecoveryID() >= recoveryId = " + recoveryId
+            + ", block=" + block + ", rur=" + rur);
+      }
+      final long oldRecoveryID = rur.getRecoveryID();
+      rur.setRecoveryID(recoveryId);
+      DataNode.LOG.info("initReplicaRecovery: update recovery id for " + block
+          + " from " + oldRecoveryID + " to " + recoveryId);
+    }
+    else {
+      rur = new ReplicaUnderRecovery(replica, recoveryId);
+      map.add(rur);
+      DataNode.LOG.info("initReplicaRecovery: changing replica state for "
+          + block + " from " + replica.getState()
+          + " to " + rur.getState());
+    }
+    return rur.createInfo();
+  }
 }
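
Taken together, the checks enforce an ordering invariant: the replica's generation stamp must be at least the block's and strictly less than the recovery id, and a replica already in RUR only accepts a strictly newer recovery id. Worked through with the numbers used by the test below (replica GS 7777):

    recoveryId 7778                               -> replica becomes RUR with recovery id 7778
    RUR at recovery id 7778, new recoveryId 7779  -> recovery id updated in place to 7779
    RUR at recovery id 7779, new recoveryId 7778  -> RecoveryInProgressException
    recoveryId 7776 (not newer than replica GS)   -> IOException ("THIS IS NOT SUPPOSED TO HAPPEN")
    unknown block id                              -> ReplicaNotFoundException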

Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java?rev=816025&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java Thu Sep 17 01:34:46 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+/**
+ * Exception indicating that a replica is already being recovered.
+ */
+class RecoveryInProgressException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  RecoveryInProgressException(String msg) {
+    super(msg);
+  }
+}
\ No newline at end of file

Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=816025&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Thu Sep 17 01:34:46 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Exception indicating that a replica cannot be found.
+ */
+class ReplicaNotFoundException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  ReplicaNotFoundException(Block b) {
+    super("Replica not found for " + b);
+  }
+}
\ No newline at end of file

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java?rev=816025&r1=816024&r2=816025&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java Thu Sep 17 01:34:46 2009
@@ -17,8 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.File;
+import java.io.IOException;
 
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 
@@ -155,4 +159,44 @@
         + "\n  recoveryId=" + recoveryId
         + "\n  original=" + original;
   }
+
+  Info createInfo() {
+    return new Info(this); 
+  }
+
+  /** Replica recovery information. */
+  static class Info extends Block{
+    private ReplicaState originalState;
+
+    private Info(ReplicaUnderRecovery rur) {
+      super(rur);
+      originalState = rur.getOrignalReplicaState();
+    }
+
+    ReplicaState getOriginalReplicaState() {
+      return originalState;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      originalState = ReplicaState.read(in); 
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      originalState.write(out);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return super.equals(o);
+    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+  }
 }
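
On the wire, an Info record is Block's Writable fields followed by one extra byte for the original replica state; assuming Block writes blockId, numBytes and generationStamp as longs (Block's format is not shown in this diff), the layout is:

    blockId (long) | numBytes (long) | generationStamp (long) | originalState (1 byte)

The equals/hashCode overrides simply restate Block's semantics, making explicit that the added originalState field takes no part in equality.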

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=816025&r1=816024&r2=816025&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Thu Sep 17 01:34:46 2009
@@ -21,19 +21,20 @@
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery.Info;
 import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.junit.Assert;
+import org.junit.Test;
 
 /**
 * This tests InterDatanodeProtocol for block handling. 
@@ -111,4 +112,87 @@
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  private static ReplicaInfo createReplicaInfo(Block b) {
+    return new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(),
+        null, null);
+  }
+
+  private static void assertEquals(ReplicaInfo originalInfo, Info recoveryInfo) {
+    Assert.assertEquals(originalInfo.getBlockId(), recoveryInfo.getBlockId());
+    Assert.assertEquals(originalInfo.getGenerationStamp(), recoveryInfo.getGenerationStamp());
+    Assert.assertEquals(originalInfo.getBytesOnDisk(), recoveryInfo.getNumBytes());
+    Assert.assertEquals(originalInfo.getState(), recoveryInfo.getOriginalReplicaState());
+  }
+
+  /** Test {@link FSDataset#initReplicaRecovery(ReplicasMap, Block, long)} */
+  @Test
+  public void testInitReplicaRecovery() throws IOException {
+    final long firstblockid = 10000L;
+    final long gs = 7777L;
+    final long length = 22L;
+    final ReplicasMap map = new ReplicasMap();
+    final Block[] blocks = new Block[5];
+    for(int i = 0; i < blocks.length; i++) {
+      blocks[i] = new Block(firstblockid + i, length, gs);
+      map.add(createReplicaInfo(blocks[i]));
+    }
+    
+    { 
+      //normal case
+      final Block b = blocks[0];
+      final ReplicaInfo originalInfo = map.get(b);
+
+      final long recoveryid = gs + 1;
+      final Info recoveryInfo = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid);
+      assertEquals(originalInfo, recoveryInfo);
+
+      final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(b);
+      Assert.assertEquals(originalInfo.getBlockId(), updatedInfo.getBlockId());
+      Assert.assertEquals(recoveryid, updatedInfo.getRecoveryID());
+
+      //recover one more time 
+      final long recoveryid2 = gs + 2;
+      final Info recoveryInfo2 = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid2);
+      assertEquals(originalInfo, recoveryInfo2);
+
+      final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(b);
+      Assert.assertEquals(originalInfo.getBlockId(), updatedInfo2.getBlockId());
+      Assert.assertEquals(recoveryid2, updatedInfo2.getRecoveryID());
+      
+      //case RecoveryInProgressException
+      try {
+        FSDataset.initReplicaRecovery(map, b, recoveryid);
+        Assert.fail();
+      }
+      catch(RecoveryInProgressException ripe) {
+        System.out.println("GOOD: getting " + ripe);
+      }
+    }
+
+    { //replica not found
+      final long recoveryid = gs + 1;
+      final Block b = new Block(firstblockid - 1, length, gs);
+      try {
+        FSDataset.initReplicaRecovery(map, b, recoveryid);
+        Assert.fail();
+      }
+      catch(ReplicaNotFoundException rnfe) {
+        System.out.println("GOOD: getting " + rnfe);
+      }
+    }
+    
+    { //case "THIS IS NOT SUPPOSED TO HAPPEN"
+      final long recoveryid = gs - 1;
+      final Block b = new Block(firstblockid + 1, length, gs);
+      try {
+        FSDataset.initReplicaRecovery(map, b, recoveryid);
+        Assert.fail();
+      }
+      catch(IOException ioe) {
+        System.out.println("GOOD: getting " + ioe);
+      }
+    }
+
+  }
 }