You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:06:08 UTC
svn commit: r885143 [9/18] - in /hadoop/hdfs/branches/HDFS-326: ./
.eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/
src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs/...
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Sat Nov 28 20:05:56 2009
@@ -29,11 +29,11 @@
import java.io.PrintStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Random;
import java.util.TreeSet;
import java.util.regex.Matcher;
@@ -49,6 +49,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
@@ -211,8 +212,8 @@
private void init() {
// get the list of blocks and arrange them in random order
- Block arr[] = dataset.getBlockReport();
- Collections.shuffle(Arrays.asList(arr));
+ List<Block> arr = dataset.getFinalizedBlocks();
+ Collections.shuffle(arr);
blockInfoSet = new TreeSet<BlockScanInfo>();
blockMap = new HashMap<Block, BlockScanInfo>();
@@ -373,7 +374,7 @@
static private class LogEntry {
long blockId = -1;
long verificationTime = -1;
- long genStamp = Block.GRANDFATHER_GENERATION_STAMP;
+ long genStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
/**
* The format consists of single line with multiple entries. each
@@ -450,7 +451,6 @@
return;
} catch (IOException e) {
- totalScanErrors++;
updateScanStatus(block, ScanType.VERIFICATION_SCAN, false);
// If the block does not exists anymore, then its not an error
@@ -466,6 +466,7 @@
StringUtils.stringifyException(e));
if (second) {
+ totalScanErrors++;
datanode.getMetrics().blockVerificationFailures.inc();
handleScanFailure(block);
return;
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Nov 28 20:05:56 2009
@@ -34,11 +34,10 @@
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
@@ -55,11 +54,16 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -67,7 +71,7 @@
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -75,7 +79,11 @@
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
@@ -83,9 +91,6 @@
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.AccessTokenHandler;
-import org.apache.hadoop.security.ExportedAccessKeys;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authorize.ConfiguredPolicy;
import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -169,8 +174,6 @@
volatile boolean shouldRun = true;
private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
- /** list of blocks being recovered */
- private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
private LinkedList<String> delHints = new LinkedList<String>();
public final static String EMPTY_DEL_HINT = "";
AtomicInteger xmitsInProgress = new AtomicInteger();
@@ -268,8 +271,8 @@
AbstractList<File> dataDirs
) throws IOException, InterruptedException {
// use configured nameserver & interface to get local hostname
- if (conf.get("slave.host.name") != null) {
- machineName = conf.get("slave.host.name");
+ if (conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY) != null) {
+ machineName = conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);
}
if (machineName == null) {
machineName = DNS.getDefaultHost(
@@ -278,7 +281,7 @@
}
this.nameNodeAddr = NameNode.getAddress(conf);
- this.socketTimeout = conf.getInt("dfs.socket.timeout",
+ this.socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsConstants.READ_TIMEOUT);
this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
HdfsConstants.WRITE_TIMEOUT);
@@ -286,7 +289,8 @@
* to false on some of them. */
this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed",
true);
- this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
+ this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
InetSocketAddress socAddr = NetUtils.createSocketAddr(
conf.get("dfs.datanode.address", "0.0.0.0:50010"));
int tmpPort = socAddr.getPort();
@@ -313,7 +317,7 @@
dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
// it would have been better to pass storage as a parameter to
// constructor below - need to augment ReflectionUtils used below.
- conf.set("StorageId", dnRegistration.getStorageID());
+ conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, dnRegistration.getStorageID());
try {
//Equivalent of following (can't do because Simulated is in test dir)
// this.data = new SimulatedFSDataset(conf);
@@ -385,10 +389,11 @@
LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
}
if (conf.getBoolean("dfs.https.enable", false)) {
- boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
+ boolean needClientAuth = conf.getBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
+ DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
"dfs.datanode.https.address", infoHost + ":" + 0));
- Configuration sslConf = new Configuration(false);
+ Configuration sslConf = new HdfsConfiguration(false);
sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
"ssl-server.xml"));
if (LOG.isDebugEnabled()) {
@@ -405,7 +410,7 @@
this.infoServer.start();
// adjust info port
this.dnRegistration.setInfoPort(this.infoServer.getPort());
- myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
+ myMetrics = new DataNodeMetrics(conf, dnRegistration.getName());
// set service-level authorization security policy
if (conf.getBoolean(
@@ -963,7 +968,7 @@
processDistributedUpgradeCommand((UpgradeCommand)cmd);
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
- recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
+ recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
@@ -1040,13 +1045,12 @@
// and can be safely GC'ed.
//
long brStartTime = now();
- Block[] bReport = data.getBlockReport();
+ BlockListAsLongs bReport = data.getBlockReport();
- cmd = namenode.blockReport(dnRegistration,
- BlockListAsLongs.convertToArrayLongs(bReport));
+ cmd = namenode.blockReport(dnRegistration, bReport.getBlockListAsLongs());
long brTime = now() - brStartTime;
myMetrics.blockReports.inc(brTime);
- LOG.info("BlockReport of " + bReport.length +
+ LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
" blocks got processed in " + brTime + " msecs");
//
// If we have sent the first block report, then wait a random
@@ -1295,13 +1299,14 @@
//
// Header info
//
- AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+ BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
if (isAccessTokenEnabled) {
accessToken = accessTokenHandler.generateToken(null, b.getBlockId(),
EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
}
DataTransferProtocol.Sender.opWriteBlock(out,
- b.getBlockId(), b.getGenerationStamp(), 0, false, "",
+ b.getBlockId(), b.getGenerationStamp(), 0,
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, "",
srcNode, targets, accessToken);
// send data & checksum
@@ -1324,6 +1329,20 @@
}
}
}
+
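+ /**
+ * After a block becomes finalized, the datanode increments the blocksWritten
+ * metric, notifies the namenode, and adds the block to the block scanner.
+ * @param block the block that has been finalized
+ * @param delHint hint passed to the namenode along with the block
+ */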
+ void closeBlock(Block block, String delHint) {
+ myMetrics.blocksWritten.inc();
+ notifyNamenodeReceivedBlock(block, delHint);
+ if (blockScanner != null) {
+ blockScanner.addBlock(block);
+ }
+ }
/**
* No matter what kind of exception we get, keep retrying to offerService().
@@ -1382,7 +1401,7 @@
public static DataNode instantiateDataNode(String args[],
Configuration conf) throws IOException {
if (conf == null)
- conf = new Configuration();
+ conf = new HdfsConfiguration();
if (args != null) {
// parse generic hadoop options
@@ -1399,7 +1418,7 @@
" anymore. RackID resolution is handled by the NameNode.");
System.exit(-1);
}
- String[] dataDirs = conf.getStrings("dfs.data.dir");
+ String[] dataDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
dnThreadName = "DataNode: [" +
StringUtils.arrayToString(dataDirs) + "]";
return makeInstance(dataDirs, conf);
@@ -1561,42 +1580,16 @@
}
}
- // InterDataNodeProtocol implementation
- /** {@inheritDoc} */
- public BlockMetaDataInfo getBlockMetaDataInfo(Block block
- ) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("block=" + block);
- }
- Block stored = data.getStoredBlock(block.getBlockId());
-
- if (stored == null) {
- return null;
- }
- BlockMetaDataInfo info = new BlockMetaDataInfo(stored,
- blockScanner.getLastScanTime(stored));
- if (LOG.isDebugEnabled()) {
- LOG.debug("getBlockMetaDataInfo successful block=" + stored +
- " length " + stored.getNumBytes() +
- " genstamp " + stored.getGenerationStamp());
- }
-
- // paranoia! verify that the contents of the stored block
- // matches the block file on disk.
- data.validateBlockMetadata(stored);
- return info;
- }
-
- public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
+ public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
public void run() {
- for(int i = 0; i < blocks.length; i++) {
+ for(RecoveringBlock b : blocks) {
try {
- logRecoverBlock("NameNode", blocks[i], targets[i]);
- recoverBlock(blocks[i], false, targets[i], true);
+ logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
+ recoverBlock(b);
} catch (IOException e) {
- LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
+ LOG.warn("recoverBlocks FAILED: " + b, e);
}
}
}
@@ -1605,22 +1598,39 @@
return d;
}
- /** {@inheritDoc} */
- public void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException {
- LOG.info("oldblock=" + oldblock + "(length=" + oldblock.getNumBytes()
- + "), newblock=" + newblock + "(length=" + newblock.getNumBytes()
- + "), datanode=" + dnRegistration.getName());
- data.updateBlock(oldblock, newblock);
- if (finalize) {
- data.finalizeBlock(newblock);
- myMetrics.blocksWritten.inc();
- notifyNamenodeReceivedBlock(newblock, EMPTY_DEL_HINT);
- LOG.info("Received block " + newblock +
- " of size " + newblock.getNumBytes() +
- " as part of lease recovery.");
+ // InterDatanodeProtocol implementation
+ @Override // InterDatanodeProtocol
+ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+ throws IOException {
+ return data.initReplicaRecovery(rBlock);
+ }
+
+ /**
+ * Convenience method, which unwraps RemoteException.
+ * @throws IOException not a RemoteException.
+ */
+ private static ReplicaRecoveryInfo callInitReplicaRecovery(
+ InterDatanodeProtocol datanode,
+ RecoveringBlock rBlock) throws IOException {
+ try {
+ return datanode.initReplicaRecovery(rBlock);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException();
}
}
+ /**
+ * Update replica with the new generation stamp and length.
+ */
+ @Override // InterDatanodeProtocol
+ public Block updateReplicaUnderRecovery(Block oldBlock,
+ long recoveryId,
+ long newLength) throws IOException {
+ ReplicaInfo r =
+ data.updateReplicaUnderRecovery(oldBlock, recoveryId, newLength);
+ return new Block(r);
+ }
+
/** {@inheritDoc} */
public long getProtocolVersion(String protocol, long clientVersion
) throws IOException {
@@ -1633,164 +1643,171 @@
+ ": " + protocol);
}
- /** A convenient class used in lease recovery */
+ /** A convenient class used in block recovery */
private static class BlockRecord {
final DatanodeID id;
final InterDatanodeProtocol datanode;
- final Block block;
+ final ReplicaRecoveryInfo rInfo;
- BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
+ BlockRecord(DatanodeID id,
+ InterDatanodeProtocol datanode,
+ ReplicaRecoveryInfo rInfo) {
this.id = id;
this.datanode = datanode;
- this.block = block;
+ this.rInfo = rInfo;
}
/** {@inheritDoc} */
public String toString() {
- return "block:" + block + " node:" + id;
+ return "block:" + rInfo + " node:" + id;
}
}
/** Recover a block */
- private LocatedBlock recoverBlock(Block block, boolean keepLength,
- DatanodeInfo[] targets, boolean closeFile) throws IOException {
-
+ private void recoverBlock(RecoveringBlock rBlock) throws IOException {
+ Block block = rBlock.getBlock();
+ DatanodeInfo[] targets = rBlock.getLocations();
DatanodeID[] datanodeids = (DatanodeID[])targets;
- // If the block is already being recovered, then skip recovering it.
- // This can happen if the namenode and client start recovering the same
- // file at the same time.
- synchronized (ongoingRecovery) {
- Block tmp = new Block();
- tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
- if (ongoingRecovery.get(tmp) != null) {
- String msg = "Block " + block + " is already being recovered, " +
- " ignoring this request to recover it.";
- LOG.info(msg);
- throw new IOException(msg);
- }
- ongoingRecovery.put(block, block);
- }
- try {
- List<BlockRecord> syncList = new ArrayList<BlockRecord>();
- long minlength = Long.MAX_VALUE;
- int errorCount = 0;
+ List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length);
+ int errorCount = 0;
- //check generation stamps
- for(DatanodeID id : datanodeids) {
- try {
- InterDatanodeProtocol datanode = dnRegistration.equals(id)?
- this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
- BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
- if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
- if (keepLength) {
- if (info.getNumBytes() == block.getNumBytes()) {
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
- }
- }
- else {
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
- if (info.getNumBytes() < minlength) {
- minlength = info.getNumBytes();
- }
- }
- }
- } catch (IOException e) {
- ++errorCount;
- InterDatanodeProtocol.LOG.warn(
- "Failed to getBlockMetaDataInfo for block (=" + block
- + ") from datanode (=" + id + ")", e);
+ //check generation stamps
+ for(DatanodeID id : datanodeids) {
+ try {
+ InterDatanodeProtocol datanode = dnRegistration.equals(id)?
+ this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
+ ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
+ if (info != null &&
+ info.getGenerationStamp() >= block.getGenerationStamp() &&
+ info.getNumBytes() > 0) {
+ syncList.add(new BlockRecord(id, datanode, info));
}
+ } catch (RecoveryInProgressException ripE) {
+ InterDatanodeProtocol.LOG.warn(
+ "Recovery for replica " + block + " on data-node " + id
+ + " is already in progress. Recovery id = "
+ + rBlock.getNewGenerationStamp() + " is aborted.", ripE);
+ return;
+ } catch (IOException e) {
+ ++errorCount;
+ InterDatanodeProtocol.LOG.warn(
+ "Failed to obtain replica info for block (=" + block
+ + ") from datanode (=" + id + ")", e);
}
+ }
- if (syncList.isEmpty() && errorCount > 0) {
- throw new IOException("All datanodes failed: block=" + block
- + ", datanodeids=" + Arrays.asList(datanodeids));
- }
- if (!keepLength) {
- block.setNumBytes(minlength);
- }
- return syncBlock(block, syncList, targets, closeFile);
- } finally {
- synchronized (ongoingRecovery) {
- ongoingRecovery.remove(block);
- }
+ if (errorCount == datanodeids.length) {
+ throw new IOException("All datanodes failed: block=" + block
+ + ", datanodeids=" + Arrays.asList(datanodeids));
}
+
+ syncBlock(rBlock, syncList);
}
/** Block synchronization */
- private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
- DatanodeInfo[] targets, boolean closeFile) throws IOException {
+ private void syncBlock(RecoveringBlock rBlock,
+ List<BlockRecord> syncList) throws IOException {
+ Block block = rBlock.getBlock();
+ long recoveryId = rBlock.getNewGenerationStamp();
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
- + "), syncList=" + syncList + ", closeFile=" + closeFile);
+ + "), syncList=" + syncList);
}
- //syncList.isEmpty() that all datanodes do not have the block
- //so the block can be deleted.
+ // syncList.isEmpty() means that every data-node either does not have the
+ // block or has a replica of 0 length.
+ // The block can be deleted.
if (syncList.isEmpty()) {
- namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
- DatanodeID.EMPTY_ARRAY);
- //always return a new access token even if everything else stays the same
- LocatedBlock b = new LocatedBlock(block, targets);
- if (isAccessTokenEnabled) {
- b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
- }
- return b;
+ namenode.commitBlockSynchronization(block, recoveryId, 0,
+ true, true, DatanodeID.EMPTY_ARRAY);
+ return;
}
- List<DatanodeID> successList = new ArrayList<DatanodeID>();
+ // Calculate the best available replica state.
+ ReplicaState bestState = ReplicaState.RWR;
+ long finalizedLength = -1;
+ for(BlockRecord r : syncList) {
+ assert r.rInfo.getNumBytes() > 0 : "zero length replica";
+ ReplicaState rState = r.rInfo.getOriginalReplicaState();
+ if(rState.getValue() < bestState.getValue())
+ bestState = rState;
+ if(rState == ReplicaState.FINALIZED) {
+ if(finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes())
+ throw new IOException("Inconsistent size of finalized replicas. " +
+ "Replica " + r.rInfo + " expected size: " + finalizedLength);
+ finalizedLength = r.rInfo.getNumBytes();
+ }
+ }
- long generationstamp = namenode.nextGenerationStamp(block);
- Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp);
+ // Calculate list of nodes that will participate in the recovery
+ // and the new block size
+ List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
+ Block newBlock = new Block(block.getBlockId(), -1, recoveryId);
+ switch(bestState) {
+ case FINALIZED:
+ assert finalizedLength > 0 : "finalizedLength is not positive";
+ for(BlockRecord r : syncList) {
+ ReplicaState rState = r.rInfo.getOriginalReplicaState();
+ if(rState == ReplicaState.FINALIZED ||
+ rState == ReplicaState.RBW &&
+ r.rInfo.getNumBytes() == finalizedLength)
+ participatingList.add(r);
+ }
+ newBlock.setNumBytes(finalizedLength);
+ break;
+ case RBW:
+ case RWR:
+ long minLength = Long.MAX_VALUE;
+ for(BlockRecord r : syncList) {
+ ReplicaState rState = r.rInfo.getOriginalReplicaState();
+ if(rState == bestState) {
+ minLength = Math.min(minLength, r.rInfo.getNumBytes());
+ participatingList.add(r);
+ }
+ }
+ newBlock.setNumBytes(minLength);
+ break;
+ case RUR:
+ case TEMPORARY:
+ assert false : "bad replica state: " + bestState;
+ }
- for(BlockRecord r : syncList) {
+ List<DatanodeID> failedList = new ArrayList<DatanodeID>();
+ List<DatanodeID> successList = new ArrayList<DatanodeID>();
+ for(BlockRecord r : participatingList) {
try {
- r.datanode.updateBlock(r.block, newblock, closeFile);
+ Block reply = r.datanode.updateReplicaUnderRecovery(
+ r.rInfo, recoveryId, newBlock.getNumBytes());
+ assert reply.equals(newBlock) &&
+ reply.getNumBytes() == newBlock.getNumBytes() :
+ "Updated replica must be the same as the new block.";
successList.add(r.id);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
- + newblock + ", datanode=" + r.id + ")", e);
+ + newBlock + ", datanode=" + r.id + ")", e);
+ failedList.add(r.id);
}
}
- if (!successList.isEmpty()) {
- DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
-
- namenode.commitBlockSynchronization(block,
- newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
- nlist);
- DatanodeInfo[] info = new DatanodeInfo[nlist.length];
- for (int i = 0; i < nlist.length; i++) {
- info[i] = new DatanodeInfo(nlist[i]);
+ // If any of the data-nodes failed, the recovery fails, because
+ // we never know the actual state of the replica on failed data-nodes.
+ // The recovery should be started over.
+ if(!failedList.isEmpty()) {
+ StringBuilder b = new StringBuilder();
+ for(DatanodeID id : failedList) {
+ b.append("\n " + id);
}
- LocatedBlock b = new LocatedBlock(newblock, info); // success
- // should have used client ID to generate access token, but since
- // owner ID is not checked, we simply pass null for now.
- if (isAccessTokenEnabled) {
- b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
- }
- return b;
+ throw new IOException("Cannot recover " + block + ", the following "
+ + failedList.size() + " data-nodes failed {" + b + "\n}");
}
- //failed
- StringBuilder b = new StringBuilder();
- for(BlockRecord r : syncList) {
- b.append("\n " + r.id);
- }
- throw new IOException("Cannot recover " + block + ", none of these "
- + syncList.size() + " datanodes success {" + b + "\n}");
+ // Notify the name-node about successfully recovered replicas.
+ DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
+ namenode.commitBlockSynchronization(block,
+ newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
+ nlist);
}
- // ClientDataNodeProtocol implementation
- /** {@inheritDoc} */
- public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
- ) throws IOException {
- logRecoverBlock("Client", block, targets);
- return recoverBlock(block, keepLength, targets, false);
- }
-
private static void logRecoverBlock(String who,
Block block, DatanodeID[] targets) {
StringBuilder msg = new StringBuilder(targets[0].getName());
@@ -1800,4 +1817,11 @@
LOG.info(who + " calls recoverBlock(block=" + block
+ ", targets=[" + msg + "])");
}
+
+ // ClientDataNodeProtocol implementation
+ /** {@inheritDoc} */
+ @Override // ClientDataNodeProtocol
+ public long getReplicaVisibleLength(final Block block) throws IOException {
+ return data.getReplicaVisibleLength(block);
+ }
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Sat Nov 28 20:05:56 2009
@@ -32,8 +32,8 @@
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileUtil.HardLink;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -53,6 +53,9 @@
final static String BLOCK_SUBDIR_PREFIX = "subdir";
final static String BLOCK_FILE_PREFIX = "blk_";
final static String COPY_FILE_PREFIX = "dncp_";
+ final static String STORAGE_DIR_RBW = "rbw";
+ final static String STORAGE_DIR_FINALIZED = "finalized";
+ final static String STORAGE_DIR_DETACHED = "detach";
private String storageID;
@@ -270,6 +273,8 @@
File curDir = sd.getCurrentDir();
File prevDir = sd.getPreviousDir();
assert curDir.exists() : "Current directory must exist.";
+ // Cleanup directory "detach"
+ cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED));
// delete previous dir before upgrading
if (prevDir.exists())
deleteDir(prevDir);
@@ -277,8 +282,11 @@
assert !tmpDir.exists() : "previous.tmp directory must not exist.";
// rename current to tmp
rename(curDir, tmpDir);
- // hardlink blocks
- linkBlocks(tmpDir, curDir, this.getLayoutVersion());
+ // hard link finalized & rbw blocks
+ linkAllBlocks(tmpDir, curDir);
+ // create current directory if not exists
+ if (!curDir.exists() && !curDir.mkdirs())
+ throw new IOException("Cannot create directory " + curDir);
// write version file
this.layoutVersion = FSConstants.LAYOUT_VERSION;
assert this.namespaceID == nsInfo.getNamespaceID() :
@@ -290,6 +298,30 @@
LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
}
+ /**
+ * Cleanup the detachDir.
+ *
+ * If the directory is not empty report an error;
+ * Otherwise remove the directory.
+ *
+ * @param detachDir detach directory
+ * @throws IOException if the directory is not empty or it can not be removed
+ */
+ private void cleanupDetachDir(File detachDir) throws IOException {
+ if (layoutVersion >= PRE_RBW_LAYOUT_VERSION &&
+ detachDir.exists() && detachDir.isDirectory() ) {
+
+ if (detachDir.list().length != 0 ) {
+ throw new IOException("Detached directory " + detachDir +
+ " is not empty. Please manually move each file under this " +
+ "directory to the finalized directory if the finalized " +
+ "directory tree does not have the file.");
+ } else if (!detachDir.delete()) {
+ throw new IOException("Cannot remove directory " + detachDir);
+ }
+ }
+ }
+
void doRollback( StorageDirectory sd,
NamespaceInfo nsInfo
) throws IOException {
@@ -359,8 +391,34 @@
doFinalize(it.next());
}
}
+
+ /**
+ * Hardlink all finalized and RBW blocks in fromDir to toDir
+ * @param fromDir directory where the snapshot is stored
+ * @param toDir the current data directory
+ * @throws IOException if error occurs during hardlink
+ */
+ private void linkAllBlocks(File fromDir, File toDir) throws IOException {
+ // do the link
+ int diskLayoutVersion = this.getLayoutVersion();
+ if (diskLayoutVersion < PRE_RBW_LAYOUT_VERSION) { // RBW version
+ // hardlink finalized blocks in tmpDir/finalized
+ linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED),
+ new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion);
+ // hardlink rbw blocks in tmpDir/rbw
+ linkBlocks(new File(fromDir, STORAGE_DIR_RBW),
+ new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion);
+ } else { // pre-RBW version
+ // hardlink finalized blocks in tmpDir
+ linkBlocks(fromDir,
+ new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion);
+ }
+ }
static void linkBlocks(File from, File to, int oldLV) throws IOException {
+ if (!from.exists()) {
+ return;
+ }
if (!from.isDirectory()) {
if (from.getName().startsWith(COPY_FILE_PREFIX)) {
FileInputStream in = new FileInputStream(from);
@@ -387,7 +445,7 @@
return;
}
// from is a directory
- if (!to.mkdir())
+ if (!to.mkdirs())
throw new IOException("Cannot create directory " + to);
String[] blockNames = from.list(new java.io.FilenameFilter() {
public boolean accept(File dir, String name) {
@@ -440,7 +498,7 @@
if (matcher.matches()) {
//return the current metadata file name
return FSDataset.getMetaFileName(matcher.group(1),
- Block.GRANDFATHER_GENERATION_STAMP);
+ GenerationStamp.GRANDFATHER_GENERATION_STAMP);
}
return oldFileName;
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Sat Nov 28 20:05:56 2009
@@ -38,6 +38,9 @@
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
import org.apache.hadoop.io.IOUtils;
@@ -46,8 +49,6 @@
import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.AccessTokenHandler;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
@@ -127,7 +128,7 @@
@Override
protected void opReadBlock(DataInputStream in,
long blockId, long blockGs, long startOffset, long length,
- String clientName, AccessToken accessToken) throws IOException {
+ String clientName, BlockAccessToken accessToken) throws IOException {
final Block block = new Block(blockId, 0 , blockGs);
OutputStream baseStream = NetUtils.getOutputStream(s,
datanode.socketWriteTimeout);
@@ -208,9 +209,10 @@
*/
@Override
protected void opWriteBlock(DataInputStream in, long blockId, long blockGs,
- int pipelineSize, boolean isRecovery,
+ int pipelineSize, BlockConstructionStage stage,
+ long newGs, long minBytesRcvd, long maxBytesRcvd,
String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
- AccessToken accessToken) throws IOException {
+ BlockAccessToken accessToken) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
@@ -250,11 +252,17 @@
String firstBadLink = ""; // first datanode that failed in connection setup
DataTransferProtocol.Status mirrorInStatus = SUCCESS;
try {
- // open a block receiver and check if the block does not exist
- blockReceiver = new BlockReceiver(block, in,
- s.getRemoteSocketAddress().toString(),
- s.getLocalSocketAddress().toString(),
- isRecovery, client, srcDataNode, datanode);
+ if (client.length() == 0 ||
+ stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+ // open a block receiver
+ blockReceiver = new BlockReceiver(block, in,
+ s.getRemoteSocketAddress().toString(),
+ s.getLocalSocketAddress().toString(),
+ stage, newGs, minBytesRcvd, maxBytesRcvd,
+ client, srcDataNode, datanode);
+ } else {
+ datanode.data.recoverClose(block, newGs, minBytesRcvd);
+ }
//
// Open network conn to backup machine, if
@@ -282,10 +290,13 @@
// Write header: Copied from DFSClient.java!
DataTransferProtocol.Sender.opWriteBlock(mirrorOut,
- block.getBlockId(), block.getGenerationStamp(), pipelineSize,
- isRecovery, client, srcDataNode, targets, accessToken);
+ blockId, blockGs,
+ pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client,
+ srcDataNode, targets, accessToken);
- blockReceiver.writeChecksumHeader(mirrorOut);
+ if (blockReceiver != null) { // send checksum header
+ blockReceiver.writeChecksumHeader(mirrorOut);
+ }
mirrorOut.flush();
// read connect ack (only for clients, not for replication req)
@@ -336,24 +347,31 @@
}
// receive the block and mirror to the next target
- String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
- blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
- mirrorAddr, null, targets.length);
+ if (blockReceiver != null) {
+ String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
+ blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
+ mirrorAddr, null, targets.length);
+ }
- // if this write is for a replication request (and not
- // from a client), then confirm block. For client-writes,
+ // update its generation stamp
+ if (client.length() != 0 &&
+ stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+ block.setGenerationStamp(newGs);
+ block.setNumBytes(minBytesRcvd);
+ }
+
+ // if this write is for a replication request or recovering
+ // a failed close for client, then confirm block. For other client-writes,
// the block is finalized in the PacketResponder.
- if (client.length() == 0) {
- datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
+ if (client.length() == 0 ||
+ stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+ datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
LOG.info("Received block " + block +
" src: " + remoteAddress +
" dest: " + localAddress +
" of size " + block.getNumBytes());
}
- if (datanode.blockScanner != null) {
- datanode.blockScanner.addBlock(block);
- }
} catch (IOException ioe) {
LOG.info("writeBlock " + block + " received exception " + ioe);
@@ -378,7 +396,7 @@
*/
@Override
protected void opBlockChecksum(DataInputStream in,
- long blockId, long blockGs, AccessToken accessToken) throws IOException {
+ long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
final Block block = new Block(blockId, 0 , blockGs);
DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
datanode.socketWriteTimeout));
@@ -437,7 +455,7 @@
*/
@Override
protected void opCopyBlock(DataInputStream in,
- long blockId, long blockGs, AccessToken accessToken) throws IOException {
+ long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
// Read in the header
Block block = new Block(blockId, 0, blockGs);
if (datanode.isAccessTokenEnabled
@@ -508,7 +526,7 @@
@Override
protected void opReplaceBlock(DataInputStream in,
long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
- AccessToken accessToken) throws IOException {
+ BlockAccessToken accessToken) throws IOException {
/* read header */
final Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
blockGs);
@@ -569,7 +587,7 @@
blockReceiver = new BlockReceiver(
block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
- false, "", null, datanode);
+ null, 0, 0, 0, "", null, datanode);
// receive a block
blockReceiver.receiveBlock(null, null, null, null,
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Sat Nov 28 20:05:56 2009
@@ -32,6 +32,8 @@
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+
/**
* Server used for receiving/sending a block of data.
@@ -115,11 +117,12 @@
this.maxXceiverCount = conf.getInt("dfs.datanode.max.xcievers",
MAX_XCEIVER_COUNT);
- this.estimateBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ this.estimateBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
//set up parameter for cluster balancing
this.balanceThrottler = new BlockBalanceThrottler(
- conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024));
+ conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+ DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
}
/**
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Sat Nov 28 20:05:56 2009
@@ -32,12 +32,12 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.AccessTokenHandler;
import org.apache.hadoop.util.StringUtils;
class DatanodeJspHelper {
@@ -258,6 +258,10 @@
out.print("<B>Total number of blocks: " + blocks.size() + "</B><br>");
// generate a table and dump the info
out.println("\n<table>");
+
+ String namenodeHost = datanode.getNameNodeAddr().getHostName();
+ String namenodeHostName = InetAddress.getByName(namenodeHost).getCanonicalHostName();
+
for (LocatedBlock cur : blocks) {
out.print("<tr>");
final String blockidstring = Long.toString(cur.getBlock().getBlockId());
@@ -277,14 +281,18 @@
+ "&genstamp=" + cur.getBlock().getGenerationStamp()
+ "&namenodeInfoPort=" + namenodeInfoPort
+ "&chunkSizeToView=" + chunkSizeToView;
+
+ String blockInfoUrl = "http://" + namenodeHostName + ":"
+ + namenodeInfoPort
+ + "/block_info_xml.jsp?blockId=" + blockidstring;
out.print("<td> </td><td><a href=\"" + blockUrl + "\">"
- + datanodeAddr + "</a></td>");
+ + datanodeAddr + "</a></td><td>"
+ + "<a href=\"" + blockInfoUrl + "\">View Block Info</a></td>");
}
out.println("</tr>");
}
out.println("</table>");
out.print("<hr>");
- String namenodeHost = datanode.getNameNodeAddr().getHostName();
out.print("<br><a href=\"http://"
+ InetAddress.getByName(namenodeHost).getCanonicalHostName() + ":"
+ namenodeInfoPort + "/dfshealth.jsp\">Go back to DFS home</a>");
@@ -317,9 +325,10 @@
final DFSClient dfs = new DFSClient(datanode.getNameNodeAddr(),
JspHelper.conf);
- AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+ BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
if (JspHelper.conf.getBoolean(
- AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false)) {
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT)) {
List<LocatedBlock> blks = dfs.getNamenode().getBlockLocations(filename, 0,
Long.MAX_VALUE).getLocatedBlocks();
if (blks == null || blks.size() == 0) {
@@ -556,7 +565,7 @@
LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
long blockSize = lastBlk.getBlock().getNumBytes();
long blockId = lastBlk.getBlock().getBlockId();
- AccessToken accessToken = lastBlk.getAccessToken();
+ BlockAccessToken accessToken = lastBlk.getAccessToken();
long genStamp = lastBlk.getBlock().getGenerationStamp();
DatanodeInfo chosenNode;
try {
@@ -577,4 +586,4 @@
out.print("</textarea>");
dfs.close();
}
-}
\ No newline at end of file
+}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Sat Nov 28 20:05:56 2009
@@ -27,6 +27,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
/**
@@ -113,12 +114,12 @@
@Override // Object
public int hashCode() {
- return 37 * 17 + (int) (blockId^(blockId>>>32));
+ return (int)(blockId^(blockId>>>32));
}
public long getGenStamp() {
return metaFile != null ? Block.getGenerationStamp(metaFile.getName()) :
- Block.GRANDFATHER_GENERATION_STAMP;
+ GenerationStamp.GRANDFATHER_GENERATION_STAMP;
}
}