You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2009/09/17 03:34:47 UTC
svn commit: r816025 - in /hadoop/hdfs/branches/HDFS-265: ./
src/java/org/apache/hadoop/hdfs/server/common/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Author: szetszwo
Date: Thu Sep 17 01:34:46 2009
New Revision: 816025
URL: http://svn.apache.org/viewvc?rev=816025&view=rev
Log:
HDFS-619. Support replica recovery initialization in datanode for the new append design.
Added:
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
Modified:
hadoop/hdfs/branches/HDFS-265/CHANGES.txt
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=816025&r1=816024&r2=816025&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Thu Sep 17 01:34:46 2009
@@ -27,6 +27,9 @@
HDFS-604. Block report processing for append. (shv)
+ HDFS-619. Support replica recovery initialization in datanode for the new
+ append design. (szetszwo)
+
IMPROVEMENTS
HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=816025&r1=816024&r2=816025&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Thu Sep 17 01:34:46 2009
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.common;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
/************************************
* Some handy internal HDFS constants
*
@@ -109,6 +113,16 @@
public static ReplicaState getState(int v) {
return ReplicaState.values()[v];
}
+
+ /** Read from in */
+ public static ReplicaState read(DataInput in) throws IOException {
+ return values()[in.readByte()];
+ }
+
+ /** Write to out */
+ public void write(DataOutput out) throws IOException {
+ out.writeByte(ordinal());
+ }
}
/**
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=816025&r1=816024&r2=816025&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Sep 17 01:34:46 2009
@@ -94,7 +94,7 @@
synchronized(datanode.data) {
this.replica = datanode.data.getReplica(block.getBlockId());
if (replica == null) {
- throw new IOException("Replica not found for " + block);
+ throw new ReplicaNotFoundException(block);
}
this.replicaVisibleLength = replica.getVisibleLength();
}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=816025&r1=816024&r2=816025&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Sep 17 01:34:46 2009
@@ -1836,4 +1836,66 @@
assert(Thread.holdsLock(this));
return volumeMap.get(blockId);
}
+
+ /** Initialize a replica recovery. */
+ synchronized ReplicaUnderRecovery.Info initReplicaRecovery(
+ Block block, long recoveryId) throws IOException {
+ return initReplicaRecovery(volumeMap, block, recoveryId);
+ }
+
+ /** static version of {@link #initReplicaRecovery(Block, long)}. */
+ static ReplicaUnderRecovery.Info initReplicaRecovery(
+ ReplicasMap map, Block block, long recoveryId) throws IOException {
+ final ReplicaInfo replica = map.get(block.getBlockId());
+ DataNode.LOG.info("initReplicaRecovery: block=" + block
+ + ", recoveryId=" + recoveryId
+ + ", replica=" + replica);
+
+ //check replica
+ if (replica == null) {
+ throw new ReplicaNotFoundException(block);
+ }
+
+ //stop writer if there is any
+ if (replica instanceof ReplicaInPipeline) {
+ ((ReplicaInPipeline)replica).stopWriter();
+ }
+
+ //check generation stamp
+ if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+ throw new IOException(
+ "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+ + block + ", replica=" + replica);
+ }
+
+ //check recovery id
+ if (replica.getGenerationStamp() >= recoveryId) {
+ throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+ + " replica.getGenerationStamp() >= recoveryId = " + recoveryId
+ + ", block=" + block + ", replica=" + replica);
+ }
+
+ //check RUR
+ final ReplicaUnderRecovery rur;
+ if (replica.getState() == ReplicaState.RUR) {
+ rur = (ReplicaUnderRecovery)replica;
+ if (rur.getRecoveryID() >= recoveryId) {
+ throw new RecoveryInProgressException(
+ "rur.getRecoveryID() >= recoveryId = " + recoveryId
+ + ", block=" + block + ", rur=" + rur);
+ }
+ final long oldRecoveryID = rur.getRecoveryID();
+ rur.setRecoveryID(recoveryId);
+ DataNode.LOG.info("initReplicaRecovery: update recovery id for " + block
+ + " from " + oldRecoveryID + " to " + recoveryId);
+ }
+ else {
+ rur = new ReplicaUnderRecovery(replica, recoveryId);
+ map.add(rur);
+ DataNode.LOG.info("initReplicaRecovery: changing replica state for "
+ + block + " from " + replica.getState()
+ + " to " + rur.getState());
+ }
+ return rur.createInfo();
+ }
}
Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java?rev=816025&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java Thu Sep 17 01:34:46 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+/**
+ * Exception indicating that a replica is already being recovered.
+ */
+class RecoveryInProgressException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ RecoveryInProgressException(String msg) {
+ super(msg);
+ }
+}
\ No newline at end of file
Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=816025&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Thu Sep 17 01:34:46 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Exception indicating that a replica is not found.
+ */
+class ReplicaNotFoundException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ ReplicaNotFoundException(Block b) {
+ super("Replica not found for " + b);
+ }
+}
\ No newline at end of file
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java?rev=816025&r1=816024&r2=816025&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java Thu Sep 17 01:34:46 2009
@@ -17,8 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
@@ -155,4 +159,44 @@
+ "\n recoveryId=" + recoveryId
+ "\n original=" + original;
}
+
+ Info createInfo() {
+ return new Info(this);
+ }
+
+ /** Replica recovery information. */
+ static class Info extends Block{
+ private ReplicaState originalState;
+
+ private Info(ReplicaUnderRecovery rur) {
+ super(rur);
+ originalState = rur.getOrignalReplicaState();
+ }
+
+ ReplicaState getOriginalReplicaState() {
+ return originalState;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ originalState = ReplicaState.read(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ originalState.write(out);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+ }
}
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=816025&r1=816024&r2=816025&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Thu Sep 17 01:34:46 2009
@@ -21,19 +21,20 @@
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery.Info;
import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.junit.Assert;
+import org.junit.Test;
/**
* This tests InterDataNodeProtocol for block handling.
@@ -111,4 +112,87 @@
if (cluster != null) {cluster.shutdown();}
}
}
+
+ private static ReplicaInfo createReplicaInfo(Block b) {
+ return new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(),
+ null, null);
+ }
+
+ private static void assertEquals(ReplicaInfo originalInfo, Info recoveryInfo) {
+ Assert.assertEquals(originalInfo.getBlockId(), recoveryInfo.getBlockId());
+ Assert.assertEquals(originalInfo.getGenerationStamp(), recoveryInfo.getGenerationStamp());
+ Assert.assertEquals(originalInfo.getBytesOnDisk(), recoveryInfo.getNumBytes());
+ Assert.assertEquals(originalInfo.getState(), recoveryInfo.getOriginalReplicaState());
+ }
+
+ /** Test {@link FSDataset#initReplicaRecovery(ReplicasMap, Block, long)} */
+ @Test
+ public void testInitReplicaRecovery() throws IOException {
+ final long firstblockid = 10000L;
+ final long gs = 7777L;
+ final long length = 22L;
+ final ReplicasMap map = new ReplicasMap();
+ final Block[] blocks = new Block[5];
+ for(int i = 0; i < blocks.length; i++) {
+ blocks[i] = new Block(firstblockid + i, length, gs);
+ map.add(createReplicaInfo(blocks[i]));
+ }
+
+ {
+ //normal case
+ final Block b = blocks[0];
+ final ReplicaInfo originalInfo = map.get(b);
+
+ final long recoveryid = gs + 1;
+ final Info recoveryInfo = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid);
+ assertEquals(originalInfo, recoveryInfo);
+
+ final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(b);
+ Assert.assertEquals(originalInfo.getBlockId(), updatedInfo.getBlockId());
+ Assert.assertEquals(recoveryid, updatedInfo.getRecoveryID());
+
+ //recover one more time
+ final long recoveryid2 = gs + 2;
+ final Info recoveryInfo2 = FSDataset.initReplicaRecovery(map, blocks[0], recoveryid2);
+ assertEquals(originalInfo, recoveryInfo2);
+
+ final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(b);
+ Assert.assertEquals(originalInfo.getBlockId(), updatedInfo2.getBlockId());
+ Assert.assertEquals(recoveryid2, updatedInfo2.getRecoveryID());
+
+ //case RecoveryInProgressException
+ try {
+ FSDataset.initReplicaRecovery(map, b, recoveryid);
+ Assert.fail();
+ }
+ catch(RecoveryInProgressException ripe) {
+ System.out.println("GOOD: getting " + ripe);
+ }
+ }
+
+ { //replica not found
+ final long recoveryid = gs + 1;
+ final Block b = new Block(firstblockid - 1, length, gs);
+ try {
+ FSDataset.initReplicaRecovery(map, b, recoveryid);
+ Assert.fail();
+ }
+ catch(ReplicaNotFoundException rnfe) {
+ System.out.println("GOOD: getting " + rnfe);
+ }
+ }
+
+ { //case "THIS IS NOT SUPPOSED TO HAPPEN"
+ final long recoveryid = gs - 1;
+ final Block b = new Block(firstblockid + 1, length, gs);
+ try {
+ FSDataset.initReplicaRecovery(map, b, recoveryid);
+ Assert.fail();
+ }
+ catch(IOException ioe) {
+ System.out.println("GOOD: getting " + ioe);
+ }
+ }
+
+ }
}