Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/06/02 20:43:02 UTC
svn commit: r662513 [1/2] - in /hadoop/core/trunk: ./
src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Author: dhruba
Date: Mon Jun 2 11:43:01 2008
New Revision: 662513
URL: http://svn.apache.org/viewvc?rev=662513&view=rev
Log:
HADOOP-3310. The namenode instructs the primary datanode to do lease
recovery. The block gets a new generation stamp.
(Tsz Wo (Nicholas), SZE via dhruba)
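
For orientation, here is a minimal sketch, not part of the patch, of the recovery handshake this change introduces. The real logic lives in LeaseManager.recoverBlock(), DatanodeProtocol.nextGenerationStamp() and DatanodeProtocol.commitBlockSynchronization(); every name below is an illustrative stand-in, and synchronizing to the minimum replica length is an assumption about the synchronization step.

    import java.util.Arrays;
    import java.util.List;

    // Illustrative stand-in for one replica of the block under recovery.
    class Replica {
      final String datanode;
      final long length;
      Replica(String datanode, long length) {
        this.datanode = datanode;
        this.length = length;
      }
    }

    class LeaseRecoverySketch {
      public static void main(String[] args) {
        long blockId = 42, oldStamp = 1000;
        List<Replica> replicas = Arrays.asList(
            new Replica("dn1", 4096), new Replica("dn2", 3584));

        // 1. The primary datanode obtains a fresh stamp from the namenode
        //    (DatanodeProtocol.nextGenerationStamp in the real code).
        long newStamp = oldStamp + 1;

        // 2. Replicas are synchronized to a common length and re-stamped
        //    (InterDatanodeProtocol.updateBlock on each datanode); the
        //    minimum replica length is assumed here.
        long newLength = Long.MAX_VALUE;
        for (Replica r : replicas) {
          newLength = Math.min(newLength, r.length);
        }

        // 3. The primary reports back so the namenode can update its
        //    BlocksMap and, if requested, close the file
        //    (DatanodeProtocol.commitBlockSynchronization).
        System.out.println("blk_" + blockId + ": stamp " + oldStamp
            + " -> " + newStamp + ", length = " + newLength);
      }
    }
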
Added:
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientDatanodeProtocol.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreationNamenodeRestart.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStamp.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/StringBytesWritable.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestGetBlocks.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jun 2 11:43:01 2008
@@ -182,6 +182,10 @@
HADOOP-3434. Retain the cause of the bind failure in Server::bind.
(Steve Loughran via cdouglas)
+ HADOOP-3310. The namenode instructs the primary datanode to do lease
+ recovery. The block gets a new generation stamp.
+ (Tsz Wo (Nicholas), SZE via dhruba)
+
OPTIMIZATIONS
HADOOP-3274. The default constructor of BytesWritable creates empty
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java Mon Jun 2 11:43:01 2008
@@ -25,7 +25,7 @@
* long.
*
**************************************************/
-class Block implements Writable, Comparable {
+class Block implements Writable, Comparable<Block> {
static { // register a ctor
WritableFactories.setFactory
@@ -51,43 +51,29 @@
}
}
+ static long filename2id(String name) {
+ return Long.parseLong(name.substring("blk_".length()));
+ }
+
long blkid;
long len;
long generationStamp;
- /**
- */
- public Block() {
- this.blkid = 0;
- this.len = 0;
- this.generationStamp = 0;
- }
+ Block() {this(0, 0, 0);}
- /**
- */
- public Block(final long blkid, final long len, final long generationStamp) {
- this.blkid = blkid;
- this.len = len;
- this.generationStamp = generationStamp;
+ Block(final long blkid, final long len, final long generationStamp) {
+ set(blkid, len, generationStamp);
}
- /**
- */
- public Block(Block blk) {
- this.blkid = blk.blkid;
- this.len = blk.len;
- this.generationStamp = blk.generationStamp;
- }
+ Block(final long blkid) {this(blkid, 0, GenerationStamp.WILDCARD_STAMP);}
+
+ Block(Block blk) {this(blk.blkid, blk.len, blk.generationStamp);}
/**
* Find the blockid from the given filename
*/
public Block(File f, long len, long genstamp) {
- String name = f.getName();
- name = name.substring("blk_".length());
- this.blkid = Long.parseLong(name);
- this.len = len;
- this.generationStamp = genstamp;
+ this(filename2id(f.getName()), len, genstamp);
}
public void set(long blkid, long len, long genStamp) {
@@ -147,31 +133,43 @@
/////////////////////////////////////
// Comparable
/////////////////////////////////////
- public int compareTo(Object o) {
- Block b = (Block) o;
+ static void validateGenerationStamp(long generationstamp) {
+ if (generationstamp == GenerationStamp.WILDCARD_STAMP) {
+ throw new IllegalStateException("generationStamp (=" + generationstamp
+ + ") == GenerationStamp.WILDCARD_STAMP");
+ }
+ }
+
+ /** {@inheritDoc} */
+ public int compareTo(Block b) {
+ //Wildcard generationStamp is NOT ALLOWED here
+ validateGenerationStamp(this.generationStamp);
+ validateGenerationStamp(b.generationStamp);
+
if (blkid < b.blkid) {
return -1;
} else if (blkid == b.blkid) {
- if (generationStamp < b.generationStamp) {
- return -1;
- } else if (generationStamp == b.generationStamp) {
- return 0;
- } else {
- return 1;
- }
+ return GenerationStamp.compare(generationStamp, b.generationStamp);
} else {
return 1;
}
}
+
+ /** {@inheritDoc} */
public boolean equals(Object o) {
if (!(o instanceof Block)) {
return false;
}
- return blkid == ((Block)o).blkid &&
- generationStamp == ((Block)o).generationStamp;
+ final Block that = (Block)o;
+ //Wildcard generationStamp is ALLOWED here
+ return this.blkid == that.blkid
+ && GenerationStamp.equalsWithWildcard(
+ this.generationStamp, that.generationStamp);
}
-
+
+ /** {@inheritDoc} */
public int hashCode() {
+ //GenerationStamp is IRRELEVANT and should not be used here
return 37 * 17 + (int) (blkid^(blkid>>>32));
}
}
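
The Block changes above give equals() wildcard-tolerant semantics, while compareTo() rejects wildcard stamps and hashCode() depends only on the block id. A small sketch of those rules (a hypothetical class; it must live in org.apache.hadoop.dfs, since Block is package-private):

    package org.apache.hadoop.dfs;

    class BlockSemanticsSketch {
      public static void main(String[] args) {
        Block concrete = new Block(42L, 100L, 1001L);
        Block wildcard = new Block(42L);  // stamp == GenerationStamp.WILDCARD_STAMP

        System.out.println(concrete.equals(wildcard));                   // true
        System.out.println(concrete.hashCode() == wildcard.hashCode());  // true

        try {
          concrete.compareTo(wildcard);  // wildcard stamps are rejected here
        } catch (IllegalStateException e) {
          System.out.println("compareTo: " + e.getMessage());
        }
      }
    }
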
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java Mon Jun 2 11:43:01 2008
@@ -18,6 +18,9 @@
package org.apache.hadoop.dfs;
import java.io.*;
+import java.util.List;
+
+import org.apache.hadoop.dfs.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.io.*;
abstract class DatanodeCommand implements Writable {
@@ -101,12 +104,17 @@
/**
* Create BlockCommand for transferring blocks to another datanode
* @param blocks blocks to be transferred
- * @param targets nodes to transfer
*/
- BlockCommand(Block blocks[], DatanodeInfo targets[][]) {
- super( DatanodeProtocol.DNA_TRANSFER);
- this.blocks = blocks;
- this.targets = targets;
+ BlockCommand(int action, List<BlockTargetPair> blocktargetlist) {
+ super(action);
+
+ blocks = new Block[blocktargetlist.size()];
+ targets = new DatanodeInfo[blocks.length][];
+ for(int i = 0; i < blocks.length; i++) {
+ BlockTargetPair p = blocktargetlist.get(i);
+ blocks[i] = p.block;
+ targets[i] = p.targets;
+ }
}
private static final DatanodeInfo[][] EMPTY_TARGET = {};
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java Mon Jun 2 11:43:01 2008
@@ -32,25 +32,16 @@
WritableFactories.setFactory(BlockMetaDataInfo.class, FACTORY);
}
- /** get BlockMetaDataInfo from the data set and the block scanner */
- static BlockMetaDataInfo getBlockMetaDataInfo(Block b,
- FSDatasetInterface dataset, DataBlockScanner blockscanner
- ) throws IOException {
- BlockMetaDataInfo info = new BlockMetaDataInfo();
- info.blkid = b.getBlockId();
- info.lastScanTime = blockscanner.getLastScanTime(b);
- info.len = dataset.getLength(b);
- //TODO: get generation stamp here
- return info;
- }
-
private long lastScanTime;
public BlockMetaDataInfo() {}
- long getLastScanTime() {return lastScanTime;}
+ BlockMetaDataInfo(Block b, long lastScanTime) {
+ super(b);
+ this.lastScanTime = lastScanTime;
+ }
- long getGenerationStamp() {return generationStamp;}
+ long getLastScanTime() {return lastScanTime;}
/** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java Mon Jun 2 11:43:01 2008
@@ -138,6 +138,23 @@
return 0;
}
+ /** Update this object */
+ void update(long newgenerationstamp, long newlength,
+ DatanodeDescriptor[] newtargets) {
+ //remove all nodes
+ for(int n = numNodes(); n >= 0; ) {
+ removeNode(--n);
+ }
+
+ //add all targets
+ for(DatanodeDescriptor d : newtargets) {
+ addNode(d);
+ }
+
+ generationStamp = newgenerationstamp;
+ len = newlength;
+ }
+
/**
* Add data-node this block belongs to.
*/
@@ -156,7 +173,11 @@
* Remove data-node from the block.
*/
boolean removeNode(DatanodeDescriptor node) {
- int dnIndex = this.findDatanode(node);
+ return removeNode(findDatanode(node));
+ }
+
+ /** Remove the indexed datanode from the block. */
+ boolean removeNode(int dnIndex) {
if(dnIndex < 0) // the node is not found
return false;
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
Added: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientDatanodeProtocol.java?rev=662513&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientDatanodeProtocol.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientDatanodeProtocol.java Mon Jun 2 11:43:01 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/** A client-datanode protocol for block recovery
+ */
+interface ClientDatanodeProtocol extends VersionedProtocol {
+ public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
+
+ /**
+ * 1: start of time
+ */
+ public static final long versionID = 1L;
+
+ /** Start generation-stamp recovery for the specified block
+ * @param block the specified block
+ * @param targets the list of possible locations of the specified block
+ * @return the block with its new generation stamp if the recovery succeeded,
+ * or null if the block had no data and was deleted.
+ * @throws IOException
+ */
+ Block recoverBlock(Block block, DatanodeInfo[] targets) throws IOException;
+}
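
A usage sketch for the new protocol, assuming a datanode with a reachable IPC port; it mirrors the createClientDatanodeProtocolProxy helper added to DFSClient further below, and the class name is illustrative:

    package org.apache.hadoop.dfs;

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.net.NetUtils;

    class RecoverBlockSketch {
      static Block recover(DatanodeID primary, Block block, DatanodeInfo[] targets,
          Configuration conf) throws IOException {
        InetSocketAddress addr = NetUtils.createSocketAddr(
            primary.getHost() + ":" + primary.getIpcPort());
        ClientDatanodeProtocol datanode = (ClientDatanodeProtocol) RPC.waitForProxy(
            ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID, addr, conf);
        try {
          // Returns the re-stamped block, or null if the block had no data.
          return datanode.recoverBlock(block, targets);
        } finally {
          RPC.stopProxy(datanode);
        }
      }
    }
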
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Jun 2 11:43:01 2008
@@ -128,6 +128,18 @@
return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
rpcNamenode, methodNameToPolicyMap);
}
+
+ static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
+ DatanodeID datanodeid, Configuration conf) throws IOException {
+ InetSocketAddress addr = NetUtils.createSocketAddr(
+ datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+ if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
+ ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
+ }
+ return (ClientDatanodeProtocol)RPC.waitForProxy(ClientDatanodeProtocol.class,
+ ClientDatanodeProtocol.versionID, addr, conf);
+ }
/**
* Create a new DFSClient connected to the default namenode.
@@ -1715,6 +1727,8 @@
private long artificialSlowdown = 0;
private long lastFlushOffset = -1; // offset when flush was invoked
private boolean persistBlocks = false; // persist blocks on namenode
+ private int recoveryErrorCount = 0; // number of times block recovery failed
+ private int maxRecoveryErrorCount = 5; // try block recovery 5 times
private class Packet {
ByteBuffer buffer; // only one of buf and buffer is non-null
@@ -1839,15 +1853,16 @@
synchronized (dataQueue) {
// process IO errors if any
- processDatanodeError();
+ boolean doSleep = processDatanodeError();
// wait for a packet to be sent.
- while (!closed && !hasError && clientRunning
- && dataQueue.size() == 0) {
+ while ((!closed && !hasError && clientRunning
+ && dataQueue.size() == 0) || doSleep) {
try {
dataQueue.wait(1000);
} catch (InterruptedException e) {
}
+ doSleep = false;
}
if (closed || hasError || dataQueue.size() == 0 || !clientRunning) {
continue;
@@ -2053,16 +2068,17 @@
}
// If this stream has encountered any errors so far, shutdown
- // threads and mark stream as closed.
+ // threads and mark stream as closed. Returns true if we should
+ // sleep for a while after returning from this call.
//
- private void processDatanodeError() {
+ private boolean processDatanodeError() {
if (!hasError) {
- return;
+ return false;
}
if (response != null) {
LOG.info("Error Recovery for block " + block +
" waiting for responder to exit. ");
- return;
+ return true;
}
String msg = "Error Recovery for block " + block +
" bad datanode[" + errorIndex + "]";
@@ -2094,7 +2110,7 @@
"Aborting...");
closed = true;
streamer.close();
- return;
+ return false;
}
StringBuilder pipelineMsg = new StringBuilder();
for (int j = 0; j < nodes.length; j++) {
@@ -2109,7 +2125,7 @@
pipeline + " are bad. Aborting...");
closed = true;
streamer.close();
- return;
+ return false;
}
LOG.warn("Error Recovery for block " + block +
" in pipeline " + pipeline +
@@ -2124,9 +2140,47 @@
for (int i = errorIndex; i < (nodes.length-1); i++) {
newnodes[i] = nodes[i+1];
}
- nodes = newnodes;
+
+ // Tell the primary datanode to do error recovery
+ // by stamping appropriate generation stamps.
+ //
+ Block newBlock = null;
+ ClientDatanodeProtocol datanodeRPC = null;
+ try {
+ datanodeRPC = createClientDatanodeProtocolProxy(newnodes[0], conf);
+ newBlock = datanodeRPC.recoverBlock(block, newnodes);
+ } catch (IOException e) {
+ recoveryErrorCount++;
+ if (recoveryErrorCount > maxRecoveryErrorCount) {
+ String emsg = "Error Recovery for block " + block + " failed " +
+ " because recovery from primary datanode " +
+ newnodes[0] + " failed " + recoveryErrorCount +
+ " times. Aborting...";
+ LOG.warn(emsg);
+ lastException = new IOException(emsg);
+ closed = true;
+ streamer.close();
+ return false; // abort with IOexception
+ }
+ LOG.warn("Error Recovery for block " + block + " failed " +
+ " because recovery from primary datanode " +
+ newnodes[0] + " failed " + recoveryErrorCount +
+ " times. Will retry...");
+ return true; // sleep when we return from here
+ } finally {
+ RPC.stopProxy(datanodeRPC);
+ }
+ recoveryErrorCount = 0; // block recovery successful
+
+ // If the block recovery generated a new generation stamp, use that
+ // from now on.
+ //
+ if (newBlock != null) {
+ block = newBlock;
+ }
// setup new pipeline
+ nodes = newnodes;
hasError = false;
errorIndex = 0;
success = createBlockOutputStream(nodes, src, true);
@@ -2134,6 +2188,7 @@
response = new ResponseProcessor(nodes);
response.start();
+ return false; // do not sleep, continue processing
}
private void isClosed() throws IOException {
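
processDatanodeError() now distinguishes abort from sleep-and-retry through its boolean return value and resets recoveryErrorCount on success. A detached sketch of that pattern, with illustrative names:

    // Sketch: false = proceed (or abort was signalled elsewhere),
    // true = caller should sleep briefly and retry.
    class BoundedRetrySketch {
      private static final int MAX_ERRORS = 5;  // mirrors maxRecoveryErrorCount
      private int errorCount = 0;

      boolean attemptRecovery(Runnable recovery) {
        try {
          recovery.run();
          errorCount = 0;          // success resets the counter
          return false;            // no sleep needed, continue processing
        } catch (RuntimeException e) {
          if (++errorCount > MAX_ERRORS) {
            throw new IllegalStateException(
                "recovery failed " + errorCount + " times, aborting", e);
          }
          return true;             // transient failure: sleep, then retry
        }
      }
    }
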
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon Jun 2 11:43:01 2008
@@ -19,7 +19,6 @@
import org.apache.commons.logging.*;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
@@ -80,7 +79,8 @@
* information to clients or other DataNodes that might be interested.
*
**********************************************************/
-public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
+public class DataNode extends Configured
+ implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
/**
@@ -178,7 +178,7 @@
*/
DataNode(Configuration conf,
AbstractList<File> dataDirs) throws IOException {
-
+ super(conf);
datanodeObject = this;
try {
@@ -772,12 +772,11 @@
private boolean processCommand(DatanodeCommand cmd) throws IOException {
if (cmd == null)
return true;
+ final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;
+
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER:
- //
// Send a copy of a block to another datanode
- //
- BlockCommand bcmd = (BlockCommand)cmd;
transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
break;
case DatanodeProtocol.DNA_INVALIDATE:
@@ -785,7 +784,7 @@
// Some local block(s) are obsolete and can be
// safely garbage-collected.
//
- Block toDelete[] = ((BlockCommand)cmd).getBlocks();
+ Block toDelete[] = bcmd.getBlocks();
try {
if (blockScanner != null) {
blockScanner.deleteBlocks(toDelete);
@@ -821,6 +820,9 @@
scheduleBlockReport(initialBlockReportDelay);
}
break;
+ case DatanodeProtocol.DNA_RECOVERBLOCK:
+ recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
+ break;
default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
}
@@ -3046,16 +3048,27 @@
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block);
}
- return BlockMetaDataInfo.getBlockMetaDataInfo(block, data, blockScanner);
+ Block stored = data.getStoredBlock(block.blkid);
+ return stored == null?
+ null: new BlockMetaDataInfo(stored, blockScanner.getLastScanTime(stored));
+ }
+
+ Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
+ Daemon d = new Daemon(threadGroup, new Runnable() {
+ public void run() {
+ LeaseManager.recoverBlocks(blocks, targets, namenode, getConf());
+ }
+ });
+ d.start();
+ return d;
}
/** {@inheritDoc} */
- public boolean updateGenerationStamp(Block block, GenerationStamp generationstamp) {
+ public void updateBlock(Block oldblock, Block newblock) throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("block=" + block + ", generationstamp=" + generationstamp);
+ LOG.debug("oldblock=" + oldblock + ", newblock=" + newblock);
}
- //TODO: update generation stamp here
- return false;
+ data.updateBlock(oldblock, newblock);
}
/** {@inheritDoc} */
@@ -3063,8 +3076,21 @@
) throws IOException {
if (protocol.equals(InterDatanodeProtocol.class.getName())) {
return InterDatanodeProtocol.versionID;
- } else {
- throw new IOException("Unknown protocol to name node: " + protocol);
+ } else if (protocol.equals(ClientDatanodeProtocol.class.getName())) {
+ return ClientDatanodeProtocol.versionID;
+ }
+ throw new IOException("Unknown protocol to " + getClass().getSimpleName()
+ + ": " + protocol);
+ }
+
+ // ClientDataNodeProtocol implementation
+ /** {@inheritDoc} */
+ public Block recoverBlock(Block block, DatanodeInfo[] targets
+ ) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("recoverBlock for block " + block);
}
+ return LeaseManager.recoverBlock(block, targets, namenode,
+ getConf(), false);
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java Mon Jun 2 11:43:01 2008
@@ -124,4 +124,9 @@
setDetached();
return true;
}
+
+ public String toString() {
+ return getClass().getSimpleName() + "(volume=" + volume
+ + ", file=" + file + ", detached=" + detached + ")";
+ }
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Mon Jun 2 11:43:01 2008
@@ -18,12 +18,10 @@
package org.apache.hadoop.dfs;
import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
-import org.apache.hadoop.dfs.DatanodeInfo.AdminStates;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.WritableUtils;
@@ -40,28 +38,63 @@
**************************************************/
public class DatanodeDescriptor extends DatanodeInfo {
+ /** Block and targets pair */
+ static class BlockTargetPair {
+ final Block block;
+ final DatanodeDescriptor[] targets;
+
+ BlockTargetPair(Block block, DatanodeDescriptor[] targets) {
+ this.block = block;
+ this.targets = targets;
+ }
+ }
+
+ /** A BlockTargetPair queue. */
+ private static class BlockQueue {
+ private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
+
+ /** Size of the queue */
+ synchronized int size() {return blockq.size();}
+
+ /** Enqueue */
+ synchronized boolean offer(Block block, DatanodeDescriptor[] targets) {
+ return blockq.offer(new BlockTargetPair(block, targets));
+ }
+
+ /** Dequeue */
+ synchronized List<BlockTargetPair> poll(int numTargets) {
+ if (numTargets <= 0 || blockq.isEmpty()) {
+ return null;
+ }
+ else {
+ List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
+ for(; !blockq.isEmpty() && numTargets > 0; ) {
+ numTargets -= blockq.peek().targets.length;
+ if (numTargets >= 0) {
+ results.add(blockq.poll());
+ }
+ }
+ return results;
+ }
+ }
+ }
private volatile BlockInfo blockList = null;
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
protected boolean isAlive = false;
- //
- // List of blocks to be replicated by this datanode
- // Also, a list of datanodes per block to indicate the target
- // datanode of this replication.
- //
- List<Block> replicateBlocks;
- List<DatanodeDescriptor[]> replicateTargetSets;
- Set<Block> invalidateBlocks;
- boolean processedBlockReport = false;
+ /** A queue of blocks to be replicated by this datanode */
+ private BlockQueue replicateBlocks = new BlockQueue();
+ /** A queue of blocks to be recovered by this datanode */
+ private BlockQueue recoverBlocks = new BlockQueue();
+ /** A set of blocks to be invalidated by this datanode */
+ private Set<Block> invalidateBlocks = new TreeSet<Block>();
+ boolean processedBlockReport = false;
/** Default constructor */
- public DatanodeDescriptor() {
- super();
- initWorkLists();
- }
+ public DatanodeDescriptor() {}
/** DatanodeDescriptor constructor
* @param nodeID id of the data node
@@ -107,7 +140,6 @@
int xceiverCount) {
super(nodeID);
updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
- initWorkLists();
}
/** DatanodeDescriptor constructor
@@ -128,16 +160,6 @@
int xceiverCount) {
super(nodeID, networkLocation, hostName);
updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
- initWorkLists();
- }
-
- /*
- * initialize list of blocks that store work for the datanodes
- */
- private void initWorkLists() {
- replicateBlocks = new ArrayList<Block>();
- replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
- invalidateBlocks = new TreeSet<Block>();
}
/**
@@ -227,10 +249,15 @@
*/
void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
assert(block != null && targets != null && targets.length > 0);
- synchronized (replicateBlocks) {
- replicateBlocks.add(block);
- replicateTargetSets.add(targets);
- }
+ replicateBlocks.offer(block, targets);
+ }
+
+ /**
+ * Store block recovery work.
+ */
+ void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
+ assert(block != null && targets != null && targets.length > 0);
+ recoverBlocks.offer(block, targets);
}
/**
@@ -245,16 +272,14 @@
}
}
- /*
+ /**
* The number of work items that are pending to be replicated
*/
int getNumberOfBlocksToBeReplicated() {
- synchronized (replicateBlocks) {
- return replicateBlocks.size();
- }
+ return replicateBlocks.size();
}
- /*
+ /**
* The number of block invalidation items that are pending to
* be sent to the datanode
*/
@@ -279,36 +304,16 @@
return processedBlockReport;
}
- /**
- * Remove the specified number of target sets
- */
- BlockCommand getReplicationCommand(int maxNumTransfers) {
- synchronized (replicateBlocks) {
- assert(replicateBlocks.size() == replicateTargetSets.size());
+ BlockCommand getReplicationCommand(int maxTransfers) {
+ List<BlockTargetPair> blocktargetlist = replicateBlocks.poll(maxTransfers);
+ return blocktargetlist == null? null:
+ new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
+ }
- if (maxNumTransfers <= 0 || replicateBlocks.size() == 0) {
- return null;
- }
- int numTransfers = 0;
- int numBlocks = 0;
- int i;
- for (i = 0; i < replicateTargetSets.size() &&
- numTransfers < maxNumTransfers; i++) {
- numTransfers += replicateTargetSets.get(i).length;
- }
- numBlocks = i;
- Block[] blocklist = new Block[numBlocks];
- DatanodeDescriptor targets[][] = new DatanodeDescriptor[numBlocks][];
-
- for (i = 0; i < numBlocks; i++) {
- blocklist[i] = replicateBlocks.get(0);
- targets[i] = replicateTargetSets.get(0);
- replicateBlocks.remove(0);
- replicateTargetSets.remove(0);
- }
- assert(blocklist.length > 0 && targets.length > 0);
- return new BlockCommand(blocklist, targets);
- }
+ BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
+ List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
+ return blocktargetlist == null? null:
+ new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
}
/**
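
The BlockQueue.poll(numTargets) added above spends a transfer budget: it dequeues whole block/target pairs until the budget is used up, and leaves in place any pair whose target count would overshoot it. A standalone restatement of that accounting (BlockQueue itself is private to DatanodeDescriptor, so this generic class is a stand-in):

    import java.util.AbstractMap;
    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.Queue;

    class TargetBudgetQueue<T> {
      private final Queue<Map.Entry<T, Integer>> q =
          new LinkedList<Map.Entry<T, Integer>>();

      synchronized void offer(T item, int targetCount) {
        q.offer(new AbstractMap.SimpleEntry<T, Integer>(item, targetCount));
      }

      // Drain entries while the remaining budget covers each entry's
      // target count; an entry that would exceed it stays queued.
      synchronized List<T> poll(int budget) {
        if (budget <= 0 || q.isEmpty()) {
          return null;
        }
        List<T> results = new ArrayList<T>();
        while (!q.isEmpty() && budget > 0) {
          budget -= q.peek().getValue();
          if (budget >= 0) {
            results.add(q.poll().getKey());
          }
        }
        return results;
      }
    }
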
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java Mon Jun 2 11:43:01 2008
@@ -32,6 +32,7 @@
*
*/
public class DatanodeID implements WritableComparable {
+ static final DatanodeID[] EMPTY_ARRAY = {};
protected String name; /// hostname:portNumber
protected String storageID; /// unique per cluster storageID
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Mon Jun 2 11:43:01 2008
@@ -31,9 +31,9 @@
**********************************************************************/
interface DatanodeProtocol extends VersionedProtocol {
/**
- * 14: add corrupt field to LocatedBlock
+ * 15: added DNA_RECOVERBLOCK, nextGenerationStamp and commitBlockSynchronization
*/
- public static final long versionID = 14L;
+ public static final long versionID = 15L;
// error code
final static int NOTIFY = 0;
@@ -51,6 +51,7 @@
final static int DNA_REGISTER = 4; // re-register
final static int DNA_FINALIZE = 5; // finalize previous upgrade
final static int DNA_BLOCKREPORT = 6; // request a block report
+ final static int DNA_RECOVERBLOCK = 7; // request a block recovery
/**
* Register Datanode.
@@ -132,4 +133,17 @@
* same as {@link ClientProtocol#reportBadBlocks(LocatedBlock[] blocks)}
*/
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
+
+ /**
+ * @return the next GenerationStamp
+ */
+ public long nextGenerationStamp() throws IOException;
+
+ /**
+ * Commit block synchronization in lease recovery
+ */
+ public void commitBlockSynchronization(Block block,
+ long newgenerationstamp, long newlength,
+ boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
+ ) throws IOException;
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Mon Jun 2 11:43:01 2008
@@ -546,6 +546,11 @@
}
threads.add(Thread.currentThread());
}
+
+ public String toString() {
+ return getClass().getSimpleName() + "(file=" + file
+ + ", threads=" + threads + ")";
+ }
}
static File getMetaFile(File f , Block b) {
@@ -553,9 +558,75 @@
"_" + b.getGenerationStamp() + METADATA_EXTENSION );
}
protected File getMetaFile(Block b) throws IOException {
- File blockFile = getBlockFile( b );
- return new File( blockFile.getAbsolutePath() +
- "_" + b.getGenerationStamp() + METADATA_EXTENSION );
+ return getMetaFile(getBlockFile(b), b);
+ }
+
+ /** Find the corresponding meta data file from a given block file */
+ private static File findMetaFile(final File blockFile) throws IOException {
+ final String prefix = blockFile.getName() + "_";
+ final File parent = blockFile.getParentFile();
+ File[] matches = parent.listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return dir.equals(parent)
+ && name.startsWith(prefix) && name.endsWith(METADATA_EXTENSION);
+ }
+ });
+
+ if (matches == null || matches.length == 0) {
+ throw new IOException("Meta file not found, blockFile=" + blockFile);
+ }
+ else if (matches.length > 1) {
+ throw new IOException("Found more than one meta files: "
+ + Arrays.asList(matches));
+ }
+ return matches[0];
+ }
+
+ /** Parse the generation stamp from the meta file name */
+ private static long parseGenerationStamp(File blockFile, File metaFile
+ ) throws IOException {
+ String metaname = metaFile.getName();
+ String gs = metaname.substring(blockFile.getName().length() + 1,
+ metaname.length() - METADATA_EXTENSION.length());
+ try {
+ return Long.parseLong(gs);
+ } catch(NumberFormatException nfe) {
+ throw (IOException)new IOException("blockFile=" + blockFile
+ + ", metaFile=" + metaFile).initCause(nfe);
+ }
+ }
+
+ File findBlockFile(Block b) {
+ assert b.generationStamp == GenerationStamp.WILDCARD_STAMP;
+
+ File blockfile = null;
+ ActiveFile activefile = ongoingCreates.get(b);
+ if (activefile != null) {
+ blockfile = activefile.file;
+ }
+ if (blockfile == null) {
+ blockfile = getFile(b);
+ }
+ if (blockfile == null) {
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
+ DataNode.LOG.debug("volumeMap=" + volumeMap);
+ }
+ }
+ return blockfile;
+ }
+
+ /** {@inheritDoc} */
+ public Block getStoredBlock(long blkid) throws IOException {
+ Block b = new Block(blkid);
+ File blockfile = findBlockFile(b);
+ if (blockfile == null) {
+ return null;
+ }
+ File metafile = findMetaFile(blockfile);
+ b.generationStamp = parseGenerationStamp(blockfile, metafile);
+ b.len = blockfile.length();
+ return b;
}
public boolean metaFileExists(Block b) throws IOException {
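
The helpers above rely on the on-disk naming convention where a block file blk_<id> sits next to a meta file blk_<id>_<genstamp>.meta; this sketch assumes METADATA_EXTENSION is ".meta" and restates the parse in isolation:

    class MetaNameSketch {
      // Mirrors parseGenerationStamp(): the stamp is the text between the
      // block file name plus "_" and the ".meta" suffix.
      static long parseGenerationStamp(String blockName, String metaName) {
        String gs = metaName.substring(blockName.length() + 1,
            metaName.length() - ".meta".length());
        return Long.parseLong(gs);
      }

      public static void main(String[] args) {
        // block blk_42 with generation stamp 1001 -> meta file blk_42_1001.meta
        System.out.println(parseGenerationStamp("blk_42", "blk_42_1001.meta")); // 1001
      }
    }
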
@@ -624,11 +695,7 @@
* Find the block's on-disk length
*/
public long getLength(Block b) throws IOException {
- File f = validateBlockFile(b);
- if(f == null) {
- throw new IOException("Block " + b + " is not valid.");
- }
- return f.length();
+ return getBlockFile(b).length();
}
/**
@@ -637,6 +704,9 @@
protected synchronized File getBlockFile(Block b) throws IOException {
File f = validateBlockFile(b);
if(f == null) {
+ if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
+ InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
+ }
throw new IOException("Block " + b + " is not valid.");
}
return f;
@@ -680,6 +750,126 @@
return info.detachBlock(block, numLinks);
}
+ static private <T> void updateBlockMap(Map<Block, T> blockmap,
+ Block oldblock, Block newblock) throws IOException {
+ if (blockmap.containsKey(oldblock)) {
+ T value = blockmap.remove(oldblock);
+ blockmap.put(newblock, value);
+ }
+ }
+
+ /** interrupt and wait for all ongoing create threads */
+ private synchronized void interruptOngoingCreates(Block b) {
+ //remove ongoingCreates threads
+ ActiveFile activefile = ongoingCreates.get(b);
+ if (activefile != null) {
+ for(Thread t : activefile.threads) {
+ t.interrupt();
+ }
+ for(Thread t : activefile.threads) {
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ DataNode.LOG.warn("interruptOngoingCreates: b=" + b
+ + ", activeFile=" + activefile + ", t=" + t, e);
+ }
+ }
+ activefile.threads.clear();
+ }
+ }
+ /** {@inheritDoc} */
+ public synchronized void updateBlock(Block oldblock, Block newblock
+ ) throws IOException {
+ if (oldblock.getBlockId() != newblock.getBlockId()) {
+ throw new IOException("Cannot update oldblock (=" + oldblock
+ + ") to newblock (=" + newblock + ").");
+ }
+
+ File blockFile = findBlockFile(oldblock);
+ interruptOngoingCreates(oldblock);
+
+ File oldMetaFile = getMetaFile(blockFile, oldblock);
+ long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
+
+ //rename meta file to a tmp file
+ File tmpMetaFile = new File(oldMetaFile.getParent(),
+ oldMetaFile.getName()+"_tmp");
+ if (!oldMetaFile.renameTo(tmpMetaFile)){
+ throw new IOException("Cannot rename block meta file to " + tmpMetaFile);
+ }
+
+ //update generation stamp
+ if (oldgs > newblock.generationStamp) {
+ throw new IOException("Cannot update block (id=" + newblock.blkid
+ + ") generation stamp from " + oldgs
+ + " to " + newblock.generationStamp);
+ }
+
+ //update length
+ if (newblock.len > oldblock.len) {
+ throw new IOException("Cannot update block file (=" + blockFile
+ + ") length from " + oldblock.len + " to " + newblock.len);
+ }
+ if (newblock.len < oldblock.len) {
+ truncateBlock(blockFile, tmpMetaFile, oldblock.len, newblock.len);
+ }
+
+ //rename the tmp file to the new meta file (with new generation stamp)
+ File newMetaFile = getMetaFile(blockFile, newblock);
+ if (!tmpMetaFile.renameTo(newMetaFile)) {
+ throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
+ }
+
+ updateBlockMap(ongoingCreates, oldblock, newblock);
+ updateBlockMap(volumeMap, oldblock, newblock);
+ }
+
+ static private void truncateBlock(File blockFile, File metaFile,
+ long oldlen, long newlen) throws IOException {
+ if (newlen == oldlen) {
+ return;
+ }
+ if (newlen > oldlen) {
+ throw new IOException("Cannout truncate block to from oldlen (=" + oldlen
+ + ") to newlen (=" + newlen + ")");
+ }
+
+ DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+ int checksumsize = dcs.getChecksumSize();
+ int bpc = dcs.getBytesPerChecksum();
+ long n = (newlen - 1)/bpc + 1;
+ long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
+ long lastchunkoffset = (n - 1)*bpc;
+ int lastchunksize = (int)(newlen - lastchunkoffset);
+ byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
+
+ RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
+ try {
+ //truncate blockFile
+ blockRAF.setLength(newlen);
+
+ //read last chunk
+ blockRAF.seek(lastchunkoffset);
+ blockRAF.readFully(b, 0, lastchunksize);
+ } finally {
+ blockRAF.close();
+ }
+
+ //compute checksum
+ dcs.update(b, 0, lastchunksize);
+ dcs.writeValue(b, 0, false);
+
+ //update metaFile
+ RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+ try {
+ metaRAF.setLength(newmetalen);
+ metaRAF.seek(newmetalen - checksumsize);
+ metaRAF.write(b, 0, checksumsize);
+ } finally {
+ metaRAF.close();
+ }
+ }
+
/**
* Start writing to a block file
* If isRecovery is true and the block pre-exists, then we kill all
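
truncateBlock() above shortens the block file and recomputes only the last checksum chunk. A sketch of its size arithmetic; the 512 bytes-per-checksum, 4-byte checksum and 7-byte header values are illustrative assumptions, not read from BlockMetadataHeader:

    class TruncateMathSketch {
      public static void main(String[] args) {
        long newlen = 1300;      // truncated block length
        int bpc = 512;           // bytes per checksum chunk (illustrative)
        int checksumsize = 4;    // e.g. CRC32 (illustrative)
        long headerSize = 7;     // assumed stand-in for BlockMetadataHeader.getHeaderSize()

        long n = (newlen - 1) / bpc + 1;                  // ceil(1300/512) = 3 chunks
        long newmetalen = headerSize + n * checksumsize;  // 7 + 3*4 = 19 bytes
        long lastchunkoffset = (n - 1) * bpc;             // 1024
        int lastchunksize = (int) (newlen - lastchunkoffset); // 276

        System.out.println("chunks=" + n + " metalen=" + newmetalen
            + " lastOffset=" + lastchunkoffset + " lastSize=" + lastchunksize);
      }
    }
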
@@ -880,6 +1070,9 @@
File f = getFile(b);
if(f != null && f.exists())
return f;
+ if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
+ InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
+ }
return null;
}
@@ -1020,12 +1213,4 @@
public String getStorageInfo() {
return toString();
}
-
- /**
- * A duplicate of {@link #getLength()}
- */
- @Deprecated
- public long getBlockSize(Block b) {
- return getFile(b).length();
- }
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java Mon Jun 2 11:43:01 2008
@@ -89,7 +89,12 @@
* @throws IOException
*/
public long getLength(Block b) throws IOException;
-
+
+ /**
+ * @return the stored block, with its generation stamp and length,
+ * or null if the block is not found.
+ */
+ public Block getStoredBlock(long blkid) throws IOException;
+
/**
* Returns an input stream to read the contents of the specified block
* @param b
@@ -136,6 +141,11 @@
public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
/**
+ * Update the block to the new generation stamp and length.
+ */
+ public void updateBlock(Block oldblock, Block newblock) throws IOException;
+
+ /**
* Finalizes the block previously opened for writing using writeToBlock.
* The block size is what is in the parameter b and it must match the amount
* of data written
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Mon Jun 2 11:43:01 2008
@@ -127,7 +127,7 @@
/**
* Add the given filename to the fs.
*/
- INode addFile(String path,
+ INodeFileUnderConstruction addFile(String path,
PermissionStatus permissions,
short replication,
long preferredBlockSize,
@@ -166,7 +166,6 @@
}
// add create file record to log, record new generation stamp
fsImage.getEditLog().logOpenFile(path, newNode);
- fsImage.getEditLog().logGenerationStamp(generationStamp);
NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+path+" is added to the file system");
@@ -287,13 +286,14 @@
*/
void closeFile(String path, INodeFile file) throws IOException {
waitForReady();
-
synchronized (rootDir) {
// file is closed
fsImage.getEditLog().logCloseFile(path, file);
- NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
+path+" with "+ file.getBlocks().length
+" blocks is persisted to the file system");
+ }
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Mon Jun 2 11:43:01 2008
@@ -527,7 +527,7 @@
clientMachine,
null);
fsDir.replaceNode(path, node, cons);
- fsNamesys.leaseManager.addLease(path, clientName);
+ fsNamesys.leaseManager.addLease(cons.clientName, path);
} else if (opcode == OP_CLOSE) {
//
// Remove lease if it exists.
@@ -535,7 +535,7 @@
if (old.isUnderConstruction()) {
INodeFileUnderConstruction cons = (INodeFileUnderConstruction)
old;
- fsNamesys.leaseManager.removeLease(path, cons.getClientName());
+ fsNamesys.leaseManager.removeLease(cons.clientName, path);
}
}
break;
@@ -573,7 +573,7 @@
old = fsDir.unprotectedDelete(path, timestamp, null);
if (old != null && old.isUnderConstruction()) {
INodeFileUnderConstruction cons = (INodeFileUnderConstruction)old;
- fsNamesys.leaseManager.removeLease(path, cons.getClientName());
+ fsNamesys.leaseManager.removeLease(cons.clientName, path);
}
break;
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Mon Jun 2 11:43:01 2008
@@ -1031,7 +1031,7 @@
}
INodeFile oldnode = (INodeFile) old;
fsDir.replaceNode(path, oldnode, cons);
- fs.leaseManager.addLease(path, cons.getClientName());
+ fs.leaseManager.addLease(cons.clientName, path);
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Jun 2 11:43:01 2008
@@ -96,9 +96,8 @@
FSDirectory dir;
//
- // Stores the block-->datanode(s) map. Updated only in response
- // to client-sent information.
// Mapping: Block -> { INode, datanodes, self ref }
+ // Updated only in response to client-sent information.
//
BlocksMap blocksMap = new BlocksMap();
@@ -238,9 +237,8 @@
/**
* The global generation stamp for this file system.
- * Valid values start from 1000.
*/
- private GenerationStamp generationStamp = new GenerationStamp(1000);
+ private final GenerationStamp generationStamp = new GenerationStamp();
// Ask Datanode only up to this many blocks to delete.
private int blockInvalidateLimit = FSConstants.BLOCK_INVALIDATE_CHUNK;
@@ -931,7 +929,7 @@
// If the file is under construction , then it must be in our
// leases. Find the appropriate lease record.
//
- Lease lease = leaseManager.getLease(holder);
+ Lease lease = leaseManager.getLease(new StringBytesWritable(holder));
//
// We found the lease for this file. And surprisingly the original
// holder is trying to recreate this file. This should never occur.
@@ -945,7 +943,7 @@
//
// Find the original holder.
//
- lease = leaseManager.getLease(pendingFile.getClientName());
+ lease = leaseManager.getLease(pendingFile.clientName);
if (lease == null) {
throw new AlreadyBeingCreatedException(
"failed to create file " + src + " for " + holder +
@@ -958,8 +956,9 @@
// to proceed. Otherwise, prevent this request from creating file.
//
if (lease.expiredSoftLimit()) {
- LOG.info("startFile: Removing lease " + lease);
- leaseManager.removeExpiredLease(lease);
+ LOG.info("startFile: recover lease " + lease + ", src=" + src);
+ internalReleaseLease(lease, src);
+ leaseManager.renewLease(lease);
} else {
throw new AlreadyBeingCreatedException(
"failed to create file " + src + " for " + holder +
@@ -988,8 +987,6 @@
DatanodeDescriptor clientNode =
host2DataNodeMap.getDatanodeByHost(clientMachine);
- leaseManager.addLease(src, holder);
-
//
// Now we can add the name to the filesystem. This file has no
// blocks associated with it.
@@ -997,22 +994,64 @@
checkFsObjectLimit();
// increment global generation stamp
- long genstamp = generationStamp.nextStamp();
-
- INode newNode = dir.addFile(src, permissions,
+ long genstamp = nextGenerationStamp();
+ INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
replication, blockSize, holder, clientMachine, clientNode, genstamp);
if (newNode == null) {
throw new IOException("DIR* NameSystem.startFile: " +
"Unable to add file to namespace.");
}
+ leaseManager.addLease(newNode.clientName, src);
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
+ie.getMessage());
throw ie;
}
- NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+"add "+src+" to namespace for "+holder);
+ }
+ }
+
+ /** append is not yet ready. This method is for testing. */
+ void appendFileInternal(String src, String holder, String clientMachine
+ ) throws IOException {
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
+ +src+" for "+holder+" at "+clientMachine);
+ }
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot append file " + src, safeMode);
+ if (!isValidName(src)) {
+ throw new IOException("Invalid file name: " + src);
+ }
+ if (isPermissionEnabled) {
+ checkPathAccess(src, FsAction.WRITE);
+ }
+
+ try {
+ INodeFile f = dir.getFileINode(src);
+ //assume f != null && !f.isUnderConstruction() && lease does not exist
+ //TODO: remove the assumption
+
+ DatanodeDescriptor clientNode = host2DataNodeMap.getDatanodeByHost(
+ clientMachine);
+ INodeFileUnderConstruction newnode = f.toINodeFileUnderConstruction(
+ holder, clientMachine, clientNode);
+
+ dir.replaceNode(src, f, newnode);
+ leaseManager.addLease(newnode.clientName, src);
+
+ } catch (IOException ie) {
+ NameNode.stateChangeLog.warn("DIR* NameSystem.appendFile: ", ie);
+ throw ie;
+ }
+
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: "
+ +"add "+src+" to namespace for "+holder);
+ }
}
/**
@@ -1079,6 +1118,7 @@
// allocate new block record block locations in INode.
newBlock = allocateBlock(src, pendingFile);
+ pendingFile.targets = targets;
}
// Create next block
@@ -1108,7 +1148,7 @@
) throws IOException {
INode file = dir.getFileINode(src);
if (file == null) {
- Lease lease = leaseManager.getLease(holder);
+ Lease lease = leaseManager.getLease(new StringBytesWritable(holder));
throw new LeaseExpiredException("No lease on " + src +
" File does not exist. " +
(lease != null ? lease.toString() :
@@ -1116,7 +1156,7 @@
" does not have any open files."));
}
if (!file.isUnderConstruction()) {
- Lease lease = leaseManager.getLease(holder);
+ Lease lease = leaseManager.getLease(new StringBytesWritable(holder));
throw new LeaseExpiredException("No lease on " + src +
" File is not open for writing. " +
(lease != null ? lease.toString() :
@@ -1167,30 +1207,13 @@
} else if (!checkFileProgress(pendingFile, true)) {
return STILL_WAITING;
}
-
- // The file is no longer pending.
- // Create permanent INode, update blockmap
- INodeFile newFile = pendingFile.convertToInodeFile();
- dir.replaceNode(src, pendingFile, newFile);
- // close file and persist block allocations for this file
- dir.closeFile(src, newFile);
+ finalizeINodeFileUnderConstruction(src, pendingFile);
- NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
+ " blocklist persisted");
-
- leaseManager.removeLease(src, holder);
-
- //
- // REMIND - mjc - this should be done only after we wait a few secs.
- // The namenode isn't giving datanodes enough time to report the
- // replicated blocks that are automatically done as part of a client
- // write.
- //
-
- // Now that the file is real, we need to be sure to replicate
- // the blocks.
- checkReplicationFactor(newFile);
+ }
return COMPLETE_SUCCESS;
}
@@ -1291,27 +1314,15 @@
* dumps the contents of recentInvalidateSets
*/
private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
- Collection<Collection<Block>> values = recentInvalidateSets.values();
- Iterator<Map.Entry<String,Collection<Block>>> it =
- recentInvalidateSets.entrySet().iterator();
- if (values.size() == 0) {
- out.println("Metasave: Blocks waiting deletion: 0");
+ int size = recentInvalidateSets.values().size();
+ out.println("Metasave: Blocks waiting deletion from "+size+" datanodes.");
+ if (size == 0) {
return;
}
- out.println("Metasave: Blocks waiting deletion from " +
- values.size() + " datanodes.");
- while (it.hasNext()) {
- Map.Entry<String,Collection<Block>> entry = it.next();
- String storageId = entry.getKey();
- DatanodeDescriptor node = datanodeMap.get(storageId);
- Collection<Block> blklist = entry.getValue();
- if (blklist.size() > 0) {
- out.print(node.getName());
- for (Iterator jt = blklist.iterator(); jt.hasNext();) {
- Block block = (Block) jt.next();
- out.print(" " + block);
- }
- out.println("");
+ for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
+ Collection<Block> blocks = entry.getValue();
+ if (blocks.size() > 0) {
+ out.println(datanodeMap.get(entry.getKey()).getName() + blocks);
}
}
}
@@ -1478,7 +1489,7 @@
}
if (old.isUnderConstruction()) {
INodeFileUnderConstruction cons = (INodeFileUnderConstruction) old;
- leaseManager.removeLease(src, cons.getClientName());
+ leaseManager.removeLease(cons.clientName, src);
}
return true;
}
@@ -1602,7 +1613,11 @@
* @param src The filename
* @param holder The datanode that was creating the file
*/
- void internalReleaseCreate(String src, String holder) throws IOException {
+ void internalReleaseLease(Lease lease, String src) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("lease=" + lease + ", src=" + src);
+ }
+
INodeFile iFile = dir.getFileINode(src);
if (iFile == null) {
NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
@@ -1616,30 +1631,20 @@
+ src + " but file is already closed.");
return;
}
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: "
+ + src + " does not being written in " + lease);
+ }
+
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
- // The last block that was allocated migth not have been used by the
- // client. In this case, the size of the last block would be 0. A fsck
- // will report this block as a missing block because no datanodes have it.
- // Delete this block.
- Block[] blocks = pendingFile.getBlocks();
- if (blocks != null && blocks.length > 0) {
- Block last = blocks[blocks.length - 1];
- if (last.getNumBytes() == 0) {
- pendingFile.removeBlock(last);
- blocksMap.removeINode(last);
- for (Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(last); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- addToInvalidates(last, node);
- }
- /* What else do we need to do?
- * removeStoredBlock()? we do different things when a block is
- * removed in different contexts. Mostly these should be
- * same and/or should be in one place.
- */
- }
- }
+ // Initialize lease recovery for pendingFile
+ pendingFile.assignPrimaryDatanode();
+ }
+
+ private void finalizeINodeFileUnderConstruction(String src,
+ INodeFileUnderConstruction pendingFile) throws IOException {
+ leaseManager.removeLease(pendingFile.clientName, src);
// The file is no longer pending.
// Create permanent INode, update blockmap
@@ -1649,14 +1654,58 @@
// close file and persist block allocations for this file
dir.closeFile(src, newFile);
- // replicate blocks of this file.
checkReplicationFactor(newFile);
-
- NameNode.stateChangeLog.info("DIR* NameSystem.internalReleaseCreate: " +
- src + " is no longer written to by " +
- holder);
}
+ synchronized void commitBlockSynchronization(Block lastblock,
+ long newgenerationstamp, long newlength,
+ boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
+ ) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("commitBlockSynchronization(lastblock=" + lastblock
+ + ", newgenerationstamp=" + newgenerationstamp
+ + ", newlength=" + newlength
+ + ", newtargets=" + Arrays.asList(newtargets) + ")");
+ }
+ BlockInfo blockinfo = blocksMap.getStoredBlock(lastblock);
+ if (blockinfo == null) {
+ throw new IOException("Block (=" + lastblock + ") not found");
+ }
+ INodeFile iFile = blockinfo.getINode();
+ if (!iFile.isUnderConstruction()) {
+ throw new IOException("Unexpected block (=" + lastblock
+ + ") since the file (=" + iFile.getLocalName()
+ + ") is not under construction");
+ }
+
+ //update block info
+ if (newtargets.length > 0) {
+ DatanodeDescriptor[] descriptors = new DatanodeDescriptor[newtargets.length];
+ for(int i = 0; i < newtargets.length; i++) {
+ descriptors[i] = getDatanode(newtargets[i]);
+ }
+ blockinfo.update(newgenerationstamp, newlength, descriptors);
+ }
+
+ // If this commit does not want to close the file, just persist
+ // block locations and return
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
+ String src = leaseManager.findPath(pendingFile);
+ if (deleteblock) {
+ dir.removeBlock(src, pendingFile, lastblock);
+ }
+ if (!closeFile) {
+ dir.persistBlocks(src, pendingFile);
+ getEditLog().logSync();
+ return;
+ }
+
+ //remove lease, close file
+ finalizeINodeFileUnderConstruction(src, pendingFile);
+ getEditLog().logSync();
+ }
+
+
/**
* Renew the lease(s) held by the given client
*/
@@ -1970,6 +2019,10 @@
nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
updateStats(nodeinfo, true);
+ //check lease recovery
+ if (cmd == null) {
+ cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+ }
//check pending replication
if (cmd == null) {
cmd = nodeinfo.getReplicationCommand(
@@ -4187,6 +4240,15 @@
return generationStamp.getStamp();
}
+ /**
+ * Increments, logs and then returns the stamp
+ */
+ long nextGenerationStamp() {
+ long gs = generationStamp.nextStamp();
+ getEditLog().logGenerationStamp(gs);
+ return gs;
+ }
+
// rename was successful. If any part of the renamed subtree had
// files that were being written to, update with new filename.
//
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStamp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStamp.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStamp.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStamp.java Mon Jun 2 11:43:01 2008
@@ -23,31 +23,29 @@
/****************************************************************
* A GenerationStamp is a Hadoop FS primitive, identified by a long.
****************************************************************/
-class GenerationStamp implements WritableComparable {
+class GenerationStamp implements WritableComparable<GenerationStamp> {
+ public static final long WILDCARD_STAMP = 1;
+ public static final long FIRST_VALID_STAMP = 1000L;
static { // register a ctor
WritableFactories.setFactory
(GenerationStamp.class,
new WritableFactory() {
- public Writable newInstance() { return new GenerationStamp(); }
+ public Writable newInstance() { return new GenerationStamp(0); }
});
}
long genstamp;
/**
- * Create a new instance, initialized to 0.
+ * Create a new instance, initialized to FIRST_VALID_STAMP.
*/
- public GenerationStamp() {
- this.genstamp = 0;
- }
+ GenerationStamp() {this(GenerationStamp.FIRST_VALID_STAMP);}
/**
* Create a new instance, initialized to the specified value.
*/
- public GenerationStamp(long stamp) {
- this.genstamp = stamp;
- }
+ GenerationStamp(long stamp) {this.genstamp = stamp;}
/**
* Returns the current generation stamp
@@ -64,9 +62,9 @@
}
/**
- * Returns the current stamp and incrments the counter
+ * First increments the counter and then returns the stamp
*/
- public long nextStamp() {
+ public synchronized long nextStamp() {
this.genstamp++;
return this.genstamp;
}
@@ -88,23 +86,28 @@
/////////////////////////////////////
// Comparable
/////////////////////////////////////
- public int compareTo(Object o) {
- GenerationStamp b = (GenerationStamp) o;
- if (genstamp < b.genstamp) {
- return -1;
- } else if (genstamp == b.genstamp) {
- return 0;
- } else {
- return 1;
- }
+ static int compare(long x, long y) {
+ return x < y? -1: x == y? 0: 1;
+ }
+
+ /** {@inheritDoc} */
+ public int compareTo(GenerationStamp that) {
+ return compare(this.genstamp, that.genstamp);
}
+
+ /** {@inheritDoc} */
public boolean equals(Object o) {
if (!(o instanceof GenerationStamp)) {
return false;
}
return genstamp == ((GenerationStamp)o).genstamp;
}
-
+
+ static boolean equalsWithWildcard(long x, long y) {
+ return x == y || x == WILDCARD_STAMP || y == WILDCARD_STAMP;
+ }
+
+ /** {@inheritDoc} */
public int hashCode() {
return 37 * 17 + (int) (genstamp^(genstamp>>>32));
}
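
The wildcard stamp above lets a caller match a block whose stored stamp is unknown. A small sketch of the intended semantics (the concrete values and the demo method are made up for illustration):

  static void wildcardStampDemo() {
    long stored = 2000L; // some real stamp, >= FIRST_VALID_STAMP
    assert GenerationStamp.equalsWithWildcard(stored, GenerationStamp.WILDCARD_STAMP);
    assert GenerationStamp.equalsWithWildcard(stored, 2000L);
    assert !GenerationStamp.equalsWithWildcard(stored, 2001L);
    // compare() imposes a plain total order and does not honor the wildcard
    assert GenerationStamp.compare(GenerationStamp.FIRST_VALID_STAMP, stored) < 0;
  }
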
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java Mon Jun 2 11:43:01 2008
@@ -713,27 +713,6 @@
}
/**
- * remove a block from the block list. This block should be
- * the last one on the list.
- */
- void removeBlock(Block oldblock) throws IOException {
- if (this.blocks == null) {
- throw new IOException("Trying to delete non-existant block " +
- oldblock);
- }
- int size = this.blocks.length;
- if (!this.blocks[size-1].equals(oldblock)) {
- throw new IOException("Trying to delete non-existant block " +
- oldblock);
- }
- BlockInfo[] newlist = new BlockInfo[size - 1];
- for (int i = 0; i < size-1; i++) {
- newlist[i] = this.blocks[i];
- }
- this.blocks = newlist;
- }
-
- /**
* Set file block
*/
void setBlock(int idx, BlockInfo blk) {
@@ -777,18 +756,29 @@
}
return blocks[blocks.length - 2];
}
+
+ INodeFileUnderConstruction toINodeFileUnderConstruction(
+ String clientName, String clientMachine, DatanodeDescriptor clientNode
+ ) throws IOException {
+ if (isUnderConstruction()) {
+ return (INodeFileUnderConstruction)this;
+ }
+ return new INodeFileUnderConstruction(name,
+ blockReplication, modificationTime, preferredBlockSize,
+ blocks, getPermissionStatus(),
+ clientName, clientMachine, clientNode);
+ }
}
class INodeFileUnderConstruction extends INodeFile {
- protected StringBytesWritable clientName; // lease holder
- protected StringBytesWritable clientMachine;
- protected DatanodeDescriptor clientNode; // if client is a cluster node too.
-
- INodeFileUnderConstruction() {
- clientName = null;
- clientMachine = null;
- clientNode = null;
- }
+ StringBytesWritable clientName = null; // lease holder
+ StringBytesWritable clientMachine = null;
+ DatanodeDescriptor clientNode = null; // if client is a cluster node too.
+
+ private int primaryNodeIndex = -1; //the node working on lease recovery
+ DatanodeDescriptor[] targets = null; //locations for last block
+
+ INodeFileUnderConstruction() {}
INodeFileUnderConstruction(PermissionStatus permissions,
short replication,
@@ -855,4 +845,49 @@
return obj;
}
+
+ /**
+ * remove a block from the block list. This block should be
+ * the last one on the list.
+ */
+ void removeBlock(Block oldblock) throws IOException {
+ if (blocks == null) {
+ throw new IOException("Trying to delete non-existant block " + oldblock);
+ }
+ int size_1 = blocks.length - 1;
+ if (!blocks[size_1].equals(oldblock)) {
+ throw new IOException("Trying to delete non-last block " + oldblock);
+ }
+
+ //copy to a new list
+ BlockInfo[] newlist = new BlockInfo[size_1];
+ System.arraycopy(blocks, 0, newlist, 0, size_1);
+ blocks = newlist;
+
+ // Remove the block locations for the last block.
+ targets = null;
+ }
+
+ /**
+ * Initialize lease recovery for this object
+ */
+ void assignPrimaryDatanode() {
+ //assign the first alive datanode as the primary datanode
+ if (targets.length == 0) {
+ NameNode.stateChangeLog.warn("BLOCK*"
+ + " INodeFileUnderConstruction.assignPrimaryDatanode:"
+ + " none of the targets is alive.");
+ return;
+ }
+
+ int previous = primaryNodeIndex;
+ //find an alive datanode beginning from previous
+ for(int i = 1; i <= targets.length; i++) {
+ int j = (previous + i)%targets.length;
+ if (targets[j].isAlive) {
+ DatanodeDescriptor primary = targets[primaryNodeIndex = j];
+ primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
+ return; //the first alive datanode found becomes the primary
+ }
+ }
+ }
}
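
assignPrimaryDatanode() rotates the primary across the targets so a dead candidate does not stall recovery forever. A standalone sketch of just the selection loop, with a boolean[] standing in for targets[j].isAlive (names are illustrative):

  static int choosePrimary(boolean[] alive, int previous) {
    for (int i = 1; i <= alive.length; i++) {
      int j = (previous + i) % alive.length; // probe each index once
      if (alive[j]) {
        return j; // becomes the new primaryNodeIndex
      }
    }
    return -1; // no live target; recovery will be retried later
  }
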
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java Mon Jun 2 11:43:01 2008
@@ -30,17 +30,17 @@
public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
/**
- * 1: added getBlockMetaDataInfo and updateGenerationStamp
+ * 2: change updateGenerationStamp to updateBlock
*/
- public static final long versionID = 1L;
+ public static final long versionID = 2L;
- /** @return the BlockMetaDataInfo of a block */
+ /** @return the BlockMetaDataInfo of a block;
+ * null if the block is not found
+ */
BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
/**
- * Update the GenerationStamp of a block
- * @return true iff update was required and done successfully
+ * Update the block to the new generation stamp and length.
*/
- boolean updateGenerationStamp(Block block, GenerationStamp generationstamp
- ) throws IOException;
+ void updateBlock(Block oldblock, Block newblock) throws IOException;
}
\ No newline at end of file
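
Bumping versionID to 2 means a caller built against this patch will refuse to talk to a datanode still serving version 1 (the old updateGenerationStamp contract). A hedged sketch of the client side; in this patch the real entry point is DataNode.createInterDataNodeProtocolProxy(id, conf), and the method wrapper and 'addr' argument below are assumptions:

  // assumes import org.apache.hadoop.ipc.RPC
  static void updateReplica(InetSocketAddress addr, Block block,
      Block newblock, Configuration conf) throws IOException {
    InterDatanodeProtocol datanode = (InterDatanodeProtocol) RPC.getProxy(
        InterDatanodeProtocol.class, InterDatanodeProtocol.versionID, addr, conf);
    BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block); // may be null
    if (info != null) {
      datanode.updateBlock(block, newblock); // new stamp and truncated length
    }
  }
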
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java Mon Jun 2 11:43:01 2008
@@ -18,14 +18,39 @@
package org.apache.hadoop.dfs;
import java.io.IOException;
+import java.io.FileNotFoundException;
import java.util.*;
import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+/**
+ * LeaseManager does the lease housekeeping for writing on files.
+ * This class also provides useful static methods for lease recovery.
+ *
+ * Lease Recovery Algorithm
+ * 1) Namenode retrieves lease information
+ * 2) For each file f in the lease, consider the last block b of f
+ * 2.1) Get the datanodes which contains b
+ * 2.2) Assign one of the datanodes as the primary datanode p
+
+ * 2.3) p obtains a new generation stamp form the namenode
+ * 2.4) p get the block info from each datanode
+ * 2.5) p computes the minimum block length
+ * 2.6) p updates the datanodes, which have a valid generation stamp,
+ * with the new generation stamp and the minimum block length
+ * 2.7) p acknowledges the namenode the update results
+
+ * 2.8) Namenode updates the BlockInfo
+ * 2.9) Namenode removes f from the lease
+ * and removes the lease once all files have been removed
+ * 2.10) Namenode commit changes to edit log
+ */
class LeaseManager {
- static final Log LOG = FSNamesystem.LOG;
+ static final Log LOG = LogFactory.getLog(LeaseManager.class);
private final FSNamesystem fsnamesystem;
@@ -48,8 +73,8 @@
LeaseManager(FSNamesystem fsnamesystem) {this.fsnamesystem = fsnamesystem;}
- Lease getLease(String holder) throws IOException {
- return leases.get(new StringBytesWritable(holder));
+ Lease getLease(StringBytesWritable holder) throws IOException {
+ return leases.get(holder);
}
SortedSet<Lease> getSortedLeases() {return sortedLeases;}
@@ -72,42 +97,68 @@
/**
* Adds (or re-adds) the lease for the specified file.
*/
- synchronized void addLease(String src, String holder) throws IOException {
+ synchronized void addLease(StringBytesWritable holder, String src
+ ) throws IOException {
Lease lease = getLease(holder);
if (lease == null) {
lease = new Lease(holder);
- leases.put(new StringBytesWritable(holder), lease);
+ leases.put(holder, lease);
sortedLeases.add(lease);
} else {
- sortedLeases.remove(lease);
- lease.renew();
- sortedLeases.add(lease);
+ renewLease(lease);
}
sortedLeasesByPath.put(src, lease);
- lease.startedCreate(src);
+ lease.paths.add(new StringBytesWritable(src));
}
/**
- * deletes the lease for the specified file
+ * Remove the specified lease and src.
*/
- synchronized void removeLease(String src, String holder) throws IOException {
- Lease lease = getLease(holder);
- if (lease != null) {
- lease.completedCreate(src);
- sortedLeasesByPath.remove(src);
+ synchronized void removeLease(Lease lease, String src) throws IOException {
+ sortedLeasesByPath.remove(src);
+ if (!lease.paths.remove(new StringBytesWritable(src))) {
+ LOG.error(src + " not found in lease.paths (=" + lease.paths + ")");
+ }
+
+ if (!lease.hasPath()) {
+ leases.remove(lease.holder);
+ if (!sortedLeases.remove(lease)) {
+ LOG.error(lease + " not found in sortedLeases");
+ }
+ }
+ }
+
+ /**
+ * Remove the lease for the specified holder and src
+ */
+ synchronized void removeLease(StringBytesWritable holder, String src
+ ) throws IOException {
+ removeLease(getLease(holder), src);
+ }
- if (!lease.hasPath()) {
- leases.remove(new StringBytesWritable(holder));
- sortedLeases.remove(lease);
+ /**
+ * Finds the pathname for the specified pendingFile
+ */
+ synchronized String findPath(INodeFileUnderConstruction pendingFile
+ ) throws IOException {
+ Lease lease = getLease(pendingFile.clientName);
+ if (lease != null) {
+ String src = lease.findPath(pendingFile);
+ if (src != null) {
+ return src;
}
}
+ throw new IOException("pendingFile (=" + pendingFile + ") not found."
+ + "(lease=" + lease + ")");
}
/**
* Renew the lease(s) held by the given client
*/
synchronized void renewLease(String holder) throws IOException {
- Lease lease = getLease(holder);
+ renewLease(getLease(new StringBytesWritable(holder)));
+ }
+ synchronized void renewLease(Lease lease) {
if (lease != null) {
sortedLeases.remove(lease);
lease.renew();
@@ -115,21 +166,6 @@
}
}
- synchronized void removeExpiredLease(Lease lease) throws IOException {
- String holder = lease.holder.getString();
- for(StringBytesWritable sbw : lease.paths) {
- String p = sbw.getString();
- fsnamesystem.internalReleaseCreate(p, holder);
- sortedLeasesByPath.remove(p);
- }
- lease.paths.clear();
-
- leases.remove(lease.holder);
- if (!sortedLeases.remove(lease)) {
- LOG.error("removeExpiredLease: " + lease + " not found in sortedLeases");
- }
- }
-
/************************************************************
* A Lease governs all the locks held by a single client.
* For each client there's a corresponding lease, whose
@@ -142,11 +178,13 @@
private long lastUpdate;
private Collection<StringBytesWritable> paths = new TreeSet<StringBytesWritable>();
- public Lease(String holder) throws IOException {
- this.holder = new StringBytesWritable(holder);
+ /** Only LeaseManager object can create a lease */
+ private Lease(StringBytesWritable holder) throws IOException {
+ this.holder = holder;
renew();
}
- public void renew() {
+ /** Only LeaseManager object can renew a lease */
+ private void renew() {
this.lastUpdate = FSNamesystem.now();
}
@@ -160,14 +198,20 @@
return FSNamesystem.now() - lastUpdate > softLimit;
}
- void startedCreate(String src) throws IOException {
- paths.add(new StringBytesWritable(src));
- }
-
- boolean completedCreate(String src) throws IOException {
- return paths.remove(new StringBytesWritable(src));
+ /**
+ * @return the path associated with the pendingFile, or null if not found.
+ */
+ private String findPath(INodeFileUnderConstruction pendingFile) {
+ for(Iterator<StringBytesWritable> i = paths.iterator(); i.hasNext(); ) {
+ String src = i.next().toString();
+ if (fsnamesystem.dir.getFileINode(src) == pendingFile) {
+ return src;
+ }
+ }
+ return null;
}
+ /** Does this lease contain any path? */
boolean hasPath() {return !paths.isEmpty();}
/** {@inheritDoc} */
@@ -309,8 +353,11 @@
((top = sortedLeases.first()) != null)) {
if (top.expiredHardLimit()) {
LOG.info("Lease Monitor: Removing lease " + top
- + ", leases remaining: " + sortedLeases.size());
- removeExpiredLease(top);
+ + ", sortedLeases.size()=: " + sortedLeases.size());
+ for(StringBytesWritable s : top.paths) {
+ fsnamesystem.internalReleaseLease(top, s.getString());
+ }
+ renewLease(top);
} else {
break;
}
@@ -330,4 +377,119 @@
}
}
}
+
+ /*
+ * The following code provides useful static methods for lease recovery.
+ */
+ /** A convenience class used in lease recovery */
+ private static class BlockRecord {
+ final DatanodeID id;
+ final InterDatanodeProtocol datanode;
+ final Block block;
+
+ BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
+ this.id = id;
+ this.datanode = datanode;
+ this.block = block;
+ }
+
+ public String toString() {
+ return "block:" + block + " node:" + id;
+ }
+ }
+
+ /**
+ * Recover a list of blocks.
+ * This method is invoked by the primary datanode.
+ */
+ static void recoverBlocks(Block[] blocks, DatanodeID[][] targets,
+ DatanodeProtocol namenode, Configuration conf) {
+ for(int i = 0; i < blocks.length; i++) {
+ try {
+ recoverBlock(blocks[i], targets[i], namenode, conf, true);
+ } catch (IOException e) {
+ LOG.warn("recoverBlocks, i=" + i, e);
+ }
+ }
+ }
+
+ /** Recover a block */
+ static Block recoverBlock(Block block, DatanodeID[] datanodeids,
+ DatanodeProtocol namenode, Configuration conf,
+ boolean closeFile) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("block=" + block
+ + ", datanodeids=" + Arrays.asList(datanodeids));
+ }
+ List<BlockRecord> syncList = new ArrayList<BlockRecord>();
+ long minlength = Long.MAX_VALUE;
+ int errorCount = 0;
+
+ //check generation stamps
+ for(DatanodeID id : datanodeids) {
+ try {
+ InterDatanodeProtocol datanode
+ = DataNode.createInterDataNodeProtocolProxy(id, conf);
+ BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
+ if (info != null && info.getGenerationStamp() >= block.generationStamp) {
+ syncList.add(new BlockRecord(id, datanode, new Block(info)));
+ if (info.len < minlength) {
+ minlength = info.len;
+ }
+ }
+ } catch (IOException e) {
+ ++errorCount;
+ InterDatanodeProtocol.LOG.warn(
+ "Failed to getBlockMetaDataInfo for block (=" + block
+ + ") from datanode (=" + id + ")", e);
+ }
+ }
+
+ if (syncList.isEmpty() && errorCount > 0) {
+ throw new IOException("All datanodes failed: block=" + block
+ + ", datanodeids=" + Arrays.asList(datanodeids));
+ }
+ return syncBlock(block, minlength, syncList, namenode, closeFile);
+ }
+
+ /** Block synchronization */
+ private static Block syncBlock(Block block, long minlength,
+ List<BlockRecord> syncList, DatanodeProtocol namenode,
+ boolean closeFile) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("block=" + block + ", minlength=" + minlength
+ + ", syncList=" + syncList + ", closeFile=" + closeFile);
+ }
+
+ //syncList.isEmpty() means that none of the datanodes have the block,
+ //so the block can be deleted.
+ if (syncList.isEmpty()) {
+ namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
+ DatanodeID.EMPTY_ARRAY);
+ return null;
+ }
+
+ List<DatanodeID> successList = new ArrayList<DatanodeID>();
+
+ long generationstamp = namenode.nextGenerationStamp();
+ Block newblock = new Block(block.blkid, minlength, generationstamp);
+
+ for(BlockRecord r : syncList) {
+ try {
+ r.datanode.updateBlock(r.block, newblock);
+ successList.add(r.id);
+ } catch (IOException e) {
+ InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ + newblock + ", datanode=" + r.id + ")", e);
+ }
+ }
+
+ if (!successList.isEmpty()) {
+ namenode.commitBlockSynchronization(block,
+ newblock.generationStamp, newblock.len, closeFile, false,
+ successList.toArray(new DatanodeID[successList.size()]));
+ return newblock; // success
+ }
+ return null; // failed
+ }
}
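
On the datanode side, recovery starts when the heartbeat reply carries the command produced by getLeaseRecoveryCommand() in the FSNamesystem hunk above. A hedged sketch of that dispatch, assuming the BlockCommand accessor names (getBlocks/getTargets are not shown in this patch):

  void handleRecoveryCommand(BlockCommand bcmd, DatanodeProtocol namenode,
      Configuration conf) {
    // one targets[] per block; recoverBlocks() logs and skips per-block
    // failures instead of aborting the whole batch
    LeaseManager.recoverBlocks(bcmd.getBlocks(), bcmd.getTargets(), namenode, conf);
  }
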
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon Jun 2 11:43:01 2008
@@ -288,6 +288,20 @@
myMetrics.numFilesCreated.inc();
}
+ /** Coming in a future release.... */
+ void append(String src, String clientName) throws IOException {
+ String clientMachine = getClientMachine();
+ if (stateChangeLog.isDebugEnabled()) {
+ stateChangeLog.debug("*DIR* NameNode.append: file "
+ +src+" for "+clientName+" at "+clientMachine);
+ }
+ //TODO: add namesystem.appendFile(...), which calls appendFileInternal(...)
+ namesystem.appendFileInternal(src, clientName, clientMachine);
+
+ //TODO: inc myMetrics;
+ }
+
+ /** {@inheritDoc} */
public boolean setReplication(String src,
short replication
) throws IOException {
@@ -358,6 +372,20 @@
}
}
+ /** {@inheritDoc} */
+ public long nextGenerationStamp() {
+ return namesystem.nextGenerationStamp();
+ }
+
+ /** {@inheritDoc} */
+ public void commitBlockSynchronization(Block block,
+ long newgenerationstamp, long newlength,
+ boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
+ ) throws IOException {
+ namesystem.commitBlockSynchronization(block,
+ newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
+ }
+
public long getPreferredBlockSize(String filename) throws IOException {
return namesystem.getPreferredBlockSize(filename);
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/StringBytesWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/StringBytesWritable.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/StringBytesWritable.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/StringBytesWritable.java Mon Jun 2 11:43:01 2008
@@ -46,6 +46,15 @@
return new String(get(),"UTF8");
}
+ /** {@inheritDoc} */
+ public String toString() {
+ try {
+ return getString();
+ } catch (IOException e) {
+ throw (RuntimeException)new RuntimeException().initCause(e);
+ }
+ }
+
/**
* Compare to a String.
*/
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java?rev=662513&r1=662512&r2=662513&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java Mon Jun 2 11:43:01 2008
@@ -97,6 +97,15 @@
oStream = null;
}
}
+
+ synchronized long getGenerationStamp() {
+ return theBlock.getGenerationStamp();
+ }
+
+ synchronized void updateBlock(Block b) {
+ theBlock.generationStamp = b.generationStamp;
+ setlength(b.len);
+ }
synchronized long getlength() {
if (!finalized) {
@@ -292,6 +301,27 @@
return binfo.getlength();
}
+ /** {@inheritDoc} */
+ public Block getStoredBlock(long blkid) throws IOException {
+ Block b = new Block(blkid);
+ BInfo binfo = blockMap.get(b);
+ if (binfo == null) {
+ return null;
+ }
+ b.generationStamp = binfo.getGenerationStamp();
+ b.len = binfo.getlength();
+ return b;
+ }
+
+ /** {@inheritDoc} */
+ public void updateBlock(Block oldblock, Block newblock) throws IOException {
+ BInfo binfo = blockMap.get(newblock);
+ if (binfo == null) {
+ throw new IOException("BInfo not found, b=" + newblock);
+ }
+ binfo.updateBlock(newblock);
+ }
+
public synchronized void invalidate(Block[] invalidBlks) throws IOException {
boolean error = false;
if (invalidBlks == null) {
@@ -586,5 +616,4 @@
public String getStorageInfo() {
return "Simulated FSDataset-" + storageId;
}
-
}
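
For testing, the simulated dataset now answers the same two calls a real datanode needs during recovery. A minimal usage sketch in an assumed test context ('dataset' already holds a finalized block with id blkid; the method name is illustrative):

  void checkUpdateBlock(SimulatedFSDataset dataset, long blkid,
      long newLen, long newStamp) throws IOException {
    Block stored = dataset.getStoredBlock(blkid);   // null if unknown
    Block recovered = new Block(blkid, newLen, newStamp);
    dataset.updateBlock(stored, recovered);         // apply stamp, truncate length
    assert dataset.getStoredBlock(blkid).getGenerationStamp() == newStamp;
  }
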