You are viewing a plain text version of this content. The link to its canonical version ("here") was a hyperlink and was not preserved in this plain-text rendering.
Posted to hdfs-commits@hadoop.apache.org by sh...@apache.org on 2009/09/26 23:23:26 UTC
svn commit: r819215 - in /hadoop/hdfs/branches/HDFS-265: ./
src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/java/org/apache/hadoop/hdfs/server/namenode/
src/java/org/apache/hadoop/hdfs/server/protocol/ src...
Author: shv
Date: Sat Sep 26 21:23:16 2009
New Revision: 819215
URL: http://svn.apache.org/viewvc?rev=819215&view=rev
Log:
HDFS-644. Lease recovery, concurrency support. Contributed by Konstantin Shvachko.
Added:
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java (with props)
Modified:
hadoop/hdfs/branches/HDFS-265/CHANGES.txt
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Sat Sep 26 21:23:16 2009
@@ -10,6 +10,8 @@
HDFS-636. SafeMode counts complete blocks only. (shv)
+ HDFS-644. Lease recovery, concurrency support. (shv)
+
NEW FEATURES
HDFS-536. Support hflush at DFSClient. (hairong)
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Sat Sep 26 21:23:16 2009
@@ -42,6 +42,7 @@
* generated access token is returned as part of the return value.
* @throws IOException
*/
+ @Deprecated // not used anymore - should be removed
LocatedBlock recoverBlock(Block block, boolean keepLength,
DatanodeInfo[] targets) throws IOException;
}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java Sat Sep 26 21:23:16 2009
@@ -15,17 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
/**
* Exception indicating that a replica is already being recovery.
*/
-class RecoveryInProgressException extends IOException {
+public class RecoveryInProgressException extends IOException {
private static final long serialVersionUID = 1L;
- RecoveryInProgressException(String msg) {
+ public RecoveryInProgressException(String msg) {
super(msg);
}
}
\ No newline at end of file
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Sep 26 21:23:16 2009
@@ -34,6 +34,7 @@
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
@@ -69,6 +70,7 @@
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -77,6 +79,7 @@
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
@@ -913,7 +916,7 @@
processDistributedUpgradeCommand((UpgradeCommand)cmd);
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
- recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
+ recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
@@ -1515,16 +1518,16 @@
return info;
}
- public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
+ public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
public void run() {
- for(int i = 0; i < blocks.length; i++) {
+ for(RecoveringBlock b : blocks) {
try {
- logRecoverBlock("NameNode", blocks[i], targets[i]);
- recoverBlock(blocks[i], false, targets[i], true);
+ logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
+ recoverBlock(b);
} catch (IOException e) {
- LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
+ LOG.warn("recoverBlocks FAILED: " + b, e);
}
}
}
@@ -1580,9 +1583,9 @@
}
/** Recover a block */
- private LocatedBlock recoverBlock(Block block, boolean keepLength,
- DatanodeInfo[] targets, boolean closeFile) throws IOException {
-
+ private LocatedBlock recoverBlock(RecoveringBlock rBlock) throws IOException {
+ Block block = rBlock.getBlock();
+ DatanodeInfo[] targets = rBlock.getLocations();
DatanodeID[] datanodeids = (DatanodeID[])targets;
// If the block is already being recovered, then skip recovering it.
// This can happen if the namenode and client start recovering the same
@@ -1609,16 +1612,9 @@
this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
- if (keepLength) {
- if (info.getNumBytes() == block.getNumBytes()) {
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
- }
- }
- else {
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
- if (info.getNumBytes() < minlength) {
- minlength = info.getNumBytes();
- }
+ syncList.add(new BlockRecord(id, datanode, new Block(info)));
+ if (info.getNumBytes() < minlength) {
+ minlength = info.getNumBytes();
}
}
} catch (IOException e) {
@@ -1633,10 +1629,8 @@
throw new IOException("All datanodes failed: block=" + block
+ ", datanodeids=" + Arrays.asList(datanodeids));
}
- if (!keepLength) {
- block.setNumBytes(minlength);
- }
- return syncBlock(block, syncList, targets, closeFile);
+ block.setNumBytes(minlength);
+ return syncBlock(rBlock, syncList);
} finally {
synchronized (ongoingRecovery) {
ongoingRecovery.remove(block);
@@ -1645,20 +1639,22 @@
}
/** Block synchronization */
- private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
- DatanodeInfo[] targets, boolean closeFile) throws IOException {
+ private LocatedBlock syncBlock(RecoveringBlock rBlock,
+ List<BlockRecord> syncList) throws IOException {
+ Block block = rBlock.getBlock();
+ long newGenerationStamp = rBlock.getNewGenerationStamp();
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
- + "), syncList=" + syncList + ", closeFile=" + closeFile);
+ + "), syncList=" + syncList);
}
//syncList.isEmpty() that all datanodes do not have the block
//so the block can be deleted.
if (syncList.isEmpty()) {
- namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
- DatanodeID.EMPTY_ARRAY);
+ namenode.commitBlockSynchronization(block, newGenerationStamp, 0,
+ true, true, DatanodeID.EMPTY_ARRAY);
//always return a new access token even if everything else stays the same
- LocatedBlock b = new LocatedBlock(block, targets);
+ LocatedBlock b = new LocatedBlock(block, rBlock.getLocations());
if (isAccessTokenEnabled) {
b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
@@ -1668,12 +1664,12 @@
List<DatanodeID> successList = new ArrayList<DatanodeID>();
- long generationstamp = namenode.nextGenerationStamp(block);
- Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp);
+ Block newblock =
+ new Block(block.getBlockId(), block.getNumBytes(), newGenerationStamp);
for(BlockRecord r : syncList) {
try {
- r.datanode.updateBlock(r.block, newblock, closeFile);
+ r.datanode.updateBlock(r.block, newblock, true);
successList.add(r.id);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
@@ -1685,7 +1681,7 @@
DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
namenode.commitBlockSynchronization(block,
- newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
+ newblock.getGenerationStamp(), newblock.getNumBytes(), true, false,
nlist);
DatanodeInfo[] info = new DatanodeInfo[nlist.length];
for (int i = 0; i < nlist.length; i++) {
@@ -1712,10 +1708,12 @@
// ClientDataNodeProtocol implementation
/** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
) throws IOException {
logRecoverBlock("Client", block, targets);
- return recoverBlock(block, keepLength, targets, false);
+ assert false : "ClientDatanodeProtocol.recoverBlock: should never be called.";
+ return null;
}
private static void logRecoverBlock(String who,
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Sep 26 21:23:16 2009
@@ -45,6 +45,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.metrics.util.MBeanUtil;
@@ -1414,7 +1415,9 @@
public synchronized void finalizeBlock(Block b) throws IOException {
ReplicaInfo replicaInfo = getReplicaInfo(b);
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
- throw new IOException("Block " + b + " is already finalized.");
+ // this is legal, when recovery happens on a file that has
+ // been opened for append but never modified
+ return;
}
ReplicaInfo newReplicaInfo = null;
if (replicaInfo.getState() == ReplicaState.RUR &&
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java Sat Sep 26 21:23:16 2009
@@ -306,7 +306,7 @@
// the block is already under construction
BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;
ucBlock.setBlockUCState(s);
- ucBlock.setLocations(targets);
+ ucBlock.setExpectedLocations(targets);
ucBlock.setLastRecoveryTime(0);
return ucBlock;
}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java Sat Sep 26 21:23:16 2009
@@ -46,6 +46,13 @@
private long lastRecoveryTime = 0;
/**
+ * The new generation stamp, which this block will have
+ * after the recovery succeeds. Also used as a recovery id to identify
+ * the right recovery if any of the abandoned recoveries re-appear.
+ */
+ private long blockRecoveryId = 0;
+
+ /**
* ReplicaUnderConstruction contains information about replicas while
* they are under construction.
* The GS, the length and the state of the replica is as reported by
@@ -123,7 +130,7 @@
assert getBlockUCState() != BlockUCState.COMPLETE :
"BlockInfoUnderConstruction cannot be in COMPLETE state";
this.blockUCState = state;
- setLocations(targets);
+ setExpectedLocations(targets);
}
/**
@@ -144,7 +151,7 @@
return new BlockInfo(this);
}
- void setLocations(DatanodeDescriptor[] targets) {
+ void setExpectedLocations(DatanodeDescriptor[] targets) {
int numLocations = targets == null ? 0 : targets.length;
this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
for(int i = 0; i < numLocations; i++)
@@ -156,7 +163,7 @@
* Create array of expected replica locations
* (as has been assigned by chooseTargets()).
*/
- private DatanodeDescriptor[] getExpectedLocations() {
+ DatanodeDescriptor[] getExpectedLocations() {
int numLocations = replicas == null ? 0 : replicas.size();
DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations];
for(int i = 0; i < numLocations; i++)
@@ -164,7 +171,7 @@
return locations;
}
- int getNumLocations() {
+ int getNumExpectedLocations() {
return replicas == null ? 0 : replicas.size();
}
@@ -181,6 +188,10 @@
blockUCState = s;
}
+ long getBlockRecoveryId() {
+ return blockRecoveryId;
+ }
+
/**
* Commit block's length and generation stamp as reported by the client.
* Set block state to {@link BlockUCState#COMMITTED}.
@@ -197,9 +208,12 @@
/**
* Initialize lease recovery for this block.
- * Find the first alive data-node starting from the previous primary.
+ * Find the first alive data-node starting from the previous primary and
+ * make it primary.
*/
- void assignPrimaryDatanode() {
+ void initializeBlockRecovery(long recoveryId) {
+ setBlockUCState(BlockUCState.UNDER_RECOVERY);
+ blockRecoveryId = recoveryId;
if (replicas.size() == 0) {
NameNode.stateChangeLog.warn("BLOCK*"
+ " INodeFileUnderConstruction.initLeaseRecovery:"
@@ -212,7 +226,7 @@
if (replicas.get(j).isAlive()) {
primaryNodeIndex = j;
DatanodeDescriptor primary = replicas.get(j).getExpectedLocation();
- primary.addBlockToBeRecovered(this, getExpectedLocations());
+ primary.addBlockToBeRecovered(this);
NameNode.stateChangeLog.info("BLOCK* " + this
+ " recovery started, primary=" + primary);
return;
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Sat Sep 26 21:23:16 2009
@@ -28,6 +28,8 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
@@ -57,29 +59,36 @@
}
/** A BlockTargetPair queue. */
- private static class BlockQueue {
- private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
+ private static class BlockQueue<E> {
+ private final Queue<E> blockq = new LinkedList<E>();
/** Size of the queue */
synchronized int size() {return blockq.size();}
/** Enqueue */
- synchronized boolean offer(Block block, DatanodeDescriptor[] targets) {
- return blockq.offer(new BlockTargetPair(block, targets));
+ synchronized boolean offer(E e) {
+ return blockq.offer(e);
}
/** Dequeue */
- synchronized List<BlockTargetPair> poll(int numBlocks) {
+ synchronized List<E> poll(int numBlocks) {
if (numBlocks <= 0 || blockq.isEmpty()) {
return null;
}
- List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
+ List<E> results = new ArrayList<E>();
for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
results.add(blockq.poll());
}
return results;
}
+
+ /**
+ * Returns <tt>true</tt> if the queue contains the specified element.
+ */
+ boolean contains(E e) {
+ return blockq.contains(e);
+ }
}
private volatile BlockInfo blockList = null;
@@ -89,9 +98,10 @@
protected boolean needKeyUpdate = false;
/** A queue of blocks to be replicated by this datanode */
- private BlockQueue replicateBlocks = new BlockQueue();
+ private BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>();
/** A queue of blocks to be recovered by this datanode */
- private BlockQueue recoverBlocks = new BlockQueue();
+ private BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
+ new BlockQueue<BlockInfoUnderConstruction>();
/** A set of blocks to be invalidated by this datanode */
private Set<Block> invalidateBlocks = new TreeSet<Block>();
@@ -279,15 +289,20 @@
*/
void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
assert(block != null && targets != null && targets.length > 0);
- replicateBlocks.offer(block, targets);
+ replicateBlocks.offer(new BlockTargetPair(block, targets));
}
/**
* Store block recovery work.
*/
- void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
- assert(block != null && targets != null && targets.length > 0);
- recoverBlocks.offer(block, targets);
+ void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
+ if(recoverBlocks.contains(block)) {
+ // this prevents adding the same block twice to the recovery queue
+ FSNamesystem.LOG.info("Block " + block +
+ " is already in the recovery queue.");
+ return;
+ }
+ recoverBlocks.offer(block);
}
/**
@@ -325,10 +340,16 @@
new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
}
- BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
- List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
- return blocktargetlist == null? null:
- new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
+ BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) {
+ List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
+ if(blocks == null)
+ return null;
+ BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.size());
+ for(BlockInfoUnderConstruction b : blocks) {
+ brCommand.add(new RecoveringBlock(
+ b, b.getExpectedLocations(), b.getBlockRecoveryId()));
+ }
+ return brCommand;
}
/**
@@ -444,7 +465,7 @@
Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
FSNamesystem.LOG.debug("Reported block " + block
+ " on " + getName() + " size " + block.getNumBytes()
- + "replicaState = " + rState);
+ + " replicaState = " + rState);
// find block by blockId
BlockInfo storedBlock = blockManager.findStoredBlock(block.getBlockId());
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Sat Sep 26 21:23:16 2009
@@ -528,7 +528,7 @@
clientMachine,
null);
fsDir.replaceNode(path, node, cons);
- fsNamesys.leaseManager.addLease(cons.clientName, path);
+ fsNamesys.leaseManager.addLease(cons.getClientName(), path);
}
break;
}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Sat Sep 26 21:23:16 2009
@@ -1404,7 +1404,7 @@
}
INodeFile oldnode = (INodeFile) old;
fsDir.replaceNode(path, oldnode, cons);
- fs.leaseManager.addLease(cons.clientName, path);
+ fs.leaseManager.addLease(cons.getClientName(), path);
}
}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Sep 26 21:23:16 2009
@@ -918,40 +918,45 @@
// If the file is under construction , then it must be in our
// leases. Find the appropriate lease record.
//
- Lease lease = leaseManager.getLease(holder);
- //
- // We found the lease for this file. And surprisingly the original
- // holder is trying to recreate this file. This should never occur.
- //
- if (lease != null) {
+ Lease lease = leaseManager.getLeaseByPath(src);
+ if (lease == null) {
throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because current leaseholder is trying to recreate file.");
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because pendingCreates is non-null but no leases found.");
}
//
- // Find the original holder.
+ // We found the lease for this file. And surprisingly the original
+ // holder is trying to recreate this file. This should never occur.
//
- lease = leaseManager.getLease(pendingFile.clientName);
- if (lease == null) {
+ if (lease.getHolder().equals(holder)) {
throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because pendingCreates is non-null but no leases found.");
- }
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because current leaseholder is trying to recreate file.");
+ }
+ assert lease.getHolder().equals(pendingFile.getClientName()) :
+ "Current lease holder " + lease.getHolder() +
+ " does not match file creator " + pendingFile.getClientName();
//
+ // Current lease holder is different from the requester.
// If the original holder has not renewed in the last SOFTLIMIT
- // period, then start lease recovery.
+ // period, then start lease recovery, otherwise fail.
//
if (lease.expiredSoftLimit()) {
LOG.info("startFile: recover lease " + lease + ", src=" + src);
- internalReleaseLease(lease, src);
- }
- throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- ", because this file is already being created by " +
- pendingFile.getClientName() +
- " on " + pendingFile.getClientMachine());
+ boolean isClosed = internalReleaseLease(lease, src, null);
+ if(!isClosed)
+ throw new RecoveryInProgressException(
+ "Failed to close file " + src +
+ ". Lease recovery is in progress. Try again later.");
+
+ } else
+ throw new AlreadyBeingCreatedException("failed to create file " +
+ src + " for " + holder + " on client " + clientMachine +
+ ", because this file is already being created by " +
+ pendingFile.getClientName() +
+ " on " + pendingFile.getClientMachine());
}
try {
@@ -1004,7 +1009,7 @@
clientMachine,
clientNode);
dir.replaceNode(src, node, cons);
- leaseManager.addLease(cons.clientName, src);
+ leaseManager.addLease(cons.getClientName(), src);
} else {
// Now we can add the name to the filesystem. This file has no
@@ -1020,7 +1025,7 @@
throw new IOException("DIR* NameSystem.startFile: " +
"Unable to add file to namespace.");
}
- leaseManager.addLease(newNode.clientName, src);
+ leaseManager.addLease(newNode.getClientName(), src);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+"add "+src+" to namespace for "+holder);
@@ -1632,20 +1637,31 @@
* Move a file that is being written to be immutable.
* @param src The filename
* @param lease The lease for the client creating the file
- */
- void internalReleaseLease(Lease lease, String src) throws IOException {
+ * @param recoveryLeaseHolder reassign lease to this holder if the last block
+ * needs recovery; keep current holder if null.
+ * @throws AlreadyBeingCreatedException if file is waiting to achieve minimal
+ * replication;<br>
+ * RecoveryInProgressException if lease recovery is in progress.<br>
+ * IOException in case of an error.
+ * @return true if file has been successfully finalized and closed or
+ * false if block recovery has been initiated
+ */
+ boolean internalReleaseLease(
+ Lease lease, String src, String recoveryLeaseHolder)
+ throws AlreadyBeingCreatedException,
+ IOException {
LOG.info("Recovering lease=" + lease + ", src=" + src);
INodeFile iFile = dir.getFileINode(src);
if (iFile == null) {
- final String message = "DIR* NameSystem.internalReleaseCreate: "
+ final String message = "DIR* NameSystem.internalReleaseLease: "
+ "attempt to release a create lock on "
+ src + " file does not exist.";
NameNode.stateChangeLog.warn(message);
throw new IOException(message);
}
if (!iFile.isUnderConstruction()) {
- final String message = "DIR* NameSystem.internalReleaseCreate: "
+ final String message = "DIR* NameSystem.internalReleaseLease: "
+ "attempt to release a create lock on "
+ src + " but file is already closed.";
NameNode.stateChangeLog.warn(message);
@@ -1653,35 +1669,112 @@
}
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
- BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
+ int nrBlocks = pendingFile.numBlocks();
+ BlockInfo[] blocks = pendingFile.getBlocks();
- // Initialize lease recovery for pendingFile. If there are no blocks
- // associated with this file, then reap lease immediately. Otherwise
- // renew the lease and trigger lease recovery.
- if (lastBlock == null) {
- assert pendingFile.getBlocks().length == 0 :
- "file is not empty but the last block does not exist";
+ int nrCompleteBlocks;
+ BlockInfo curBlock = null;
+ for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) {
+ curBlock = blocks[nrCompleteBlocks];
+ if(!curBlock.isComplete())
+ break;
+ assert blockManager.checkMinReplication(curBlock) :
+ "A COMPLETE block is not minimally replicated in " + src;
+ }
+
+ // If there are no incomplete blocks associated with this file,
+ // then reap lease immediately and close the file.
+ if(nrCompleteBlocks == nrBlocks) {
finalizeINodeFileUnderConstruction(src, pendingFile);
NameNode.stateChangeLog.warn("BLOCK*"
- + " internalReleaseLease: No blocks found, lease removed.");
- return;
+ + " internalReleaseLease: All existing blocks are COMPLETE,"
+ + " lease removed, file closed.");
+ return true; // closed!
}
- // setup the last block locations from the blockManager if not known
- if(lastBlock.getNumLocations() == 0) {
- DatanodeDescriptor targets[] = blockManager.getNodes(lastBlock);
- lastBlock.setLocations(targets);
+ // Only the last and the penultimate blocks may be in non COMPLETE state.
+ // If the penultimate block is not COMPLETE, then it must be COMMITTED.
+ if(nrCompleteBlocks < nrBlocks - 2 ||
+ nrCompleteBlocks == nrBlocks - 2 &&
+ curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
+ final String message = "DIR* NameSystem.internalReleaseLease: "
+ + "attempt to release a create lock on "
+ + src + " but file is already closed.";
+ NameNode.stateChangeLog.warn(message);
+ throw new IOException(message);
}
- // start lease recovery of the last block for this file.
- lastBlock.assignPrimaryDatanode();
- leaseManager.renewLease(lease);
+ // now we know that the last block is not COMPLETE, and
+ // that the penultimate block, if it exists, is either COMPLETE or COMMITTED
+ BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
+ BlockUCState lastBlockState = lastBlock.getBlockUCState();
+ BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
+ BlockUCState penultimateBlockState = (penultimateBlock == null ?
+ BlockUCState.COMPLETE : penultimateBlock.getBlockUCState());
+ assert penultimateBlockState == BlockUCState.COMPLETE ||
+ penultimateBlockState == BlockUCState.COMMITTED :
+ "Unexpected state of penultimate block in " + src;
+
+ switch(lastBlockState) {
+ case COMPLETE:
+ assert false : "Already checked that the last block is incomplete";
+ break;
+ case COMMITTED:
+ // Close file if committed blocks are minimally replicated
+ if(blockManager.checkMinReplication(penultimateBlock) &&
+ blockManager.checkMinReplication(lastBlock)) {
+ finalizeINodeFileUnderConstruction(src, pendingFile);
+ NameNode.stateChangeLog.warn("BLOCK*"
+ + " internalReleaseLease: Committed blocks are minimally replicated,"
+ + " lease removed, file closed.");
+ return true; // closed!
+ }
+ // Cannot close file right now, since some blocks
+ // are not yet minimally replicated.
+ // This may potentially cause an infinite loop in lease recovery
+ // if there are no valid replicas on data-nodes.
+ String message = "DIR* NameSystem.internalReleaseLease: " +
+ "Failed to release lease for file " + src +
+ ". Committed blocks are waiting to be minimally replicated." +
+ " Try again later.";
+ NameNode.stateChangeLog.warn(message);
+ throw new AlreadyBeingCreatedException(message);
+ case UNDER_CONSTRUCTION:
+ case UNDER_RECOVERY:
+ // setup the last block locations from the blockManager if not known
+ if(lastBlock.getNumExpectedLocations() == 0)
+ lastBlock.setExpectedLocations(blockManager.getNodes(lastBlock));
+ // start recovery of the last block for this file
+ long blockRecoveryId = nextGenerationStamp();
+ lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+ lastBlock.initializeBlockRecovery(blockRecoveryId);
+ leaseManager.renewLease(lease);
+ // Cannot close file right now, since the last block requires recovery.
+ // This may potentially cause an infinite loop in lease recovery
+ // if there are no valid replicas on data-nodes.
+ NameNode.stateChangeLog.warn(
+ "DIR* NameSystem.internalReleaseLease: " +
+ "File " + src + " has not been closed." +
+ " Lease recovery is in progress. " +
+ "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
+ break;
+ }
+ return false;
}
+ /**
+  * Hand the lease on file src over to a new holder.
+  * @param lease the lease currently covering src
+  * @param src path of the file
+  * @param newHolder the new lease holder; when null nothing is changed
+  *        and the given lease is returned as is
+  * @param pendingFile the under-construction inode of src; its client name
+  *        is switched to newHolder
+  * @return the lease that covers src after the call
+  */
+ Lease reassignLease(Lease lease, String src, String newHolder,
+     INodeFileUnderConstruction pendingFile) {
+   Lease result = lease;
+   if (newHolder != null) {
+     // keep the inode's recorded holder in sync with the lease manager
+     pendingFile.setClientName(newHolder);
+     result = leaseManager.reassignLease(lease, src, newHolder);
+   }
+   return result;
+ }
+
+
private void finalizeINodeFileUnderConstruction(
String src,
INodeFileUnderConstruction pendingFile) throws IOException {
- leaseManager.removeLease(pendingFile.clientName, src);
+ leaseManager.removeLease(pendingFile.getClientName(), src);
// complete the penultimate block
blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
@@ -1715,11 +1808,20 @@
throw new IOException("Block (=" + lastblock + ") not found");
}
INodeFile iFile = oldblockinfo.getINode();
- if (!iFile.isUnderConstruction()) {
+ if (!iFile.isUnderConstruction() || oldblockinfo.isComplete()) {
throw new IOException("Unexpected block (=" + lastblock
+ ") since the file (=" + iFile.getLocalName()
+ ") is not under construction");
}
+
+ long recoveryId =
+ ((BlockInfoUnderConstruction)oldblockinfo).getBlockRecoveryId();
+ if(recoveryId != newgenerationstamp) {
+ throw new IOException("The recovery id " + newgenerationstamp
+ + " does not match current recovery id "
+ + recoveryId + " for block " + lastblock);
+ }
+
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Sat Sep 26 21:23:16 2009
@@ -25,7 +25,7 @@
class INodeFileUnderConstruction extends INodeFile {
- final String clientName; // lease holder
+ private String clientName; // lease holder
private final String clientMachine;
private final DatanodeDescriptor clientNode; // if client is a cluster node too.
@@ -64,6 +64,10 @@
return clientName;
}
+ /**
+  * Replace the lease holder recorded for this file under construction.
+  * @param clientName name of the new lease holder
+  */
+ void setClientName(String clientName) {
+ this.clientName = clientName;
+ }
+
/** @return the client machine recorded for this file under construction. */
String getClientMachine() {
return clientMachine;
}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Sat Sep 26 21:23:16 2009
@@ -102,7 +102,7 @@
/**
* Adds (or re-adds) the lease for the specified file.
*/
- synchronized void addLease(String holder, String src) {
+ synchronized Lease addLease(String holder, String src) {
Lease lease = getLease(holder);
if (lease == null) {
lease = new Lease(holder);
@@ -113,6 +113,7 @@
}
sortedLeasesByPath.put(src, lease);
lease.paths.add(src);
+ return lease;
}
/**
@@ -143,11 +144,22 @@
}
/**
+ * Reassign lease for file src to the new holder.
+ */
+ synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
+   assert newHolder != null : "new lease holder is null";
+   if (lease == null) {
+     // nothing to detach from; just attach src to the new holder's lease
+     return addLease(newHolder, src);
+   }
+   // detach src from its current lease, then attach it to the (possibly
+   // newly created) lease of newHolder
+   removeLease(lease, src);
+   return addLease(newHolder, src);
+ }
+
+ /**
* Finds the pathname for the specified pendingFile
*/
synchronized String findPath(INodeFileUnderConstruction pendingFile
) throws IOException {
- Lease lease = getLease(pendingFile.clientName);
+ Lease lease = getLease(pendingFile.getClientName());
if (lease != null) {
String src = lease.findPath(pendingFile);
if (src != null) {
@@ -265,7 +277,11 @@
/**
 * @return the paths held under this lease. This is the live internal
 *         collection, not a copy.
 */
Collection<String> getPaths() {
return paths;
}
-
+
+ /** @return the name of the client holding this lease. */
+ String getHolder() {
+ return holder;
+ }
+
void replacePath(String oldpath, String newpath) {
paths.remove(oldpath);
paths.add(newpath);
@@ -376,7 +392,13 @@
oldest.getPaths().toArray(leasePaths);
for(String p : leasePaths) {
try {
- fsnamesystem.internalReleaseLease(oldest, p);
+ if(fsnamesystem.internalReleaseLease(oldest, p, "HDFS_NameNode")) {
+ LOG.info("Lease recovery for file " + p +
+ " is complete. File closed.");
+ removing.add(p);
+ } else
+ LOG.info("Started block recovery for file " + p +
+ " lease " + oldest);
} catch (IOException e) {
LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
removing.add(p);
Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java?rev=819215&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java Sat Sep 26 21:23:16 2009
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * BlockRecoveryCommand is an instruction to a data-node to recover
+ * the specified blocks.
+ *
+ * The data-node that receives this command treats itself as a primary
+ * data-node in the recover process.
+ *
+ * Block recovery is identified by a recoveryId, which is also the new
+ * generation stamp, which the block will have after the recovery succeeds.
+ */
+public class BlockRecoveryCommand extends DatanodeCommand {
+ Collection<RecoveringBlock> recoveringBlocks;
+
+ /**
+ * This is a block with locations from which it should be recovered
+ * and the new generation stamp, which the block will have after
+ * successful recovery.
+ *
+ * The new generation stamp of the block, also plays role of the recovery id.
+ */
+ public static class RecoveringBlock extends LocatedBlock {
+ private long newGenerationStamp;
+
+ /**
+ * Create empty RecoveringBlock.
+ */
+ public RecoveringBlock() {
+ super();
+ newGenerationStamp = -1L;
+ }
+
+ /**
+ * Create RecoveringBlock.
+ */
+ public RecoveringBlock(Block b, DatanodeInfo[] locs, long newGS) {
+ super(b, locs, -1, false); // startOffset is unknown
+ this.newGenerationStamp = newGS;
+ }
+
+ /**
+ * Return the new generation stamp of the block,
+ * which also plays role of the recovery id.
+ */
+ public long getNewGenerationStamp() {
+ return newGenerationStamp;
+ }
+
+ ///////////////////////////////////////////
+ // Writable
+ ///////////////////////////////////////////
+ static { // register a ctor
+ WritableFactories.setFactory
+ (RecoveringBlock.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new RecoveringBlock(); }
+ });
+ }
+
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeLong(newGenerationStamp);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ newGenerationStamp = in.readLong();
+ }
+ }
+
+ /**
+ * Create empty BlockRecoveryCommand.
+ */
+ public BlockRecoveryCommand() {
+ this(0);
+ }
+
+ /**
+ * Create BlockRecoveryCommand with
+ * the specified capacity for recovering blocks.
+ */
+ public BlockRecoveryCommand(int capacity) {
+ super(DatanodeProtocol.DNA_RECOVERBLOCK);
+ recoveringBlocks = new ArrayList<RecoveringBlock>(capacity);
+ }
+
+ /**
+ * Return the list of recovering blocks.
+ */
+ public Collection<RecoveringBlock> getRecoveringBlocks() {
+ return recoveringBlocks;
+ }
+
+ /**
+ * Add recovering block to the command.
+ */
+ public void add(RecoveringBlock block) {
+ recoveringBlocks.add(block);
+ }
+
+ ///////////////////////////////////////////
+ // Writable
+ ///////////////////////////////////////////
+ static { // register a ctor
+ WritableFactories.setFactory
+ (BlockRecoveryCommand.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new BlockRecoveryCommand(); }
+ });
+ }
+
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeInt(recoveringBlocks.size());
+ for(RecoveringBlock block : recoveringBlocks) {
+ block.write(out);
+ }
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ int numBlocks = in.readInt();
+ recoveringBlocks = new ArrayList<RecoveringBlock>(numBlocks);
+ for(int i = 0; i < numBlocks; i++) {
+ RecoveringBlock b = new RecoveringBlock();
+ b.readFields(in);
+ add(b);
+ }
+ }
+}
Propchange: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Sat Sep 26 21:23:16 2009
@@ -35,9 +35,9 @@
**********************************************************************/
public interface DatanodeProtocol extends VersionedProtocol {
/**
- * 21: blockReport() includes under-construction replicas.
+ * 22: BlockRecoveryCommand introduced in reply to sendHeartbeat().
*/
- public static final long versionID = 21L;
+ public static final long versionID = 22L;
// error code
final static int NOTIFY = 0;
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java Sat Sep 26 21:23:16 2009
@@ -18,12 +18,10 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
-import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -34,6 +32,7 @@
public class TestLeaseRecovery extends junit.framework.TestCase {
static final int BLOCK_SIZE = 1024;
static final short REPLICATION_NUM = (short)3;
+ private static final long LEASE_PERIOD = 300L;
static void checkMetaInfo(Block b, InterDatanodeProtocol idp
) throws IOException {
@@ -50,6 +49,15 @@
return m;
}
+ void waitLeaseRecovery(MiniDFSCluster cluster) {
+ cluster.setLeasePeriod(LEASE_PERIOD, LEASE_PERIOD);
+ // wait for the lease to expire
+ try {
+ Thread.sleep(2 * 3000); // 2 heartbeat intervals
+ } catch (InterruptedException e) {
+ }
+ }
+
/**
* The following test first creates a file with a few blocks.
* It randomly truncates the replica of the last block stored in each datanode.
@@ -96,44 +104,22 @@
checkMetaInfo(lastblock, idps[i]);
}
- //setup random block sizes
- int lastblocksize = ORG_FILE_SIZE % BLOCK_SIZE;
- Integer[] newblocksizes = new Integer[REPLICATION_NUM];
- for(int i = 0; i < REPLICATION_NUM; i++) {
- newblocksizes[i] = AppendTestUtil.nextInt(lastblocksize);
- }
- DataNode.LOG.info("newblocksizes = " + Arrays.asList(newblocksizes));
DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
cluster.getNameNode().append(filestr, dfs.dfs.clientName);
- //update blocks with random block sizes
- long newGS = cluster.getNameNode().nextGenerationStamp(lastblock);
- Block[] newblocks = new Block[REPLICATION_NUM];
- for(int i = 0; i < REPLICATION_NUM; i++) {
- newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i],
- newGS);
- idps[i].updateBlock(lastblock, newblocks[i], false);
- checkMetaInfo(newblocks[i], idps[i]);
- }
- cluster.getNameNode().commitBlockSynchronization(lastblock, newGS,
- lastblocksize, false, false, new DatanodeID[]{});
-
- //block synchronization
- final int primarydatanodeindex = AppendTestUtil.nextInt(datanodes.length);
- DataNode.LOG.info("primarydatanodeindex =" + primarydatanodeindex);
- DataNode primary = datanodes[primarydatanodeindex];
- DataNode.LOG.info("primary.dnRegistration=" + primary.dnRegistration);
- primary.recoverBlocks(new Block[]{lastblock}, new DatanodeInfo[][]{datanodeinfos}).join();
+ // expire lease to trigger block recovery.
+ waitLeaseRecovery(cluster);
BlockMetaDataInfo[] updatedmetainfo = new BlockMetaDataInfo[REPLICATION_NUM];
- int minsize = min(newblocksizes);
- long currentGS = cluster.getNamesystem().getGenerationStamp();
- lastblock.setGenerationStamp(currentGS);
+ long oldSize = lastblock.getNumBytes();
+ lastblock = TestInterDatanodeProtocol.getLastLocatedBlock(
+ dfs.dfs.getNamenode(), filestr).getBlock();
+ long currentGS = lastblock.getGenerationStamp();
for(int i = 0; i < REPLICATION_NUM; i++) {
updatedmetainfo[i] = idps[i].getBlockMetaDataInfo(lastblock);
assertEquals(lastblock.getBlockId(), updatedmetainfo[i].getBlockId());
- assertEquals(minsize, updatedmetainfo[i].getNumBytes());
+ assertEquals(oldSize, updatedmetainfo[i].getNumBytes());
assertEquals(currentGS, updatedmetainfo[i].getGenerationStamp());
}
}
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Sat Sep 26 21:23:16 2009
@@ -56,6 +56,7 @@
// conf.setInt("io.bytes.per.checksum", 16);
MiniDFSCluster cluster = null;
+ DistributedFileSystem dfs = null;
byte[] actual = new byte[FILE_SIZE];
try {
@@ -63,7 +64,7 @@
cluster.waitActive();
//create a file
- DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+ dfs = (DistributedFileSystem)cluster.getFileSystem();
// create a random file name
String filestr = "/foo" + AppendTestUtil.nextInt();
System.out.println("filestr=" + filestr);
@@ -129,10 +130,9 @@
+ "Validating its contents now...");
// verify that file-size matches
+ long fileSize = dfs.getFileStatus(filepath).getLen();
assertTrue("File should be " + size + " bytes, but is actually " +
- " found to be " + dfs.getFileStatus(filepath).getLen() +
- " bytes",
- dfs.getFileStatus(filepath).getLen() == size);
+ " found to be " + fileSize + " bytes", fileSize == size);
// verify that there is enough data to read.
System.out.println("File size is good. Now validating sizes from datanodes...");
@@ -142,6 +142,7 @@
}
finally {
try {
+ if(dfs != null) dfs.close();
if (cluster != null) {cluster.shutdown();}
} catch (Exception e) {
// ignore
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=819215&r1=819214&r2=819215&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Sat Sep 26 21:23:16 2009
@@ -30,6 +30,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery.Info;
import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;