Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:06:08 UTC
svn commit: r885143 [11/18] - in /hadoop/hdfs/branches/HDFS-326: ./
.eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/
src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs...
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Sat Nov 28 20:05:56 2009
@@ -22,69 +22,145 @@
import java.io.FileOutputStream;
import java.io.IOException;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FileUtil.HardLink;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.io.IOUtils;
/**
- * This class is used by the datanode to maintain the map from a block
- * to its metadata.
+ * This class is used by datanodes to maintain metadata about their replicas.
+ * It provides a general interface for a replica's meta information.
*/
-class ReplicaInfo {
+abstract public class ReplicaInfo extends Block implements Replica {
+ private FSVolume volume; // volume where the replica belongs
+ private File dir; // directory where block & meta files belong
- private FSVolume volume; // volume where the block belongs
- private File file; // block file
- private boolean detached; // copy-on-write done for block
-
- ReplicaInfo(FSVolume vol, File file) {
+ /**
+ * Constructor for a zero length replica
+ * @param blockId block id
+ * @param genStamp replica generation stamp
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ */
+ ReplicaInfo(long blockId, long genStamp, FSVolume vol, File dir) {
+ this(blockId, 0L, genStamp, vol, dir);
+ }
+
+ /**
+ * Constructor
+ * @param block a block
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ */
+ ReplicaInfo(Block block, FSVolume vol, File dir) {
+ this(block.getBlockId(), block.getNumBytes(),
+ block.getGenerationStamp(), vol, dir);
+ }
+
+ /**
+ * Constructor
+ * @param blockId block id
+ * @param len replica length
+ * @param genStamp replica generation stamp
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ */
+ ReplicaInfo(long blockId, long len, long genStamp,
+ FSVolume vol, File dir) {
+ super(blockId, len, genStamp);
this.volume = vol;
- this.file = file;
- detached = false;
+ this.dir = dir;
}
- ReplicaInfo(FSVolume vol) {
- this.volume = vol;
- this.file = null;
- detached = false;
+ /**
+ * Copy constructor.
+ * @param from the replica to copy from
+ */
+ ReplicaInfo(ReplicaInfo from) {
+ this(from, from.getVolume(), from.getDir());
}
+ /**
+ * Get this replica's meta file name
+ * @return this replica's meta file name
+ */
+ private String getMetaFileName() {
+ return getBlockName() + "_" + getGenerationStamp() + METADATA_EXTENSION;
+ }
+
+ /**
+ * Get the full path of this replica's data file
+ * @return the full path of this replica's data file
+ */
+ File getBlockFile() {
+ return new File(getDir(), getBlockName());
+ }
+
+ /**
+ * Get the full path of this replica's meta file
+ * @return the full path of this replica's meta file
+ */
+ File getMetaFile() {
+ return new File(getDir(), getMetaFileName());
+ }
+
+ /**
+ * Get the volume where this replica is located on disk
+ * @return the volume where this replica is located on disk
+ */
FSVolume getVolume() {
return volume;
}
-
- File getFile() {
- return file;
+
+ /**
+ * Set the volume where this replica is located on disk
+ */
+ void setVolume(FSVolume vol) {
+ this.volume = vol;
}
-
- void setFile(File f) {
- file = f;
+
+ /**
+ * Return the parent directory path where this replica is located
+ * @return the parent directory path where this replica is located
+ */
+ File getDir() {
+ return dir;
}
/**
- * Is this block already detached?
+ * Set the parent directory where this replica is located
+ * @param dir the parent directory where the replica is located
*/
- boolean isDetached() {
- return detached;
+ void setDir(File dir) {
+ this.dir = dir;
}
/**
- * Block has been successfully detached
+ * Check if this replica has already been unlinked.
+ * @return true if the replica has already been unlinked
+ * or does not need to be unlinked; false otherwise
*/
- void setDetached() {
- detached = true;
+ boolean isUnlinked() {
+ return true; // no need to be unlinked
}
/**
+ * Set this replica as unlinked.
+ */
+ void setUnlinked() {
+ // no need to be unlinked
+ }
+
+ /**
* Copy specified file into a temporary file. Then rename the
* temporary file to the original name. This will cause any
* hardlinks to the original file to be removed. The temporary
- * files are created in the detachDir. The temporary files will
+ * files are created in the same directory. The temporary files will
* be recovered (especially on Windows) on datanode restart.
*/
- private void detachFile(File file, Block b) throws IOException {
- File tmpFile = volume.createDetachFile(b, file.getName());
+ private void unlinkFile(File file, Block b) throws IOException {
+ File tmpFile = FSDataset.createTmpFile(b, FSDataset.getUnlinkTmpFile(file));
try {
FileInputStream in = new FileInputStream(file);
try {
@@ -114,33 +190,60 @@
}
/**
- * Returns true if this block was copied, otherwise returns false.
+ * Remove a hard link by copying the block to a temporary place and
+ * then moving it back.
+ * @param numLinks number of hard links
+ * @return true if the copy is successful;
+ * false if it is already unlinked or does not need to be unlinked
+ * @throws IOException if there is any copy error
*/
- boolean detachBlock(Block block, int numLinks) throws IOException {
- if (isDetached()) {
+ boolean unlinkBlock(int numLinks) throws IOException {
+ if (isUnlinked()) {
return false;
}
- if (file == null || volume == null) {
- throw new IOException("detachBlock:Block not found. " + block);
+ File file = getBlockFile();
+ if (file == null || getVolume() == null) {
+ throw new IOException("detachBlock:Block not found. " + this);
}
- File meta = FSDataset.getMetaFile(file, block);
+ File meta = getMetaFile();
if (meta == null) {
- throw new IOException("Meta file not found for block " + block);
+ throw new IOException("Meta file not found for block " + this);
}
if (HardLink.getLinkCount(file) > numLinks) {
- DataNode.LOG.info("CopyOnWrite for block " + block);
- detachFile(file, block);
+ DataNode.LOG.info("CopyOnWrite for block " + this);
+ unlinkFile(file, this);
}
if (HardLink.getLinkCount(meta) > numLinks) {
- detachFile(meta, block);
+ unlinkFile(meta, this);
}
- setDetached();
+ setUnlinked();
return true;
}
+
+ /**
+ * Set this replica's generation stamp to be a newer one
+ * @param newGS new generation stamp
+ * @throws IOException if the new generation stamp is not greater than the current one
+ */
+ void setNewerGenerationStamp(long newGS) throws IOException {
+ long curGS = getGenerationStamp();
+ if (newGS <= curGS) {
+ throw new IOException("New generation stamp (" + newGS
+ + ") must be greater than current one (" + curGS + ")");
+ }
+ setGenerationStamp(newGS);
+ }
+ @Override //Object
public String toString() {
- return getClass().getSimpleName() + "(volume=" + volume
- + ", file=" + file + ", detached=" + detached + ")";
+ return getClass().getSimpleName()
+ + ", " + super.toString()
+ + ", " + getState()
+ + "\n getNumBytes() = " + getNumBytes()
+ + "\n getBytesOnDisk() = " + getBytesOnDisk()
+ + "\n getVisibleLength()= " + getVisibleLength()
+ + "\n getVolume() = " + getVolume()
+ + "\n getBlockFile() = " + getBlockFile();
}
}
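The copy-and-rename trick in unlinkFile() above is worth illustrating on its own: copying a file to a temporary name and renaming the copy over the original replaces the underlying inode, so any remaining hard links stop sharing storage with this path. The following is a minimal standalone sketch, not the HDFS implementation; the temporary-file suffix is invented for illustration.

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

// Sketch of copy-on-write "unlinking": after the copy and the rename,
// other hard links to the old inode no longer share data with this path.
public class UnlinkSketch {
  static void unlink(File file) throws IOException {
    // hypothetical temporary name alongside the original, as in the patch
    File tmp = new File(file.getParentFile(), file.getName() + ".unlinked.tmp");
    Files.copy(file.toPath(), tmp.toPath(), StandardCopyOption.REPLACE_EXISTING);
    // renaming the fresh copy over the original breaks the hard links
    Files.move(tmp.toPath(), file.toPath(), StandardCopyOption.REPLACE_EXISTING);
  }

  public static void main(String[] args) throws IOException {
    unlink(new File(args[0]));
  }
}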
Propchange: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:05:56 2009
@@ -1,2 +1,6 @@
/hadoop/core/branches/branch-0.19/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java:713112
+/hadoop/core/branches/branch-0.19/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:713112
/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java:776175-785643,785929-786278
+/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:776175-785643,785929-786278
+/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
+/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:820487
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeActivityMBean.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeActivityMBean.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeActivityMBean.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeActivityMBean.java Sat Nov 28 20:05:56 2009
@@ -28,7 +28,7 @@
*
* This is the JMX MBean for reporting the DataNode Activity.
* The MBean is register using the name
- * "hadoop:service=DataNode,name=DataNodeActivity-<storageid>"
+ * "hadoop:service=DataNode,name=DataNodeActivity-<hostname>-<portNumber>"
*
* Many of the activity metrics are sampled and averaged on an interval
* which can be specified in the metrics config file.
@@ -57,15 +57,17 @@
final private ObjectName mbeanName;
private Random rand = new Random();
- public DataNodeActivityMBean(final MetricsRegistry mr, final String storageId) {
+ public DataNodeActivityMBean(final MetricsRegistry mr,
+ final String datanodeName) {
super(mr, "Activity statistics at the DataNode");
- String storageName;
- if (storageId.equals("")) {// Temp fix for the uninitialized storage
- storageName = "UndefinedStorageId" + rand.nextInt();
+ String name;
+ if (datanodeName.equals("")) {// Temp fix for the uninitialized name
+ name = "UndefinedDataNodeName" + rand.nextInt();
} else {
- storageName = storageId;
+ name = datanodeName.replace(":", "-");
}
- mbeanName = MBeanUtil.registerMBean("DataNode", "DataNodeActivity-" + storageName, this);
+ mbeanName = MBeanUtil.registerMBean("DataNode",
+ "DataNodeActivity-" + name, this);
}
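For context on the rename above: the ':' in a "<hostname>:<port>" pair must be mapped to '-' because a colon inside a JMX ObjectName is parsed as the domain separator. A hedged sketch of the naming scheme, using only standard javax.management APIs and a made-up hostname:

import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Sketch: build the DataNode activity MBean name from "<hostname>:<port>".
// ':' is reserved in ObjectName syntax, so it is replaced with '-'.
public class MBeanNameSketch {
  public static void main(String[] args) throws MalformedObjectNameException {
    String datanodeName = "dn1.example.com:50010"; // hypothetical
    String safe = datanodeName.replace(":", "-");
    ObjectName name =
        new ObjectName("hadoop:service=DataNode,name=DataNodeActivity-" + safe);
    System.out.println(name);
  }
}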
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Sat Nov 28 20:05:56 2009
@@ -28,6 +28,7 @@
import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
/**
@@ -90,14 +91,14 @@
new MetricsTimeVaryingRate("blockReports", registry);
- public DataNodeMetrics(Configuration conf, String storageId) {
- String sessionId = conf.get("session.id");
+ public DataNodeMetrics(Configuration conf, String datanodeName) {
+ String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
// Initiate reporting of Java VM metrics
JvmMetrics.init("DataNode", sessionId);
// Now the MBean for the data node
- datanodeActivityMBean = new DataNodeActivityMBean(registry, storageId);
+ datanodeActivityMBean = new DataNodeActivityMBean(registry, datanodeName);
// Create record for DataNode metrics
MetricsContext context = MetricsUtil.getContext("dfs");
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Sat Nov 28 20:05:56 2009
@@ -32,6 +32,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.DNS;
@@ -52,10 +53,10 @@
* </ol>
*/
public class BackupNode extends NameNode {
- private static final String BN_ADDRESS_NAME_KEY = "dfs.backup.address";
- private static final String BN_ADDRESS_DEFAULT = "localhost:50100";
- private static final String BN_HTTP_ADDRESS_NAME_KEY = "dfs.backup.http.address";
- private static final String BN_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50105";
+ private static final String BN_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
+ private static final String BN_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT;
+ private static final String BN_HTTP_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
+ private static final String BN_HTTP_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
/** Name-node proxy */
NamenodeProtocol namenode;
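The change above, like the two files before it, replaces scattered string literals with constants from DFSConfigKeys, so each key name and its default live in one place. A small sketch of the pattern under that assumption; the key and default below are illustrative stand-ins, not the DFSConfigKeys values themselves:

import org.apache.hadoop.conf.Configuration;

// Sketch of the config-keys pattern: the key and its default are defined
// together, so every call site resolves the same pair.
public class ConfigKeysSketch {
  static final String BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address"; // illustrative
  static final String BACKUP_ADDRESS_DEFAULT = "localhost:50100";         // illustrative

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    String addr = conf.get(BACKUP_ADDRESS_KEY, BACKUP_ADDRESS_DEFAULT);
    System.out.println("backup node address = " + addr);
  }
}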
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java Sat Nov 28 20:05:56 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
/**
* Internal class for block metadata.
@@ -35,12 +36,22 @@
*/
private Object[] triplets;
- BlockInfo(Block blk, int replication) {
+ protected BlockInfo(Block blk, int replication) {
super(blk);
this.triplets = new Object[3*replication];
this.inode = null;
}
+ /**
+ * Copy constructor.
+ * This is used when converting a BlockInfoUnderConstruction to a BlockInfo.
+ * @param from the BlockInfo to copy from
+ */
+ protected BlockInfo(BlockInfo from) {
+ this(from, from.inode.getReplication());
+ this.inode = from.inode;
+ }
+
INodeFile getINode() {
return inode;
}
@@ -64,7 +75,7 @@
assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
BlockInfo info = (BlockInfo)triplets[index*3+1];
assert info == null ||
- BlockInfo.class.getName().equals(info.getClass().getName()) :
+ info.getClass().getName().startsWith(BlockInfo.class.getName()) :
"BlockInfo is expected at " + index*3;
return info;
}
@@ -74,7 +85,7 @@
assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
BlockInfo info = (BlockInfo)triplets[index*3+2];
assert info == null ||
- BlockInfo.class.getName().equals(info.getClass().getName()) :
+ info.getClass().getName().startsWith(BlockInfo.class.getName()) :
"BlockInfo is expected at " + index*3;
return info;
}
@@ -262,6 +273,43 @@
return true;
}
+ /**
+ * BlockInfo represents a block that is not being constructed.
+ * In order to start modifying the block, the BlockInfo should be converted
+ * to {@link BlockInfoUnderConstruction}.
+ * @return {@link BlockUCState#COMPLETE}
+ */
+ BlockUCState getBlockUCState() {
+ return BlockUCState.COMPLETE;
+ }
+
+ /**
+ * Is this block complete?
+ *
+ * @return true if the state of the block is {@link BlockUCState#COMPLETE}
+ */
+ boolean isComplete() {
+ return getBlockUCState().equals(BlockUCState.COMPLETE);
+ }
+
+ /**
+ * Convert a complete block to an under construction block.
+ *
+ * @return BlockInfoUnderConstruction - an under construction block.
+ */
+ BlockInfoUnderConstruction convertToBlockUnderConstruction(
+ BlockUCState s, DatanodeDescriptor[] targets) {
+ if(isComplete()) {
+ return new BlockInfoUnderConstruction(
+ this, getINode().getReplication(), s, targets);
+ }
+ // the block is already under construction
+ BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;
+ ucBlock.setBlockUCState(s);
+ ucBlock.setExpectedLocations(targets);
+ return ucBlock;
+ }
+
@Override
public int hashCode() {
// Super implementation is sufficient
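The new getBlockUCState()/isComplete()/convertToBlockUnderConstruction() trio is a small state pattern: the base class always answers COMPLETE, and a subclass overrides the state while the block is being written. A self-contained sketch of that shape with invented class names, not the HDFS types:

// Sketch of the state pattern used by BlockInfo: the base class reports a
// terminal state; the under-construction subclass overrides it.
public class BlockStateSketch {
  enum UCState { UNDER_CONSTRUCTION, COMMITTED, COMPLETE }

  static class Info {
    UCState getBlockUCState() { return UCState.COMPLETE; }
    final boolean isComplete() { return getBlockUCState() == UCState.COMPLETE; }
  }

  static class InfoUnderConstruction extends Info {
    private UCState state = UCState.UNDER_CONSTRUCTION;
    @Override UCState getBlockUCState() { return state; }
    void commit() { state = UCState.COMMITTED; }
  }

  public static void main(String[] args) {
    Info complete = new Info();
    InfoUnderConstruction uc = new InfoUnderConstruction();
    System.out.println(complete.isComplete() + " " + uc.isComplete()); // true false
    uc.commit();
    System.out.println(uc.getBlockUCState()); // COMMITTED
  }
}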
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Sat Nov 28 20:05:56 2009
@@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -37,9 +36,12 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
-import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.mortbay.log.Log;
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
@@ -105,6 +107,9 @@
// Default number of replicas
int defaultReplication;
+ // variable to enable check for enough racks
+ boolean shouldCheckForEnoughRacks = true;
+
/**
* Last block index used for replication work.
*/
@@ -114,7 +119,7 @@
Random r = new Random();
// for block replicas placement
- ReplicationTargetChooser replicator;
+ BlockPlacementPolicy replicator;
BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
this(fsn, conf, DEFAULT_INITIAL_MAP_CAPACITY);
@@ -124,24 +129,25 @@
throws IOException {
namesystem = fsn;
pendingReplications = new PendingReplicationBlocks(
- conf.getInt("dfs.replication.pending.timeout.sec",
- -1) * 1000L);
+ conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
setConfigurationParameters(conf);
blocksMap = new BlocksMap(capacity, DEFAULT_MAP_LOAD_FACTOR);
}
void setConfigurationParameters(Configuration conf) throws IOException {
- this.replicator = new ReplicationTargetChooser(
- conf.getBoolean("dfs.replication.considerLoad", true),
+ this.replicator = BlockPlacementPolicy.getInstance(
+ conf,
namesystem,
namesystem.clusterMap);
this.defaultReplication = conf.getInt("dfs.replication", 3);
this.maxReplication = conf.getInt("dfs.replication.max", 512);
- this.minReplication = conf.getInt("dfs.replication.min", 1);
+ this.minReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
if (minReplication <= 0)
throw new IOException(
- "Unexpected configuration parameters: dfs.replication.min = "
+ "Unexpected configuration parameters: dfs.namenode.replication.min = "
+ minReplication
+ " must be greater than 0");
if (maxReplication >= (int)Short.MAX_VALUE)
@@ -150,15 +156,19 @@
+ maxReplication + " must be less than " + (Short.MAX_VALUE));
if (maxReplication < minReplication)
throw new IOException(
- "Unexpected configuration parameters: dfs.replication.min = "
+ "Unexpected configuration parameters: dfs.namenode.replication.min = "
+ minReplication
+ " must be less than dfs.replication.max = "
+ maxReplication);
- this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
+ this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
+ this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
FSNamesystem.LOG.info("defaultReplication = " + defaultReplication);
FSNamesystem.LOG.info("maxReplication = " + maxReplication);
FSNamesystem.LOG.info("minReplication = " + minReplication);
FSNamesystem.LOG.info("maxReplicationStreams = " + maxReplicationStreams);
+ FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
}
void activate() {
@@ -184,6 +194,11 @@
chooseSourceDatanode(block, containingNodes, numReplicas);
int usableReplicas = numReplicas.liveReplicas() +
numReplicas.decommissionedReplicas();
+
+ if (block instanceof BlockInfo) {
+ String fileName = ((BlockInfo)block).getINode().getFullPathName();
+ out.print(fileName + ": ");
+ }
// l: == live:, d: == decommissioned c: == corrupt e: == excess
out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
" (replicas:" +
@@ -227,7 +242,81 @@
* @return true if the block has minimum replicas
*/
boolean checkMinReplication(Block block) {
- return (blocksMap.numNodes(block) >= minReplication);
+ return (countNodes(block).liveReplicas() >= minReplication);
+ }
+
+ /**
+ * Commit the last block of the file.
+ *
+ * @param fileINode file inode
+ * @param commitBlock - contains client reported block length and generation
+ * @throws IOException if the block does not have at least a minimal number
+ * of replicas reported from data-nodes.
+ */
+ void commitLastBlock(INodeFileUnderConstruction fileINode,
+ Block commitBlock) throws IOException {
+ if(commitBlock == null)
+ return; // not committing, this is a block allocation retry
+ BlockInfo lastBlock = fileINode.getLastBlock();
+ if(lastBlock == null)
+ return; // no blocks in file yet
+ if(lastBlock.isComplete())
+ return; // already completed (e.g. by syncBlock)
+ assert lastBlock.getNumBytes() <= commitBlock.getNumBytes() :
+ "commitBlock length is less than the stored one "
+ + commitBlock.getNumBytes() + " vs. " + lastBlock.getNumBytes();
+ ((BlockInfoUnderConstruction)lastBlock).commitBlock(commitBlock);
+ }
+
+ /**
+ * Convert a specified block of the file to a complete block.
+ * @param fileINode file
+ * @param blkIndex block index in the file
+ * @throws IOException if the block does not have at least a minimal number
+ * of replicas reported from data-nodes.
+ */
+ BlockInfo completeBlock(INodeFile fileINode, int blkIndex)
+ throws IOException {
+ if(blkIndex < 0)
+ return null;
+ BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
+ if(curBlock.isComplete())
+ return curBlock;
+ BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
+ if(ucBlock.numNodes() < minReplication)
+ throw new IOException("Cannot complete block: " +
+ "block does not satisfy minimal replication requirement.");
+ BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
+ // replace the block in the file inode
+ fileINode.setBlock(blkIndex, completeBlock);
+ // replace block in the blocksMap
+ return blocksMap.replaceBlock(completeBlock);
+ }
+
+ BlockInfo completeBlock(INodeFile fileINode, BlockInfo block)
+ throws IOException {
+ BlockInfo[] fileBlocks = fileINode.getBlocks();
+ for(int idx = 0; idx < fileBlocks.length; idx++)
+ if(fileBlocks[idx] == block) {
+ return completeBlock(fileINode, idx);
+ }
+ return block;
+ }
+
+ /**
+ * Convert the last block of the file to an under construction block.
+ * @param fileINode file
+ * @param targets data-nodes that will form the pipeline for this block
+ */
+ void convertLastBlockToUnderConstruction(
+ INodeFileUnderConstruction fileINode,
+ DatanodeDescriptor[] targets) throws IOException {
+ BlockInfo oldBlock = fileINode.getLastBlock();
+ if(oldBlock == null)
+ return;
+ BlockInfoUnderConstruction ucBlock =
+ fileINode.setLastBlock(oldBlock, targets);
+ blocksMap.replaceBlock(ucBlock);
}
/**
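The commit/complete split above boils down to: the client's commit fixes the block's length and generation stamp (COMMITTED), and the block only becomes COMPLETE once at least minReplication replicas have been reported by datanodes. A toy sketch of that sequence, with invented types rather than the HDFS classes:

// Sketch of the commit-then-complete lifecycle described above.
public class CompleteSketch {
  enum State { UNDER_CONSTRUCTION, COMMITTED, COMPLETE }

  static class Blk {
    State state = State.UNDER_CONSTRUCTION;
    long numBytes;
    int reportedReplicas;

    void commit(long clientReportedBytes) { // client fixes the final length
      numBytes = clientReportedBytes;
      state = State.COMMITTED;
    }

    void tryComplete(int minReplication) { // needs enough reported replicas
      if (state == State.COMMITTED && reportedReplicas >= minReplication) {
        state = State.COMPLETE;
      }
    }
  }

  public static void main(String[] args) {
    Blk b = new Blk();
    b.commit(1024);
    b.tryComplete(1);       // still COMMITTED: no replicas reported yet
    b.reportedReplicas = 1;
    b.tryComplete(1);       // now COMPLETE
    System.out.println(b.state + " " + b.numBytes);
  }
}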
@@ -248,7 +337,7 @@
return machineSet;
}
- List<LocatedBlock> getBlockLocations(Block[] blocks, long offset,
+ List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
long length, int nrBlocksToReturn) throws IOException {
int curBlk = 0;
long curPos = 0, blkSize = 0;
@@ -263,43 +352,12 @@
}
if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file
- return null;
+ return Collections.<LocatedBlock>emptyList();
long endOff = offset + length;
List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
do {
- // get block locations
- int numNodes = blocksMap.numNodes(blocks[curBlk]);
- int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
- int numCorruptReplicas = corruptReplicas
- .numCorruptReplicas(blocks[curBlk]);
- if (numCorruptNodes != numCorruptReplicas) {
- FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
- + blocks[curBlk] + "blockMap has " + numCorruptNodes
- + " but corrupt replicas map has " + numCorruptReplicas);
- }
- boolean blockCorrupt = (numCorruptNodes == numNodes);
- int numMachineSet = blockCorrupt ? numNodes :
- (numNodes - numCorruptNodes);
- DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
- if (numMachineSet > 0) {
- numNodes = 0;
- for (Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
- DatanodeDescriptor dn = it.next();
- boolean replicaCorrupt =
- corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
- if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
- machineSet[numNodes++] = dn;
- }
- }
- LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
- blockCorrupt);
- if (namesystem.isAccessTokenEnabled) {
- b.setAccessToken(namesystem.accessTokenHandler.generateToken(b.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.READ)));
- }
- results.add(b);
+ results.add(getBlockLocation(blocks[curBlk], curPos));
curPos += blocks[curBlk].getNumBytes();
curBlk++;
} while (curPos < endOff
@@ -308,6 +366,41 @@
return results;
}
+ /** @return a LocatedBlock for the given block */
+ LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
+ ) throws IOException {
+ if (!blk.isComplete()) {
+ final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
+ final DatanodeDescriptor[] locations = uc.getExpectedLocations();
+ return namesystem.createLocatedBlock(uc, locations, pos, false);
+ }
+
+ // get block locations
+ final int numCorruptNodes = countNodes(blk).corruptReplicas();
+ final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
+ if (numCorruptNodes != numCorruptReplicas) {
+ FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
+ + blk + " blockMap has " + numCorruptNodes
+ + " but corrupt replicas map has " + numCorruptReplicas);
+ }
+
+ final int numNodes = blocksMap.numNodes(blk);
+ final boolean isCorrupt = numCorruptNodes == numNodes;
+ final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
+ final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
+ if (numMachines > 0) {
+ int j = 0;
+ for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
+ it.hasNext();) {
+ final DatanodeDescriptor d = it.next();
+ final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
+ if (isCorrupt || (!isCorrupt && !replicaCorrupt))
+ machines[j++] = d;
+ }
+ }
+ return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);
+ }
+
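getBlockLocation() above applies a simple filtering rule: if every replica is corrupt, all locations are returned and the block is flagged corrupt; otherwise the corrupt replicas are dropped. A small standalone sketch of that rule, with invented node names:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch: return every replica location when all are corrupt (the block
// is then marked corrupt); otherwise return only the healthy ones.
public class LocationFilterSketch {
  static List<String> locations(List<String> nodes, Set<String> corrupt) {
    boolean allCorrupt = corrupt.size() == nodes.size();
    List<String> out = new ArrayList<>();
    for (String n : nodes) {
      if (allCorrupt || !corrupt.contains(n)) {
        out.add(n);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> nodes = Arrays.asList("dn1", "dn2", "dn3"); // hypothetical
    Set<String> oneBad = new HashSet<>(Arrays.asList("dn2"));
    Set<String> allBad = new HashSet<>(nodes);
    System.out.println(locations(nodes, oneBad)); // [dn1, dn3]
    System.out.println(locations(nodes, allBad)); // [dn1, dn2, dn3], marked corrupt
  }
}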
/**
* Check whether the replication parameter is within the range
* determined by system configuration.
@@ -346,12 +439,13 @@
/**
* Adds block to list of blocks which will be invalidated on specified
- * datanode and log the move
+ * datanode
*
* @param b block
* @param dn datanode
+ * @param log true to create an entry in the log
*/
- void addToInvalidates(Block b, DatanodeInfo dn) {
+ void addToInvalidates(Block b, DatanodeInfo dn, boolean log) {
Collection<Block> invalidateSet = recentInvalidateSets
.get(dn.getStorageID());
if (invalidateSet == null) {
@@ -360,20 +454,39 @@
}
if (invalidateSet.add(b)) {
pendingDeletionBlocksCount++;
- NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
- + b.getBlockName() + " is added to invalidSet of " + dn.getName());
+ if (log) {
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
+ + b + " to " + dn.getName());
+ }
}
}
/**
+ * Adds block to list of blocks which will be invalidated on specified
+ * datanode and log the operation
+ *
+ * @param b block
+ * @param dn datanode
+ */
+ void addToInvalidates(Block b, DatanodeInfo dn) {
+ addToInvalidates(b, dn, true);
+ }
+
+ /**
* Adds block to list of blocks which will be invalidated on all its
* datanodes.
*/
private void addToInvalidates(Block b) {
+ StringBuilder datanodes = new StringBuilder();
for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); it
.hasNext();) {
DatanodeDescriptor node = it.next();
- addToInvalidates(b, node);
+ addToInvalidates(b, node, false);
+ datanodes.append(node.getName()).append(" ");
+ }
+ if (datanodes.length() != 0) {
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
+ + b + " to " + datanodes.toString());
}
}
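The addToInvalidates() refactoring above trades one log line per datanode for a single aggregated line. The batching idea in isolation, as a hedged sketch with made-up node names:

import java.util.Arrays;
import java.util.List;

// Sketch: do the per-node work with logging suppressed, accumulate the
// node names, then emit one summary line.
public class BatchedLogSketch {
  public static void main(String[] args) {
    List<String> nodes = Arrays.asList("dn1:50010", "dn2:50010", "dn3:50010"); // hypothetical
    StringBuilder datanodes = new StringBuilder();
    for (String node : nodes) {
      // addToInvalidates(b, node, false) would go here, with log == false
      datanodes.append(node).append(" ");
    }
    if (datanodes.length() != 0) {
      System.out.println("BLOCK* NameSystem.addToInvalidates: blk_42 to " + datanodes);
    }
  }
}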
@@ -431,6 +544,10 @@
addToInvalidates(storedBlock, node);
return;
}
+
+ // Add replica to the data-node if it is not already there
+ node.addBlock(storedBlock);
+
// Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(storedBlock, node);
if (countNodes(storedBlock).liveReplicas() > inode.getReplication()) {
@@ -613,17 +730,20 @@
int requiredReplication, numEffectiveReplicas;
List<DatanodeDescriptor> containingNodes;
DatanodeDescriptor srcNode;
+ INodeFile fileINode = null;
+ int additionalReplRequired;
synchronized (namesystem) {
synchronized (neededReplications) {
// block should belong to a file
- INodeFile fileINode = blocksMap.getINode(block);
+ fileINode = blocksMap.getINode(block);
// abandoned block or block reopened for append
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
replIndex--;
return false;
}
+
requiredReplication = fileINode.getReplication();
// get a source data-node
@@ -640,21 +760,34 @@
// do not schedule more if enough replicas is already pending
numEffectiveReplicas = numReplicas.liveReplicas() +
pendingReplications.getNumReplicas(block);
- if(numEffectiveReplicas >= requiredReplication) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- NameNode.stateChangeLog.info("BLOCK* "
- + "Removing block " + block
- + " from neededReplications as it has enough replicas.");
- return false;
+
+ if (numEffectiveReplicas >= requiredReplication) {
+ if ( (pendingReplications.getNumReplicas(block) > 0) ||
+ (blockHasEnoughRacks(block)) ) {
+ neededReplications.remove(block, priority); // remove from neededReplications
+ replIndex--;
+ NameNode.stateChangeLog.info("BLOCK* "
+ + "Removing block " + block
+ + " from neededReplications as it has enough replicas.");
+ return false;
+ }
+ }
+
+ if (numReplicas.liveReplicas() < requiredReplication) {
+ additionalReplRequired = requiredReplication - numEffectiveReplicas;
+ } else {
+ additionalReplRequired = 1; //Needed on a new rack
}
+
}
}
// choose replication targets: NOT HOLDING THE GLOBAL LOCK
- DatanodeDescriptor targets[] = replicator.chooseTarget(
- requiredReplication - numEffectiveReplicas,
- srcNode, containingNodes, null, block.getNumBytes());
+ // It is costly to extract the filename for which chooseTargets is called,
+ // so for now we pass in the Inode itself.
+ DatanodeDescriptor targets[] =
+ replicator.chooseTarget(fileINode, additionalReplRequired,
+ srcNode, containingNodes, block.getNumBytes());
if(targets.length == 0)
return false;
@@ -662,7 +795,7 @@
synchronized (neededReplications) {
// Recheck since global lock was released
// block should belong to a file
- INodeFile fileINode = blocksMap.getINode(block);
+ fileINode = blocksMap.getINode(block);
// abandoned block or block reopened for append
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
@@ -675,13 +808,25 @@
NumberReplicas numReplicas = countNodes(block);
numEffectiveReplicas = numReplicas.liveReplicas() +
pendingReplications.getNumReplicas(block);
- if(numEffectiveReplicas >= requiredReplication) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- NameNode.stateChangeLog.info("BLOCK* "
- + "Removing block " + block
- + " from neededReplications as it has enough replicas.");
- return false;
+
+ if (numEffectiveReplicas >= requiredReplication) {
+ if ( (pendingReplications.getNumReplicas(block) > 0) ||
+ (blockHasEnoughRacks(block)) ) {
+ neededReplications.remove(block, priority); // remove from neededReplications
+ replIndex--;
+ NameNode.stateChangeLog.info("BLOCK* "
+ + "Removing block " + block
+ + " from neededReplications as it has enough replicas.");
+ return false;
+ }
+ }
+
+ if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+ (!blockHasEnoughRacks(block)) ) {
+ if (srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
+ // No use continuing: the chosen target is on the same rack as the source, so no new rack is gained
+ return false;
+ }
}
// Add block to the to be replicated list
@@ -803,10 +948,13 @@
synchronized (namesystem) {
for (int i = 0; i < timedOutItems.length; i++) {
NumberReplicas num = countNodes(timedOutItems[i]);
- neededReplications.add(timedOutItems[i],
- num.liveReplicas(),
- num.decommissionedReplicas(),
- getReplication(timedOutItems[i]));
+ if (isNeededReplication(timedOutItems[i], getReplication(timedOutItems[i]),
+ num.liveReplicas())) {
+ neededReplications.add(timedOutItems[i],
+ num.liveReplicas(),
+ num.decommissionedReplicas(),
+ getReplication(timedOutItems[i]));
+ }
}
}
/* If we know the target datanodes where the replication timedout,
@@ -828,7 +976,8 @@
Collection<Block> toAdd = new LinkedList<Block>();
Collection<Block> toRemove = new LinkedList<Block>();
Collection<Block> toInvalidate = new LinkedList<Block>();
- node.reportDiff(blocksMap, report, toAdd, toRemove, toInvalidate);
+ Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+ node.reportDiff(this, report, toAdd, toRemove, toInvalidate, toCorrupt);
for (Block b : toRemove) {
removeStoredBlock(b, node);
@@ -842,6 +991,9 @@
+ " does not belong to any file.");
addToInvalidates(b, node);
}
+ for (BlockInfo b : toCorrupt) {
+ markBlockAsCorrupt(b, node);
+ }
}
/**
@@ -851,7 +1003,8 @@
*/
private Block addStoredBlock(final Block block,
DatanodeDescriptor node,
- DatanodeDescriptor delNodeHint) {
+ DatanodeDescriptor delNodeHint)
+ throws IOException {
BlockInfo storedBlock = blocksMap.getStoredBlock(block);
if (storedBlock == null || storedBlock.getINode() == null) {
// If this block does not belong to anyfile, then we are done.
@@ -963,13 +1116,17 @@
int numCurrentReplica = numLiveReplicas
+ pendingReplications.getNumReplicas(storedBlock);
+ if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
+ numLiveReplicas >= minReplication)
+ storedBlock = completeBlock(fileINode, storedBlock);
+
// check whether safe replication is reached for the block
- namesystem.incrementSafeBlockCount(numCurrentReplica);
+ // only complete blocks are counted towards that
+ if(storedBlock.isComplete())
+ namesystem.incrementSafeBlockCount(numCurrentReplica);
- //
- // if file is being actively written to, then do not check
- // replication-factor here. It will be checked when the file is closed.
- //
+ // if file is under construction, then check whether the block
+ // can be completed
if (fileINode.isUnderConstruction()) {
return storedBlock;
}
@@ -980,7 +1137,7 @@
// handle underReplication/overReplication
short fileReplication = fileINode.getReplication();
- if (numCurrentReplica >= fileReplication) {
+ if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
neededReplications.remove(storedBlock, numCurrentReplica,
num.decommissionedReplicas, fileReplication);
} else {
@@ -1021,8 +1178,10 @@
boolean gotException = false;
if (nodes == null)
return;
- for (Iterator<DatanodeDescriptor> it = nodes.iterator(); it.hasNext(); ) {
- DatanodeDescriptor node = it.next();
+ // make a copy of the array of nodes in order to avoid
+ // ConcurrentModificationException, when the block is removed from the node
+ DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
+ for (DatanodeDescriptor node : nodesCopy) {
try {
invalidateBlock(blk, node);
} catch (IOException e) {
@@ -1058,9 +1217,11 @@
NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas();
// add to under-replicated queue if need to be
- if (neededReplications.add(block, numCurrentReplica, num
- .decommissionedReplicas(), expectedReplication)) {
- nrUnderReplicated++;
+ if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
+ if (neededReplications.add(block, numCurrentReplica, num
+ .decommissionedReplicas(), expectedReplication)) {
+ nrUnderReplicated++;
+ }
}
if (numCurrentReplica > expectedReplication) {
@@ -1104,7 +1265,7 @@
}
}
namesystem.chooseExcessReplicates(nonExcess, block, replication,
- addedNode, delNodeHint);
+ addedNode, delNodeHint, replicator);
}
void addToExcessReplicate(DatanodeInfo dn, Block block) {
@@ -1192,7 +1353,30 @@
// Modify the blocks->datanode map and node's map.
//
pendingReplications.remove(block);
- addStoredBlock(block, node, delHintNode);
+
+ // blockReceived reports a finalized block
+ Collection<Block> toAdd = new LinkedList<Block>();
+ Collection<Block> toInvalidate = new LinkedList<Block>();
+ Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+ node.processReportedBlock(this, block, ReplicaState.FINALIZED,
+ toAdd, toInvalidate, toCorrupt);
+ // the block is only in one of the lists
+ // if it is in none then data-node already has it
+ assert toAdd.size() + toInvalidate.size() <= 1 :
+ "The block should be only in one of the lists.";
+
+ for (Block b : toAdd) {
+ addStoredBlock(b, node, delHintNode);
+ }
+ for (Block b : toInvalidate) {
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
+ + b + " on " + node.getName() + " size " + b.getNumBytes()
+ + " does not belong to any file.");
+ addToInvalidates(b, node);
+ }
+ for (BlockInfo b : toCorrupt) {
+ markBlockAsCorrupt(b, node);
+ }
}
/**
@@ -1224,12 +1408,38 @@
return new NumberReplicas(live, count, corrupt, excess);
}
+ private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
+ NumberReplicas num) {
+ int curReplicas = num.liveReplicas();
+ int curExpectedReplicas = getReplication(block);
+ INode fileINode = blocksMap.getINode(block);
+ Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(block);
+ StringBuffer nodeList = new StringBuffer();
+ while (nodeIter.hasNext()) {
+ DatanodeDescriptor node = nodeIter.next();
+ nodeList.append(node.name);
+ nodeList.append(" ");
+ }
+ FSNamesystem.LOG.info("Block: " + block + ", Expected Replicas: "
+ + curExpectedReplicas + ", live replicas: " + curReplicas
+ + ", corrupt replicas: " + num.corruptReplicas()
+ + ", decommissioned replicas: " + num.decommissionedReplicas()
+ + ", excess replicas: " + num.excessReplicas()
+ + ", Is Open File: " + fileINode.isUnderConstruction()
+ + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+ + srcNode.name + ", Is current datanode decommissioning: "
+ + srcNode.isDecommissionInProgress());
+ }
+
/**
* Return true if there are any blocks on this node that have not
* yet reached their replication factor. Otherwise returns false.
*/
boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
boolean status = false;
+ int underReplicatedBlocks = 0;
+ int decommissionOnlyReplicas = 0;
+ int underReplicatedInOpenFiles = 0;
final Iterator<? extends Block> it = srcNode.getBlockIterator();
while(it.hasNext()) {
final Block block = it.next();
@@ -1239,14 +1449,27 @@
NumberReplicas num = countNodes(block);
int curReplicas = num.liveReplicas();
int curExpectedReplicas = getReplication(block);
- if (curExpectedReplicas > curReplicas) {
- status = true;
+ if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
+ if (curExpectedReplicas > curReplicas) {
+ //Log info about one block for this node which needs replication
+ if (!status) {
+ status = true;
+ logBlockReplicationInfo(block, srcNode, num);
+ }
+ underReplicatedBlocks++;
+ if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
+ decommissionOnlyReplicas++;
+ }
+ if (fileINode.isUnderConstruction()) {
+ underReplicatedInOpenFiles++;
+ }
+ }
if (!neededReplications.contains(block) &&
pendingReplications.getNumReplicas(block) == 0) {
//
// These blocks have been reported from the datanode
// after the startDecommission method has been executed. These
- // blocks were in flight when the decommission was started.
+ // blocks were in flight when the decommissioning was started.
//
neededReplications.add(block,
curReplicas,
@@ -1256,6 +1479,9 @@
}
}
}
+ srcNode.decommissioningStatus.set(underReplicatedBlocks,
+ decommissionOnlyReplicas,
+ underReplicatedInOpenFiles);
return status;
}
@@ -1293,16 +1519,23 @@
synchronized (namesystem) {
NumberReplicas repl = countNodes(block);
int curExpectedReplicas = getReplication(block);
- neededReplications.update(block, repl.liveReplicas(), repl
- .decommissionedReplicas(), curExpectedReplicas, curReplicasDelta,
- expectedReplicasDelta);
+ if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
+ neededReplications.update(block, repl.liveReplicas(), repl
+ .decommissionedReplicas(), curExpectedReplicas, curReplicasDelta,
+ expectedReplicasDelta);
+ } else {
+ int oldReplicas = repl.liveReplicas()-curReplicasDelta;
+ int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
+ neededReplications.remove(block, oldReplicas, repl.decommissionedReplicas(),
+ oldExpectedReplicas);
+ }
}
}
void checkReplication(Block block, int numExpectedReplicas) {
// filter out containingNodes that are marked for decommission.
NumberReplicas number = countNodes(block);
- if (number.liveReplicas() < numExpectedReplicas) {
+ if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) {
neededReplications.add(block,
number.liveReplicas(),
number.decommissionedReplicas,
@@ -1384,13 +1617,74 @@
return blocksToInvalidate.size();
}
}
+
+ // Returns the number of racks over which a given block is replicated.
+ // Decommissioning/decommissioned nodes are not counted; corrupt replicas
+ // are also ignored.
+ int getNumberOfRacks(Block b) {
+ HashSet<String> rackSet = new HashSet<String>(0);
+ Collection<DatanodeDescriptor> corruptNodes =
+ corruptReplicas.getNodes(b);
+ for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
+ it.hasNext();) {
+ DatanodeDescriptor cur = it.next();
+ if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+ if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
+ rackSet.add(cur.getNetworkLocation()); // HashSet ignores duplicates
+ }
+ }
+ }
+ return rackSet.size();
+ }
+
+ boolean blockHasEnoughRacks(Block b) {
+ if (!this.shouldCheckForEnoughRacks) {
+ return true;
+ }
+ boolean enoughRacks = false;
+ Collection<DatanodeDescriptor> corruptNodes =
+ corruptReplicas.getNodes(b);
+ int numExpectedReplicas = getReplication(b);
+ String rackName = null;
+ for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
+ it.hasNext();) {
+ DatanodeDescriptor cur = it.next();
+ if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+ if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
+ if (numExpectedReplicas == 1) {
+ enoughRacks = true;
+ break;
+ }
+ String rackNameNew = cur.getNetworkLocation();
+ if (rackName == null) {
+ rackName = rackNameNew;
+ } else if (!rackName.equals(rackNameNew)) {
+ enoughRacks = true;
+ break;
+ }
+ }
+ }
+ }
+ return enoughRacks;
+ }
+ boolean isNeededReplication(Block b, int expectedReplication, int curReplicas) {
+ return curReplicas < expectedReplication || !blockHasEnoughRacks(b);
+ }
+
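blockHasEnoughRacks() above short-circuits as soon as it sees either a single expected replica or a second distinct rack among live, non-corrupt replicas. The same decision in a compact standalone form, with invented rack names:

import java.util.Arrays;
import java.util.List;

// Sketch of the "enough racks" test: with one expected replica any rack
// suffices; otherwise at least two distinct racks must hold replicas.
public class RackCheckSketch {
  static boolean hasEnoughRacks(List<String> replicaRacks, int expectedReplicas) {
    String first = null;
    for (String rack : replicaRacks) {
      if (expectedReplicas == 1) {
        return true;
      }
      if (first == null) {
        first = rack;
      } else if (!first.equals(rack)) {
        return true; // second distinct rack found
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(hasEnoughRacks(Arrays.asList("/r1", "/r1"), 2)); // false
    System.out.println(hasEnoughRacks(Arrays.asList("/r1", "/r2"), 2)); // true
  }
}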
long getMissingBlocksCount() {
// not locking
return Math.max(missingBlocksInPrevIter, missingBlocksInCurIter);
}
- BlockInfo addINode(Block block, INodeFile iNode) {
+ BlockInfo addINode(BlockInfo block, INodeFile iNode) {
return blocksMap.addINode(block, iNode);
}
@@ -1409,7 +1703,7 @@
void removeBlockFromMap(Block block) {
blocksMap.removeBlock(block);
}
-
+
int getCapacity() {
synchronized(namesystem) {
return blocksMap.getCapacity();
@@ -1419,4 +1713,26 @@
float getLoadFactor() {
return blocksMap.getLoadFactor();
}
+
+
+ /**
+ * Return a range of corrupt replica block ids. Up to numExpectedBlocks
+ * blocks starting at the next block after startingBlockId are returned
+ * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
+ * is null, up to numExpectedBlocks blocks are returned from the beginning.
+ * If startingBlockId cannot be found, null is returned.
+ *
+ * @param numExpectedBlocks Number of block ids to return.
+ * 0 <= numExpectedBlocks <= 100
+ * @param startingBlockId Block id from which to start. If null, start at
+ * beginning.
+ * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
+ *
+ */
+ long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
+ Long startingBlockId) {
+ return corruptReplicas.getCorruptReplicaBlockIds(numExpectedBlocks,
+ startingBlockId);
+ }
+
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java Sat Nov 28 20:05:56 2009
@@ -75,11 +75,10 @@
/**
* Add block b belonging to the specified file inode to the map.
*/
- BlockInfo addINode(Block b, INodeFile iNode) {
- int replication = iNode.getReplication();
+ BlockInfo addINode(BlockInfo b, INodeFile iNode) {
BlockInfo info = map.get(b);
- if (info == null) {
- info = new BlockInfo(b, replication);
+ if (info != b) {
+ info = b;
map.put(info, info);
}
info.setINode(iNode);
@@ -191,4 +190,23 @@
float getLoadFactor() {
return loadFactor;
}
+
+ /**
+ * Replace a block in the block map by a new block.
+ * The new block and the old one have the same key.
+ * @param newBlock - block for replacement
+ * @return new block
+ */
+ BlockInfo replaceBlock(BlockInfo newBlock) {
+ BlockInfo currentBlock = map.get(newBlock);
+ assert currentBlock != null : "the block is not in blocksMap";
+ // replace block in data-node lists
+ for(int idx = currentBlock.numNodes()-1; idx >= 0; idx--) {
+ DatanodeDescriptor dn = currentBlock.getDatanode(idx);
+ dn.replaceBlock(currentBlock, newBlock);
+ }
+ // replace block in the map itself
+ map.put(newBlock, newBlock);
+ return newBlock;
+ }
}
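replaceBlock() above relies on the new block comparing equal to the old one (same block id): map.get(newBlock) locates the existing entry, and map.put(newBlock, newBlock) swaps the new object in as the value. A toy demonstration of that equal-key replacement, with an invented block type:

import java.util.HashMap;
import java.util.Map;

// Sketch: two objects that are equal by id can stand in for each other
// as map keys, so put(newBlock, newBlock) replaces the stored value.
public class ReplaceSketch {
  static class Blk {
    final long id;
    final String state;
    Blk(long id, String state) { this.id = id; this.state = state; }
    @Override public boolean equals(Object o) {
      return o instanceof Blk && ((Blk) o).id == id;
    }
    @Override public int hashCode() { return Long.hashCode(id); }
  }

  public static void main(String[] args) {
    Map<Blk, Blk> map = new HashMap<>();
    Blk uc = new Blk(42, "UNDER_CONSTRUCTION");
    map.put(uc, uc);
    Blk complete = new Blk(42, "COMPLETE"); // equal key, different object
    map.put(complete, complete);            // replaces the stored value
    System.out.println(map.get(new Blk(42, null)).state); // COMPLETE
  }
}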
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Sat Nov 28 20:05:56 2009
@@ -34,6 +34,7 @@
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer;
@@ -86,8 +87,10 @@
shouldRun = true;
// Initialize other scheduling parameters from the configuration
- checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);
- checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
+ checkpointPeriod = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
+ DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
+ checkpointSize = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_SIZE_KEY,
+ DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_SIZE_DEFAULT);
HttpServer httpServer = backupNode.httpServer;
httpServer.setAttribute("name.system.image", getFSImage());
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/CorruptReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/CorruptReplicasMap.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/CorruptReplicasMap.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/CorruptReplicasMap.java Sat Nov 28 20:05:56 2009
@@ -33,7 +33,7 @@
public class CorruptReplicasMap{
- private Map<Block, Collection<DatanodeDescriptor>> corruptReplicasMap =
+ private SortedMap<Block, Collection<DatanodeDescriptor>> corruptReplicasMap =
new TreeMap<Block, Collection<DatanodeDescriptor>>();
/**
@@ -126,4 +126,59 @@
public int size() {
return corruptReplicasMap.size();
}
+
+ /**
+ * Return a range of corrupt replica block ids. Up to numExpectedBlocks
+ * blocks starting at the next block after startingBlockId are returned
+ * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
+ * is null, up to numExpectedBlocks blocks are returned from the beginning.
+ * If startingBlockId cannot be found, null is returned.
+ *
+ * @param numExpectedBlocks Number of block ids to return.
+ * 0 <= numExpectedBlocks <= 100
+ * @param startingBlockId Block id from which to start. If null, start at
+ * beginning.
+ * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
+ *
+ */
+ long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
+ Long startingBlockId) {
+ if (numExpectedBlocks < 0 || numExpectedBlocks > 100) {
+ return null;
+ }
+
+ Iterator<Block> blockIt = corruptReplicasMap.keySet().iterator();
+
+ // if the starting block id was specified, iterate over keys until
+ // we find the matching block. If we find a matching block, break
+ // to leave the iterator on the next block after the specified block.
+ if (startingBlockId != null) {
+ boolean isBlockFound = false;
+ while (blockIt.hasNext()) {
+ Block b = blockIt.next();
+ if (b.getBlockId() == startingBlockId) {
+ isBlockFound = true;
+ break;
+ }
+ }
+
+ if (!isBlockFound) {
+ return null;
+ }
+ }
+
+ ArrayList<Long> corruptReplicaBlockIds = new ArrayList<Long>();
+
+ // append up to numExpectedBlocks blockIds to our list
+ for(int i=0; i<numExpectedBlocks && blockIt.hasNext(); i++) {
+ corruptReplicaBlockIds.add(blockIt.next().getBlockId());
+ }
+
+ long[] ret = new long[corruptReplicaBlockIds.size()];
+ for(int i=0; i<ret.length; i++) {
+ ret[i] = corruptReplicaBlockIds.get(i);
+ }
+
+ return ret;
+ }
}
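getCorruptReplicaBlockIds() above is cursor-style pagination over a sorted key set: advance an iterator past the starting id, then take up to numExpectedBlocks of the following keys. The pattern in isolation, as a hedged sketch:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Sketch of cursor pagination over a SortedMap: skip past the cursor
// key, then collect up to pageSize of the following keys.
public class PagingSketch {
  static List<Long> page(SortedMap<Long, String> map, Long cursor, int pageSize) {
    Iterator<Long> it = map.keySet().iterator();
    if (cursor != null) {
      boolean found = false;
      while (it.hasNext()) {
        if (it.next().equals(cursor)) { found = true; break; }
      }
      if (!found) return null; // cursor key no longer exists
    }
    List<Long> ids = new ArrayList<>();
    for (int i = 0; i < pageSize && it.hasNext(); i++) {
      ids.add(it.next());
    }
    return ids;
  }

  public static void main(String[] args) {
    SortedMap<Long, String> m = new TreeMap<>();
    for (long id = 1; id <= 5; id++) m.put(id, "corrupt");
    System.out.println(page(m, null, 2)); // [1, 2]
    System.out.println(page(m, 2L, 2));   // [3, 4]
  }
}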
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Sat Nov 28 20:05:56 2009
@@ -25,7 +25,11 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
@@ -43,6 +47,11 @@
**************************************************/
public class DatanodeDescriptor extends DatanodeInfo {
+
+ // Stores status of decommissioning.
+ // If node is not decommissioning, do not use this object for anything.
+ DecommissioningStatus decommissioningStatus = new DecommissioningStatus();
+
/** Block and targets pair */
public static class BlockTargetPair {
public final Block block;
@@ -55,29 +64,36 @@
}
/** A BlockTargetPair queue. */
- private static class BlockQueue {
- private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
+ private static class BlockQueue<E> {
+ private final Queue<E> blockq = new LinkedList<E>();
/** Size of the queue */
synchronized int size() {return blockq.size();}
/** Enqueue */
- synchronized boolean offer(Block block, DatanodeDescriptor[] targets) {
- return blockq.offer(new BlockTargetPair(block, targets));
+ synchronized boolean offer(E e) {
+ return blockq.offer(e);
}
/** Dequeue */
- synchronized List<BlockTargetPair> poll(int numBlocks) {
+ synchronized List<E> poll(int numBlocks) {
if (numBlocks <= 0 || blockq.isEmpty()) {
return null;
}
- List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
+ List<E> results = new ArrayList<E>();
for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
results.add(blockq.poll());
}
return results;
}
+
+ /**
+ * Returns <tt>true</tt> if the queue contains the specified element.
+ */
+ boolean contains(E e) {
+ return blockq.contains(e);
+ }
}
private volatile BlockInfo blockList = null;
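Genericizing BlockQueue above lets one batching queue carry BlockTargetPair elements for replication and BlockInfoUnderConstruction elements for recovery. A self-contained sketch of the batch-poll behavior, with a made-up class name:

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

// Sketch of the generic batch queue: poll(n) drains up to n elements in
// one call and returns null when nothing is available.
public class BatchQueueSketch<E> {
  private final Queue<E> q = new LinkedList<>();

  synchronized boolean offer(E e) { return q.offer(e); }

  synchronized List<E> poll(int n) {
    if (n <= 0 || q.isEmpty()) return null;
    List<E> out = new ArrayList<>();
    for (; !q.isEmpty() && n > 0; n--) out.add(q.poll());
    return out;
  }

  public static void main(String[] args) {
    BatchQueueSketch<String> q = new BatchQueueSketch<>();
    q.offer("blk_1"); q.offer("blk_2"); q.offer("blk_3");
    System.out.println(q.poll(2)); // [blk_1, blk_2]
    System.out.println(q.poll(5)); // [blk_3]
    System.out.println(q.poll(1)); // null
  }
}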
@@ -87,9 +103,10 @@
protected boolean needKeyUpdate = false;
/** A queue of blocks to be replicated by this datanode */
- private BlockQueue replicateBlocks = new BlockQueue();
+ private BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>();
/** A queue of blocks to be recovered by this datanode */
- private BlockQueue recoverBlocks = new BlockQueue();
+ private BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
+ new BlockQueue<BlockInfoUnderConstruction>();
/** A set of blocks to be invalidated by this datanode */
private Set<Block> invalidateBlocks = new TreeSet<Block>();
@@ -201,6 +218,21 @@
blockList = b.listInsert(blockList, this);
}
+ /**
+ * Replace specified old block with a new one in the DataNodeDescriptor.
+ *
+ * @param oldBlock - block to be replaced
+ * @param newBlock - a replacement block
+ * @return the new block
+ */
+ BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) {
+ boolean done = removeBlock(oldBlock);
+ assert done : "Old block should belong to the data-node when replacing";
+ done = addBlock(newBlock);
+ assert done : "New block should not belong to the data-node when replacing";
+ return newBlock;
+ }
+
void resetBlocks() {
this.capacity = 0;
this.remaining = 0;
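
The new replaceBlock helper unlinks the old BlockInfo from this node's block list and inserts the replacement. A hedged sketch of a plausible caller, substituting one BlockInfo object on every node that holds it (oldInfo, newInfo, and the BlockInfo accessors shown are assumptions, not part of this hunk):

    // iterate backwards: replaceBlock removes this node from oldInfo,
    // which shrinks oldInfo's datanode list as we go
    for (int idx = oldInfo.numNodes() - 1; idx >= 0; idx--) {
      DatanodeDescriptor dn = oldInfo.getDatanode(idx);
      dn.replaceBlock(oldInfo, newInfo);
    }
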
@@ -262,15 +294,20 @@
*/
void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
assert(block != null && targets != null && targets.length > 0);
- replicateBlocks.offer(block, targets);
+ replicateBlocks.offer(new BlockTargetPair(block, targets));
}
/**
* Store block recovery work.
*/
- void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
- assert(block != null && targets != null && targets.length > 0);
- recoverBlocks.offer(block, targets);
+ void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
+ if(recoverBlocks.contains(block)) {
+ // this prevents adding the same block twice to the recovery queue
+ FSNamesystem.LOG.info("Block " + block +
+ " is already in the recovery queue.");
+ return;
+ }
+ recoverBlocks.offer(block);
}
/**
@@ -308,10 +345,16 @@
new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
}
- BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
- List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
- return blocktargetlist == null? null:
- new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
+ BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) {
+ List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
+ if(blocks == null)
+ return null;
+ BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.size());
+ for(BlockInfoUnderConstruction b : blocks) {
+ brCommand.add(new RecoveringBlock(
+ b, b.getExpectedLocations(), b.getBlockRecoveryId()));
+ }
+ return brCommand;
}
/**
@@ -361,44 +404,29 @@
return blockarray;
}
- void reportDiff(BlocksMap blocksMap,
+ void reportDiff(BlockManager blockManager,
BlockListAsLongs newReport,
Collection<Block> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor
- Collection<Block> toInvalidate) { // should be removed from DN
- // place a deilimiter in the list which separates blocks
+ Collection<Block> toInvalidate, // should be removed from DN
+ Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
+ // place a delimiter in the list which separates blocks
// that have been reported from those that have not
BlockInfo delimiter = new BlockInfo(new Block(), 1);
boolean added = this.addBlock(delimiter);
assert added : "Delimiting block cannot be present in the node";
if(newReport == null)
- newReport = new BlockListAsLongs( new long[0]);
- // scan the report and collect newly reported blocks
- // Note we are taking special precaution to limit tmp blocks allocated
- // as part this block report - which why block list is stored as longs
- Block iblk = new Block(); // a fixed new'ed block to be reused with index i
- for (int i = 0; i < newReport.getNumberOfBlocks(); ++i) {
- iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i),
- newReport.getBlockGenStamp(i));
- BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
- if(storedBlock == null) {
- // If block is not in blocksMap it does not belong to any file
- toInvalidate.add(new Block(iblk));
- continue;
- }
- if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN
- // if the size differs from what is in the blockmap, then return
- // the new block. addStoredBlock will then pick up the right size of this
- // block and will update the block object in the BlocksMap
- if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
- toAdd.add(new Block(iblk));
- } else {
- toAdd.add(storedBlock);
- }
- continue;
- }
+ newReport = new BlockListAsLongs();
+ // scan the report and process newly reported blocks
+ BlockReportIterator itBR = newReport.getBlockReportIterator();
+ while(itBR.hasNext()) {
+ Block iblk = itBR.next();
+ ReplicaState iState = itBR.getCurrentReplicaState();
+ BlockInfo storedBlock = processReportedBlock(blockManager, iblk, iState,
+ toAdd, toInvalidate, toCorrupt);
// move block to the head of the list
- this.moveBlockToHead(storedBlock);
+ if(storedBlock != null && storedBlock.findDatanode(this) >= 0)
+ this.moveBlockToHead(storedBlock);
}
// collect blocks that have not been reported
// all of them are next to the delimiter
@@ -408,6 +436,97 @@
this.removeBlock(delimiter);
}
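
The delimiter trick in reportDiff is worth spelling out: a dummy block is inserted at the head of the node's block list, every block named in the report is moved in front of it, and whatever still trails the delimiter afterwards is exactly the set of blocks the node failed to report. A self-contained illustration of the same idea on a plain list (an analogy, not the BlockInfo linked-list code itself):

    import java.util.*;

    public class DelimiterDiff {
      public static void main(String[] args) {
        LinkedList<String> nodeBlocks =
            new LinkedList<String>(Arrays.asList("b1", "b2", "b3", "b4"));
        Set<String> reported = new HashSet<String>(Arrays.asList("b2", "b4"));

        final String DELIM = "<delimiter>";
        nodeBlocks.addFirst(DELIM);          // delimiter heads the list
        for (String b : reported) {          // move reported blocks to head
          if (nodeBlocks.remove(b)) {
            nodeBlocks.addFirst(b);
          }
        }
        // everything after the delimiter was not reported
        List<String> toRemove = nodeBlocks.subList(
            nodeBlocks.indexOf(DELIM) + 1, nodeBlocks.size());
        System.out.println("not reported: " + toRemove);   // [b1, b3]
      }
    }
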
+ /**
+ * Process a block replica reported by the data-node.
+ *
+ * <ol>
+ * <li>If the block is not known to the system (not in blocksMap) then the
+ * data-node should be notified to invalidate this block.</li>
+ * <li>If the reported replica is valid, that is, it has the same generation
+ * stamp and length as recorded on the name-node, then the replica location
+ * is added to the name-node.</li>
+ * <li>If the reported replica is not valid, then it is marked as corrupt,
+ * which triggers replication of the existing valid replicas.
+ * Corrupt replicas are removed from the system when the block
+ * is fully replicated.</li>
+ * </ol>
+ *
+ * @param blockManager
+ * @param block reported block replica
+ * @param rState reported replica state
+ * @param toAdd add to DatanodeDescriptor
+ * @param toInvalidate missing blocks (not in the blocks map)
+ * that should be removed from the data-node
+ * @param toCorrupt replicas with unexpected length or generation stamp;
+ * add to corrupt replicas
+ * @return the block from the blocks map if the reported replica is known
+ * to the system, or null otherwise
+ */
+ BlockInfo processReportedBlock(
+ BlockManager blockManager,
+ Block block, // reported block replica
+ ReplicaState rState, // reported replica state
+ Collection<Block> toAdd, // add to DatanodeDescriptor
+ Collection<Block> toInvalidate, // should be removed from DN
+ Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
+ FSNamesystem.LOG.debug("Reported block " + block
+ + " on " + getName() + " size " + block.getNumBytes()
+ + " replicaState = " + rState);
+
+ // find block by blockId
+ BlockInfo storedBlock = blockManager.blocksMap.getStoredBlock(block);
+ if(storedBlock == null) {
+ // If blocksMap does not contain reported block id,
+ // the replica should be removed from the data-node.
+ toInvalidate.add(new Block(block));
+ return null;
+ }
+
+ FSNamesystem.LOG.debug("In memory blockUCState = " + storedBlock.getBlockUCState());
+
+ // Block is on the DN
+ boolean isCorrupt = false;
+ switch(rState) {
+ case FINALIZED:
+ switch(storedBlock.getBlockUCState()) {
+ case COMPLETE:
+ case COMMITTED:
+ if(storedBlock.getGenerationStamp() != block.getGenerationStamp()
+ || storedBlock.getNumBytes() != block.getNumBytes())
+ isCorrupt = true;
+ break;
+ case UNDER_CONSTRUCTION:
+ case UNDER_RECOVERY:
+ ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+ this, block, rState);
+ }
+ if(!isCorrupt && storedBlock.findDatanode(this) < 0)
+ if (storedBlock.getNumBytes() != block.getNumBytes()) {
+ toAdd.add(new Block(block));
+ } else {
+ toAdd.add(storedBlock);
+ }
+ break;
+ case RBW:
+ case RWR:
+ if(!storedBlock.isComplete())
+ ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+ this, block, rState);
+ else
+ isCorrupt = true;
+ break;
+ case RUR: // should not be reported
+ case TEMPORARY: // should not be reported
+ default:
+ FSNamesystem.LOG.warn("Unexpected replica state " + rState
+ + " for block: " + storedBlock +
+ " on " + getName() + " size " + storedBlock.getNumBytes());
+ break;
+ }
+ if(isCorrupt)
+ toCorrupt.add(storedBlock);
+ return storedBlock;
+ }
+
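
In tabular form, the outcomes of processReportedBlock (a derived summary of the switch above, not authoritative):

    reported state   stored block state          outcome
    ---------------  --------------------------  ----------------------------
    (any)            not in blocksMap            toInvalidate
    FINALIZED        COMPLETE or COMMITTED,      toAdd, if this node is not
                     matching genstamp/length    yet listed as a location
    FINALIZED        COMPLETE or COMMITTED,      toCorrupt
                     genstamp/length mismatch
    FINALIZED        UNDER_CONSTRUCTION or       recorded via
                     UNDER_RECOVERY              addReplicaIfNotPresent
    RBW or RWR       not complete                recorded via
                                                 addReplicaIfNotPresent
    RBW or RWR       complete                    toCorrupt
    RUR, TEMPORARY   (any)                       warning only; never expected
                                                 in a block report

    (the FINALIZED rows also fall through to the toAdd check when the
    replica is not corrupt and the node is not yet listed as a location)
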
/** Serialization for FSEditLog */
void readFieldsFromFSEditLog(DataInput in) throws IOException {
this.name = DeprecatedUTF8.readString(in);
@@ -475,4 +594,53 @@
// by DatanodeID
return (this == obj) || super.equals(obj);
}
+
+ class DecommissioningStatus {
+ int underReplicatedBlocks;
+ int decommissionOnlyReplicas;
+ int underReplicatedInOpenFiles;
+ long startTime;
+
+ synchronized void set(int underRep,
+ int onlyRep, int underConstruction) {
+ if (isDecommissionInProgress() == false) {
+ return;
+ }
+ underReplicatedBlocks = underRep;
+ decommissionOnlyReplicas = onlyRep;
+ underReplicatedInOpenFiles = underConstruction;
+ }
+
+ synchronized int getUnderReplicatedBlocks() {
+ if (isDecommissionInProgress() == false) {
+ return 0;
+ }
+ return underReplicatedBlocks;
+ }
+ synchronized int getDecommissionOnlyReplicas() {
+ if (isDecommissionInProgress() == false) {
+ return 0;
+ }
+ return decommissionOnlyReplicas;
+ }
+
+ synchronized int getUnderReplicatedInOpenFiles() {
+ if (isDecommissionInProgress() == false) {
+ return 0;
+ }
+ return underReplicatedInOpenFiles;
+ }
+
+ synchronized void setStartTime(long time) {
+ startTime = time;
+ }
+
+ synchronized long getStartTime() {
+ if (isDecommissionInProgress() == false) {
+ return 0;
+ }
+ return startTime;
+ }
+ } // End of class DecommissioningStatus
+
}
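
A hedged sketch of how the new decommissioningStatus is meant to be driven (the caller shown is an assumption; this patch only adds the holder class):

    // when decommissioning starts (sketch):
    node.decommissioningStatus.setStartTime(System.currentTimeMillis());

    // while it is in progress, a periodic monitor recounts the node's
    // blocks and records progress; set() is a no-op otherwise:
    node.decommissioningStatus.set(underReplicated, onlyReplicas, inOpenFiles);

    // readers (e.g. status pages) see live counters during decommissioning
    // and zeros once the node is no longer decommission-in-progress:
    int pending = node.decommissioningStatus.getUnderReplicatedBlocks();
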
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java Sat Nov 28 20:05:56 2009
@@ -35,6 +35,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
@@ -66,7 +67,7 @@
) throws IOException {
ServletContext context = getServletContext();
InetSocketAddress nnAddr = (InetSocketAddress)context.getAttribute("name.node.address");
- Configuration conf = new Configuration(
+ Configuration conf = new HdfsConfiguration(
(Configuration)context.getAttribute("name.conf"));
UnixUserGroupInformation.saveToConf(conf,
UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
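
The Configuration-to-HdfsConfiguration substitutions in this file and the next are more than cosmetic: HdfsConfiguration registers hdfs-default.xml and hdfs-site.xml as default resources in its static initializer, so HDFS-specific keys resolve even when the object is created deep inside a servlet or an RPC client. A minimal sketch:

    // behaves like new Configuration(), but also pulls in hdfs-*.xml:
    Configuration conf = new HdfsConfiguration();

    // the copy-constructor form keeps the attributes of a base
    // configuration while still triggering the resource registration:
    Configuration perRequest = new HdfsConfiguration(baseConf);
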
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Sat Nov 28 20:05:56 2009
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -79,7 +80,7 @@
try {
this.backupNode =
(NamenodeProtocol) RPC.getProxy(NamenodeProtocol.class,
- NamenodeProtocol.versionID, bnAddress, new Configuration());
+ NamenodeProtocol.versionID, bnAddress, new HdfsConfiguration());
} catch(IOException e) {
Storage.LOG.error("Error connecting to: " + bnAddress, e);
throw e;