You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2007/12/24 23:20:53 UTC
svn commit: r606744 [2/3] - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/dfs/FSNamesystem.java
src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java
src/test/org/apache/hadoop/dfs/TestNNThroughputBenchmark.java
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=606744&r1=606743&r2=606744&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Dec 24 14:20:52 2007
@@ -1,3758 +1,3758 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.dfs;
-
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.dfs.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.util.*;
-import org.apache.hadoop.mapred.StatusHttpServer;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.Server;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.Map.Entry;
-import java.text.SimpleDateFormat;
-
-/***************************************************
- * FSNamesystem does the actual bookkeeping work for the
- * DataNode.
- *
- * It tracks several important tables.
- *
- * 1) valid fsname --> blocklist (kept on disk, logged)
- * 2) Set of all valid blocks (inverted #1)
- * 3) block --> machinelist (kept in memory, rebuilt dynamically from reports)
- * 4) machine --> blocklist (inverted #2)
- * 5) LRU cache of updated-heartbeat machines
- ***************************************************/
-class FSNamesystem implements FSConstants {
- public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FSNamesystem");
-
- //
- // Stores the correct file name hierarchy
- //
- FSDirectory dir;
-
- //
- // Stores the block-->datanode(s) map. Updated only in response
- // to client-sent information.
- // Mapping: Block -> { INode, datanodes, self ref }
- //
- BlocksMap blocksMap = new BlocksMap();
-
- /**
- * Stores the datanode -> block map.
- * <p>
- * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
- * storage id. In order to keep the storage map consistent it tracks
- * all storages ever registered with the namenode.
- * A descriptor corresponding to a specific storage id can be
- * <ul>
- * <li>added to the map if it is a new storage id;</li>
- * <li>updated with a new datanode started as a replacement for the old one
- * with the same storage id; and </li>
- * <li>removed if and only if an existing datanode is restarted to serve a
- * different storage id.</li>
- * </ul> <br>
- * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
- * in the namespace image file. Only the {@link DatanodeInfo} part is
- * persistent, the list of blocks is restored from the datanode block
- * reports.
- * <p>
- * Mapping: StorageID -> DatanodeDescriptor
- */
- Map<String, DatanodeDescriptor> datanodeMap =
- new TreeMap<String, DatanodeDescriptor>();
-
- //
- // Keeps a Collection for every named machine containing
- // blocks that have recently been invalidated and are thought to live
- // on the machine in question.
- // Mapping: StorageID -> ArrayList<Block>
- //
- private Map<String, Collection<Block>> recentInvalidateSets =
- new TreeMap<String, Collection<Block>>();
-
- //
- // Keeps a TreeSet for every named node. Each treeset contains
- // a list of the blocks that are "extra" at that location. We'll
- // eventually remove these extras.
- // Mapping: StorageID -> TreeSet<Block>
- //
- private Map<String, Collection<Block>> excessReplicateMap =
- new TreeMap<String, Collection<Block>>();
-
- //
- // Stats on overall usage
- //
- long totalCapacity = 0L, totalUsed=0L, totalRemaining = 0L;
-
- // total number of connections per live datanode
- int totalLoad = 0;
-
-
- //
- // For the HTTP browsing interface
- //
- StatusHttpServer infoServer;
- int infoPort;
- Date startTime;
-
- //
- Random r = new Random();
-
- /**
- * Stores a set of DatanodeDescriptor objects.
- * This is a subset of {@link #datanodeMap}, containing nodes that are
- * considered alive.
- * The {@link HeartbeatMonitor} periodically checks for outdated entries,
- * and removes them from the list.
- */
- ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
-
- //
- // Store set of Blocks that need to be replicated 1 or more times.
- // We also store pending replication-orders.
- // Set of: Block
- //
- private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
- private PendingReplicationBlocks pendingReplications;
-
- //
- // Used for handling lock-leases
- // Mapping: leaseHolder -> Lease
- //
- private Map<StringBytesWritable, Lease> leases = new TreeMap<StringBytesWritable, Lease>();
- // Set of: Lease
- private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
-
- //
- // Threaded object that checks to see if we have been
- // getting heartbeats from all clients.
- //
- Daemon hbthread = null; // HeartbeatMonitor thread
- Daemon lmthread = null; // LeaseMonitor thread
- Daemon smmthread = null; // SafeModeMonitor thread
- Daemon replthread = null; // Replication thread
- volatile boolean fsRunning = true;
- long systemStart = 0;
-
- // The maximum number of replicates we should allow for a single block
- private int maxReplication;
- // How many outgoing replication streams a given node should have at one time
- private int maxReplicationStreams;
- // MIN_REPLICATION is how many copies we need in place or else we disallow the write
- private int minReplication;
- // Default replication
- private int defaultReplication;
- // heartbeatRecheckInterval is how often namenode checks for expired datanodes
- private long heartbeatRecheckInterval;
- // heartbeatExpireInterval is how long namenode waits for datanode to report
- // heartbeat
- private long heartbeatExpireInterval;
- //replicationRecheckInterval is how often namenode checks for new replication work
- private long replicationRecheckInterval;
- //decommissionRecheckInterval is how often namenode checks if a node has finished decommission
- private long decommissionRecheckInterval;
- // default block size of a file
- private long defaultBlockSize = 0;
- private int replIndex = 0; // last datanode used for replication work
- static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
-
- public static FSNamesystem fsNamesystemObject;
- private String localMachine;
- private int port;
- private SafeModeInfo safeMode; // safe mode information
- private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
-
- // datanode networktoplogy
- NetworkTopology clusterMap = new NetworkTopology();
- // for block replicas placement
- ReplicationTargetChooser replicator;
-
- private HostsFileReader hostsReader;
- private Daemon dnthread = null;
-
- // can fs-image be rolled?
- volatile private CheckpointStates ckptState = CheckpointStates.START;
-
- private static final SimpleDateFormat DATE_FORM =
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
-
- /**
- * FSNamesystem constructor.
- */
- FSNamesystem(NameNode nn, Configuration conf) throws IOException {
- fsNamesystemObject = this;
- try {
- initialize(nn, conf);
- } catch(IOException e) {
- close();
- throw e;
- }
- }
-
- /**
- * Initialize FSNamesystem.
- */
- private void initialize(NameNode nn, Configuration conf) throws IOException {
- setConfigurationParameters(conf);
-
- this.localMachine = nn.getNameNodeAddress().getHostName();
- this.port = nn.getNameNodeAddress().getPort();
- this.dir = new FSDirectory(this, conf);
- StartupOption startOpt = NameNode.getStartupOption(conf);
- this.dir.loadFSImage(getNamespaceDirs(conf), startOpt);
- this.safeMode = new SafeModeInfo(conf);
- setBlockTotal();
- pendingReplications = new PendingReplicationBlocks(
- conf.getInt("dfs.replication.pending.timeout.sec",
- -1) * 1000L);
- this.hbthread = new Daemon(new HeartbeatMonitor());
- this.lmthread = new Daemon(new LeaseMonitor());
- this.replthread = new Daemon(new ReplicationMonitor());
- hbthread.start();
- lmthread.start();
- replthread.start();
- this.systemStart = now();
- this.startTime = new Date(systemStart);
-
- this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
- conf.get("dfs.hosts.exclude",""));
- this.dnthread = new Daemon(new DecommissionedMonitor());
- dnthread.start();
-
- String infoAddr = conf.get("dfs.http.bindAddress", "0.0.0.0:50070");
- InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
- String infoHost = infoSocAddr.getHostName();
- int tmpInfoPort = infoSocAddr.getPort();
- this.infoServer = new StatusHttpServer("dfs", infoHost, tmpInfoPort,
- tmpInfoPort == 0);
- this.infoServer.setAttribute("name.system", this);
- this.infoServer.setAttribute("name.node", nn);
- this.infoServer.setAttribute("name.conf", conf);
- this.infoServer.addServlet("fsck", "/fsck", FsckServlet.class);
- this.infoServer.addServlet("getimage", "/getimage", GetImageServlet.class);
- this.infoServer.addServlet("listPaths", "/listPaths/*", ListPathsServlet.class);
- this.infoServer.addServlet("data", "/data/*", FileDataServlet.class);
- this.infoServer.start();
-
- // The web-server port can be ephemeral... ensure we have the correct info
- this.infoPort = this.infoServer.getPort();
- conf.set("dfs.http.bindAddress", infoHost + ":" + infoPort);
- LOG.info("Web-server up at: " + conf.get("dfs.http.bindAddress"));
- }
-
- static Collection<File> getNamespaceDirs(Configuration conf) {
- String[] dirNames = conf.getStrings("dfs.name.dir");
- if (dirNames == null)
- dirNames = new String[] {"/tmp/hadoop/dfs/name"};
- Collection<File> dirs = new ArrayList<File>(dirNames.length);
- for(int idx = 0; idx < dirNames.length; idx++) {
- dirs.add(new File(dirNames[idx]));
- }
- return dirs;
- }
-
- /**
- * dirs is a list of directories where the filesystem directory state
- * is stored
- */
- FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
- fsNamesystemObject = this;
- setConfigurationParameters(conf);
- this.dir = new FSDirectory(fsImage, this, conf);
- }
-
- /**
- * Initializes some of the members from configuration
- */
- private void setConfigurationParameters(Configuration conf)
- throws IOException {
- this.replicator = new ReplicationTargetChooser(
- conf.getBoolean("dfs.replication.considerLoad", true),
- this,
- clusterMap);
- this.defaultReplication = conf.getInt("dfs.replication", 3);
- this.maxReplication = conf.getInt("dfs.replication.max", 512);
- this.minReplication = conf.getInt("dfs.replication.min", 1);
- if (minReplication <= 0)
- throw new IOException(
- "Unexpected configuration parameters: dfs.replication.min = "
- + minReplication
- + " must be greater than 0");
- if (maxReplication >= (int)Short.MAX_VALUE)
- throw new IOException(
- "Unexpected configuration parameters: dfs.replication.max = "
- + maxReplication + " must be less than " + (Short.MAX_VALUE));
- if (maxReplication < minReplication)
- throw new IOException(
- "Unexpected configuration parameters: dfs.replication.min = "
- + minReplication
- + " must be less than dfs.replication.max = "
- + maxReplication);
- this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
- long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
- this.heartbeatRecheckInterval = conf.getInt(
- "heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes
- this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
- 10 * heartbeatInterval;
- this.replicationRecheckInterval = 3 * 1000; // 3 second
- this.decommissionRecheckInterval = conf.getInt(
- "dfs.namenode.decommission.interval",
- 5 * 60 * 1000);
- this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
- }
-
- /** Return the FSNamesystem object
- *
- */
- public static FSNamesystem getFSNamesystem() {
- return fsNamesystemObject;
- }
-
- NamespaceInfo getNamespaceInfo() {
- return new NamespaceInfo(dir.fsImage.getNamespaceID(),
- dir.fsImage.getCTime(),
- getDistributedUpgradeVersion());
- }
-
- /** Close down this filesystem manager.
- * Causes heartbeat and lease daemons to stop; waits briefly for
- * them to finish, but a short timeout returns control back to caller.
- */
- public void close() {
- fsRunning = false;
- try {
- if (pendingReplications != null) pendingReplications.stop();
- if (infoServer != null) infoServer.stop();
- if (hbthread != null) hbthread.interrupt();
- if (replthread != null) replthread.interrupt();
- if (dnthread != null) dnthread.interrupt();
- if (smmthread != null) smmthread.interrupt();
- } catch (InterruptedException ie) {
- } finally {
- // using finally to ensure we also wait for lease daemon
- try {
- if (lmthread != null) {
- lmthread.interrupt();
- lmthread.join(3000);
- }
- } catch (InterruptedException ie) {
- } finally {
- try {
- dir.close();
- } catch (IOException ex) {
- // do nothing
- }
- }
- }
- }
-
- /**
- * Dump all metadata into specified file
- */
- void metaSave(String filename) throws IOException {
- File file = new File(System.getProperty("hadoop.log.dir"),
- filename);
- PrintWriter out = new PrintWriter(new BufferedWriter(
- new FileWriter(file, true)));
-
-
- //
- // Dump contents of neededReplication
- //
- synchronized (neededReplications) {
- out.println("Metasave: Blocks waiting for replication: " +
- neededReplications.size());
- if (neededReplications.size() > 0) {
- for (Iterator<Block> it = neededReplications.iterator();
- it.hasNext();) {
- Block block = it.next();
- out.print(block);
- for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
- jt.hasNext();) {
- DatanodeDescriptor node = jt.next();
- out.print(" " + node + " : ");
- }
- out.println("");
- }
- }
- }
-
- //
- // Dump blocks from pendingReplication
- //
- pendingReplications.metaSave(out);
-
- //
- // Dump blocks that are waiting to be deleted
- //
- dumpRecentInvalidateSets(out);
-
- //
- // Dump all datanodes
- //
- datanodeDump(out);
-
- out.flush();
- out.close();
- }
-
- long getDefaultBlockSize() {
- return defaultBlockSize;
- }
-
- /* get replication factor of a block */
- private int getReplication(Block block) {
- INodeFile fileINode = blocksMap.getINode(block);
- if (fileINode == null) { // block does not belong to any file
- return 0;
- }
- assert !fileINode.isDirectory() : "Block cannot belong to a directory.";
- return fileINode.getReplication();
- }
-
- /* updates a block in under replication queue */
- synchronized void updateNeededReplications(Block block,
- int curReplicasDelta, int expectedReplicasDelta) {
- NumberReplicas repl = countNodes(block);
- int curExpectedReplicas = getReplication(block);
- neededReplications.update(block,
- repl.liveReplicas(),
- repl.decommissionedReplicas(),
- curExpectedReplicas,
- curReplicasDelta, expectedReplicasDelta);
- }
-
- /**
- * Used only during DFS upgrade for block level CRCs (HADOOP-1134).
- * This returns information for a given blocks that includes:
- * <li> full path name for the file that contains the block.
- * <li> offset of first byte of the block.
- * <li> file length and length of the block.
- * <li> all block locations for the crc file (".file.crc").
- * <li> replication for crc file.
- * When replicas is true, it includes replicas of the block.
- */
- public synchronized BlockCrcInfo blockCrcInfo(
- Block block,
- BlockCrcUpgradeObjectNamenode namenodeUpgradeObj,
- boolean replicas) {
- BlockCrcInfo crcInfo = new BlockCrcInfo();
- crcInfo.status = BlockCrcInfo.STATUS_ERROR;
-
- INodeFile fileINode = blocksMap.getINode(block);
- if ( fileINode == null || fileINode.isDirectory() ) {
- // Most probably reason is that this block does not exist
- if (blocksMap.getStoredBlock(block) == null) {
- crcInfo.status = BlockCrcInfo.STATUS_UNKNOWN_BLOCK;
- } else {
- LOG.warn("getBlockCrcInfo(): Could not find file for " + block);
- }
- return crcInfo;
- }
-
- crcInfo.fileName = "localName:" + fileINode.getLocalName();
-
- // Find the offset and length for this block.
- Block[] fileBlocks = fileINode.getBlocks();
- crcInfo.blockLen = -1;
- if ( fileBlocks != null ) {
- for ( Block b:fileBlocks ) {
- if ( block.equals(b) ) {
- crcInfo.blockLen = b.getNumBytes();
- }
- if ( crcInfo.blockLen < 0 ) {
- crcInfo.startOffset += b.getNumBytes();
- }
- crcInfo.fileSize += b.getNumBytes();
- }
- }
-
- if ( crcInfo.blockLen < 0 ) {
- LOG.warn("blockCrcInfo(): " + block +
- " could not be found in blocks for " + crcInfo.fileName);
- return crcInfo;
- }
-
- String fileName = fileINode.getLocalName();
- if ( fileName.startsWith(".") && fileName.endsWith(".crc") ) {
- crcInfo.status = BlockCrcInfo.STATUS_CRC_BLOCK;
- return crcInfo;
- }
-
- if (replicas) {
- // include block replica locations, instead of crcBlocks
- crcInfo.blockLocationsIncluded = true;
-
- DatanodeInfo[] dnInfo = new DatanodeInfo[blocksMap.numNodes(block)];
- Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
- for (int i=0; it != null && it.hasNext(); i++ ) {
- dnInfo[i] = new DatanodeInfo(it.next());
- }
- crcInfo.blockLocations = new LocatedBlock(block, dnInfo,
- crcInfo.startOffset);
- } else {
-
- //Find CRC file
- BlockCrcUpgradeObjectNamenode.INodeMapEntry entry =
- namenodeUpgradeObj.getINodeMapEntry(fileINode);
-
- if (entry == null || entry.parent == null) {
- LOG.warn("Could not find parent INode for " + fileName + " " + block);
- return crcInfo;
- }
-
- crcInfo.fileName = entry.getAbsoluteName();
-
- String crcName = "." + fileName + ".crc";
- INode iNode = entry.getParentINode().getChild(crcName);
- if (iNode == null || iNode.isDirectory()) {
- // Should we log this?
- crcInfo.status = BlockCrcInfo.STATUS_NO_CRC_DATA;
- return crcInfo;
- }
-
- INodeFile crcINode = (INodeFile)iNode;
- Block[] blocks = crcINode.getBlocks();
- if ( blocks == null ) {
- LOG.warn("getBlockCrcInfo(): could not find blocks for crc file for " +
- crcInfo.fileName);
- return crcInfo;
- }
-
- crcInfo.crcBlocks = new LocatedBlock[ blocks.length ];
- for (int i=0; i<blocks.length; i++) {
- DatanodeInfo[] dnArr = new DatanodeInfo[ blocksMap.numNodes(blocks[i]) ];
- int idx = 0;
- for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blocks[i]);
- it.hasNext();) {
- dnArr[ idx++ ] = it.next();
- }
- crcInfo.crcBlocks[i] = new LocatedBlock(blocks[i], dnArr);
- }
-
- crcInfo.crcReplication = crcINode.getReplication();
- }
-
- crcInfo.status = BlockCrcInfo.STATUS_DATA_BLOCK;
- return crcInfo;
- }
-
- /////////////////////////////////////////////////////////
- //
- // These methods are called by secondary namenodes
- //
- /////////////////////////////////////////////////////////
- /**
- * return a list of blocks & their locations on <code>datanode</code> whose
- * total size is <code>size</code>
- *
- * @param datanode on which blocks are located
- * @param size total size of blocks
- */
- synchronized BlocksWithLocations getBlocks(DatanodeID datanode, long size)
- throws IOException {
- DatanodeDescriptor node = getDatanode(datanode);
- if (node == null) {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
- + "Asking for blocks from an unrecorded node " + datanode.getName());
- throw new IllegalArgumentException(
- "Unexpected exception. Got getBlocks message for datanode " +
- datanode.getName() + ", but there is no info for it");
- }
-
- int numBlocks = node.numBlocks();
- if(numBlocks == 0) {
- return new BlocksWithLocations(new BlockWithLocations[0]);
- }
- Iterator<Block> iter = node.getBlockIterator();
- int startBlock = r.nextInt(numBlocks); // starting from a random block
- // skip blocks
- for(int i=0; i<startBlock; i++) {
- iter.next();
- }
- List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
- long totalSize = 0;
- while(totalSize<size && iter.hasNext()) {
- totalSize += addBlock(iter.next(), results);
- }
- if(totalSize<size) {
- iter = node.getBlockIterator(); // start from the beginning
- for(int i=0; i<startBlock&&totalSize<size; i++) {
- totalSize += addBlock(iter.next(), results);
- }
- }
-
- return new BlocksWithLocations(
- results.toArray(new BlockWithLocations[results.size()]));
- }
-
- /* Get all valid locations of the block & add the block to results
- * return the length of the added block; 0 if the block is not added
- */
- private long addBlock(Block block, List<BlockWithLocations> results) {
- ArrayList<String> machineSet =
- new ArrayList<String>(blocksMap.numNodes(block));
- for(Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(block); it.hasNext();) {
- String storageID = it.next().getStorageID();
- // filter invalidate replicas
- Collection<Block> blocks = recentInvalidateSets.get(storageID);
- if(blocks==null || !blocks.contains(block)) {
- machineSet.add(storageID);
- }
- }
- if(machineSet.size() == 0) {
- return 0;
- } else {
- results.add(new BlockWithLocations(block,
- machineSet.toArray(new String[machineSet.size()])));
- return block.getNumBytes();
- }
- }
-
- /////////////////////////////////////////////////////////
- //
- // These methods are called by HadoopFS clients
- //
- /////////////////////////////////////////////////////////
- /**
- * Get block locations within the specified range.
- *
- * @see ClientProtocol#open(String, long, long)
- * @see ClientProtocol#getBlockLocations(String, long, long)
- */
- LocatedBlocks getBlockLocations(String clientMachine,
- String src,
- long offset,
- long length
- ) throws IOException {
- if (offset < 0) {
- throw new IOException("Negative offset is not supported. File: " + src );
- }
- if (length < 0) {
- throw new IOException("Negative length is not supported. File: " + src );
- }
-
- DatanodeDescriptor client = null;
- LocatedBlocks blocks = getBlockLocations(dir.getFileINode(src),
- offset, length,
- Integer.MAX_VALUE);
- if (blocks == null) {
- return null;
- }
- client = host2DataNodeMap.getDatanodeByHost(clientMachine);
- for (Iterator<LocatedBlock> it = blocks.getLocatedBlocks().iterator();
- it.hasNext();) {
- LocatedBlock block = it.next();
- clusterMap.pseudoSortByDistance(client,
- (DatanodeDescriptor[])(block.getLocations()));
- }
- return blocks;
- }
-
- private synchronized LocatedBlocks getBlockLocations(INodeFile inode,
- long offset,
- long length,
- int nrBlocksToReturn) {
- if(inode == null) {
- return null;
- }
- Block[] blocks = inode.getBlocks();
- if (blocks == null) {
- return null;
- }
- if (blocks.length == 0) {
- return new LocatedBlocks(inode, new ArrayList<LocatedBlock>(blocks.length));
- }
- List<LocatedBlock> results;
- results = new ArrayList<LocatedBlock>(blocks.length);
-
- int curBlk = 0;
- long curPos = 0, blkSize = 0;
- int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
- for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
- blkSize = blocks[curBlk].getNumBytes();
- assert blkSize > 0 : "Block of size 0";
- if (curPos + blkSize > offset) {
- break;
- }
- curPos += blkSize;
- }
-
- if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file
- return null;
-
- long endOff = offset + length;
-
- do {
- // get block locations
- int numNodes = blocksMap.numNodes(blocks[curBlk]);
- DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numNodes];
- if (numNodes > 0) {
- numNodes = 0;
- for(Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
- machineSet[numNodes++] = it.next();
- }
- }
- results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos));
- curPos += blocks[curBlk].getNumBytes();
- curBlk++;
- } while (curPos < endOff
- && curBlk < blocks.length
- && results.size() < nrBlocksToReturn);
-
- return new LocatedBlocks(inode, results);
- }
-
- /**
- * Set replication for an existing file.
- *
- * The NameNode sets new replication and schedules either replication of
- * under-replicated data blocks or removal of the eccessive block copies
- * if the blocks are over-replicated.
- *
- * @see ClientProtocol#setReplication(String, short)
- * @param src file name
- * @param replication new replication
- * @return true if successful;
- * false if file does not exist or is a directory
- */
- public boolean setReplication(String src, short replication)
- throws IOException {
- boolean status = setReplicationInternal(src, replication);
- getEditLog().logSync();
- return status;
- }
-
- private synchronized boolean setReplicationInternal(String src,
- short replication
- ) throws IOException {
- if (isInSafeMode())
- throw new SafeModeException("Cannot set replication for " + src, safeMode);
- verifyReplication(src, replication, null);
-
- int[] oldReplication = new int[1];
- Block[] fileBlocks;
- fileBlocks = dir.setReplication(src, replication, oldReplication);
- if (fileBlocks == null) // file not found or is a directory
- return false;
- int oldRepl = oldReplication[0];
- if (oldRepl == replication) // the same replication
- return true;
-
- // update needReplication priority queues
- LOG.info("Increasing replication for file " + src
- + ". New replication is " + replication);
- for(int idx = 0; idx < fileBlocks.length; idx++)
- updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
-
- if (oldRepl > replication) {
- // old replication > the new one; need to remove copies
- LOG.info("Reducing replication for file " + src
- + ". New replication is " + replication);
- for(int idx = 0; idx < fileBlocks.length; idx++)
- proccessOverReplicatedBlock(fileBlocks[idx], replication, null, null);
- }
- return true;
- }
-
- public long getPreferredBlockSize(String filename) throws IOException {
- return dir.getPreferredBlockSize(filename);
- }
-
- /**
- * Check whether the replication parameter is within the range
- * determined by system configuration.
- */
- private void verifyReplication(String src,
- short replication,
- String clientName
- ) throws IOException {
- String text = "file " + src
- + ((clientName != null) ? " on client " + clientName : "")
- + ".\n"
- + "Requested replication " + replication;
-
- if (replication > maxReplication)
- throw new IOException(text + " exceeds maximum " + maxReplication);
-
- if (replication < minReplication)
- throw new IOException(
- text + " is less than the required minimum " + minReplication);
- }
-
- void startFile(String src, String holder, String clientMachine,
- boolean overwrite, short replication, long blockSize
- ) throws IOException {
- startFileInternal(src, holder, clientMachine, overwrite,
- replication, blockSize);
- getEditLog().logSync();
- }
-
- /**
- * The client would like to create a new block for the indicated
- * filename. Return an array that consists of the block, plus a set
- * of machines. The first on this list should be where the client
- * writes data. Subsequent items in the list must be provided in
- * the connection to the first datanode.
- * Return an array that consists of the block, plus a set
- * of machines
- * @throws IOException if the filename is invalid
- * {@link FSDirectory#isValidToCreate(String)}.
- */
- synchronized void startFileInternal(String src,
- String holder,
- String clientMachine,
- boolean overwrite,
- short replication,
- long blockSize
- ) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file "
- +src+" for "+holder+" at "+clientMachine);
- if (isInSafeMode())
- throw new SafeModeException("Cannot create file" + src, safeMode);
- if (!isValidName(src)) {
- throw new IOException("Invalid file name: " + src);
- }
- try {
- INode myFile = dir.getFileINode(src);
- if (myFile != null && myFile.isUnderConstruction()) {
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
- //
- // If the file is under construction , then it must be in our
- // leases. Find the appropriate lease record.
- //
- Lease lease = getLease(holder);
- //
- // We found the lease for this file. And surprisingly the original
- // holder is trying to recreate this file. This should never occur.
- //
- if (lease != null) {
- throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because current leaseholder is trying to recreate file.");
- }
- //
- // Find the original holder.
- //
- lease = getLease(pendingFile.getClientName());
- if (lease == null) {
- throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because pendingCreates is non-null but no leases found.");
- }
- //
- // If the original holder has not renewed in the last SOFTLIMIT
- // period, then reclaim all resources and allow this request
- // to proceed. Otherwise, prevent this request from creating file.
- //
- if (lease.expiredSoftLimit()) {
- synchronized (sortedLeases) {
- lease.releaseLocks();
- removeLease(lease.getHolder());
- LOG.info("startFile: Removing lease " + lease + " ");
- if (!sortedLeases.remove(lease)) {
- LOG.error("startFile: Unknown failure trying to remove " + lease +
- " from lease set.");
- }
- }
- } else {
- throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- ", because this file is already being created by " +
- pendingFile.getClientName() +
- " on " + pendingFile.getClientMachine());
- }
- }
-
- try {
- verifyReplication(src, replication, clientMachine);
- } catch(IOException e) {
- throw new IOException("failed to create "+e.getMessage());
- }
- if (!dir.isValidToCreate(src)) {
- if (overwrite) {
- delete(src);
- } else {
- throw new IOException("failed to create file " + src
- +" on client " + clientMachine
- +" either because the filename is invalid or the file exists");
- }
- }
-
- DatanodeDescriptor clientNode =
- host2DataNodeMap.getDatanodeByHost(clientMachine);
-
- synchronized (sortedLeases) {
- Lease lease = getLease(holder);
- if (lease == null) {
- lease = new Lease(holder);
- putLease(holder, lease);
- sortedLeases.add(lease);
- } else {
- sortedLeases.remove(lease);
- lease.renew();
- sortedLeases.add(lease);
- }
- lease.startedCreate(src);
- }
-
- //
- // Now we can add the name to the filesystem. This file has no
- // blocks associated with it.
- //
- INode newNode = dir.addFile(src, replication, blockSize,
- holder,
- clientMachine,
- clientNode);
- if (newNode == null) {
- throw new IOException("DIR* NameSystem.startFile: " +
- "Unable to add file to namespace.");
- }
- } catch (IOException ie) {
- NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
- +ie.getMessage());
- throw ie;
- }
-
- NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
- +"add "+src+" to namespace for "+holder);
- }
-
- /**
- * The client would like to obtain an additional block for the indicated
- * filename (which is being written-to). Return an array that consists
- * of the block, plus a set of machines. The first on this list should
- * be where the client writes data. Subsequent items in the list must
- * be provided in the connection to the first datanode.
- *
- * Make sure the previous blocks have been reported by datanodes and
- * are replicated. Will return an empty 2-elt array if we want the
- * client to "try again later".
- */
- public LocatedBlock getAdditionalBlock(String src,
- String clientName
- ) throws IOException {
- long fileLength, blockSize;
- int replication;
- DatanodeDescriptor clientNode = null;
- Block newBlock = null;
-
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
- +src+" for "+clientName);
-
- synchronized (this) {
- if (isInSafeMode()) {
- throw new SafeModeException("Cannot add block to " + src, safeMode);
- }
-
- INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
-
- //
- // If we fail this, bad things happen!
- //
- if (!checkFileProgress(pendingFile, false)) {
- throw new NotReplicatedYetException("Not replicated yet:" + src);
- }
- fileLength = pendingFile.computeContentsLength();
- blockSize = pendingFile.getPreferredBlockSize();
- clientNode = pendingFile.getClientNode();
- replication = (int)pendingFile.getReplication();
- newBlock = allocateBlock(src, pendingFile);
- }
-
- DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
- clientNode,
- null,
- blockSize);
- if (targets.length < this.minReplication) {
- // if we could not find any targets, remove this block from file
- synchronized (this) {
- INodeFile iFile = dir.getFileINode(src);
- if (iFile != null && iFile.isUnderConstruction()) {
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
- if (pendingFile.getClientName().equals(clientName)) {
- dir.removeBlock(src, pendingFile, newBlock);
- }
- }
- }
- throw new IOException("File " + src + " could only be replicated to " +
- targets.length + " nodes, instead of " +
- minReplication);
- }
-
- // Create next block
- return new LocatedBlock(newBlock, targets, fileLength);
- }
-
- /**
- * The client would like to let go of the given block
- */
- public synchronized boolean abandonBlock(Block b, String src, String holder
- ) throws IOException {
- //
- // Remove the block from the pending creates list
- //
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
- +b.getBlockName()+"of file "+src);
- INode file = checkLease(src, holder);
- dir.removeBlock(src, file, b);
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
- + b.getBlockName()
- + " is removed from pendingCreates");
- return true;
- }
-
- // make sure that we still have the lease on this file
- private INodeFileUnderConstruction checkLease(String src, String holder
- ) throws IOException {
- INode file = dir.getFileINode(src);
- if (file == null || !file.isUnderConstruction()) {
- throw new LeaseExpiredException("No lease on " + src);
- }
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
- if (!pendingFile.getClientName().equals(holder)) {
- throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
- + pendingFile.getClientName() + " but is accessed by " + holder);
- }
- return pendingFile;
- }
-
- /**
- * Abandon the entire file in progress
- */
- public synchronized void abandonFileInProgress(String src,
- String holder
- ) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" + src);
- synchronized (sortedLeases) {
- // find the lease
- Lease lease = getLease(holder);
- if (lease != null) {
- // remove the file from the lease
- if (lease.completedCreate(src)) {
- // if we found the file in the lease, remove it from pendingCreates
- internalReleaseCreate(src, holder);
- } else {
- LOG.info("Attempt by " + holder +
- " to release someone else's create lock on " + src);
- }
- } else {
- LOG.info("Attempt to release a lock from an unknown lease holder "
- + holder + " for " + src);
- }
- }
- }
-
- /**
- * The FSNamesystem will already know the blocks that make up the file.
- * Before we return, we make sure that all the file's blocks have
- * been reported by datanodes and are replicated correctly.
- */
- public int completeFile(String src, String holder) throws IOException {
- int status = completeFileInternal(src, holder);
- getEditLog().logSync();
- return status;
- }
-
- private synchronized int completeFileInternal(String src,
- String holder) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
- if (isInSafeMode())
- throw new SafeModeException("Cannot complete file " + src, safeMode);
- INode iFile = dir.getFileINode(src);
- INodeFileUnderConstruction pendingFile = null;
- Block[] fileBlocks = null;
-
- if (iFile != null && iFile.isUnderConstruction()) {
- pendingFile = (INodeFileUnderConstruction) iFile;
- fileBlocks = dir.getFileBlocks(src);
- }
- if (fileBlocks == null ) {
- NameNode.stateChangeLog.warn("DIR* NameSystem.completeFile: "
- + "failed to complete " + src
- + " because dir.getFileBlocks() is null " +
- " and pendingFile is " +
- ((pendingFile == null) ? "null" :
- ("from " + pendingFile.getClientMachine()))
- );
- return OPERATION_FAILED;
- } else if (!checkFileProgress(pendingFile, true)) {
- return STILL_WAITING;
- }
-
- // The file is no longer pending.
- // Create permanent INode, update blockmap
- INodeFile newFile = pendingFile.convertToInodeFile();
- dir.replaceNode(src, pendingFile, newFile);
-
- // persist block allocations for this file
- dir.persistBlocks(src, newFile);
-
- NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
- + " blocklist persisted");
-
- synchronized (sortedLeases) {
- Lease lease = getLease(holder);
- if (lease != null) {
- lease.completedCreate(src);
- if (!lease.hasLocks()) {
- removeLease(holder);
- sortedLeases.remove(lease);
- }
- }
- }
-
- //
- // REMIND - mjc - this should be done only after we wait a few secs.
- // The namenode isn't giving datanodes enough time to report the
- // replicated blocks that are automatically done as part of a client
- // write.
- //
-
- // Now that the file is real, we need to be sure to replicate
- // the blocks.
- int numExpectedReplicas = pendingFile.getReplication();
- Block[] pendingBlocks = pendingFile.getBlocks();
- int nrBlocks = pendingBlocks.length;
- for (int i = 0; i < nrBlocks; i++) {
- // filter out containingNodes that are marked for decommission.
- NumberReplicas number = countNodes(pendingBlocks[i]);
- if (number.liveReplicas() < numExpectedReplicas) {
- neededReplications.add(pendingBlocks[i],
- number.liveReplicas(),
- number.decommissionedReplicas,
- numExpectedReplicas);
- }
- }
- return COMPLETE_SUCCESS;
- }
-
- static Random randBlockId = new Random();
-
- /**
- * Allocate a block at the given pending filename
- */
- private Block allocateBlock(String src, INode file) throws IOException {
- Block b = null;
- do {
- b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
- } while (isValidBlock(b));
- b = dir.addBlock(src, file, b);
- NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
- +src+ ". "+b.getBlockName());
- return b;
- }
-
- /**
- * Check that the indicated file's blocks are present and
- * replicated. If not, return false. If checkall is true, then check
- * all blocks, otherwise check only penultimate block.
- */
- synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
- if (checkall) {
- //
- // check all blocks of the file.
- //
- for (Block block: v.getBlocks()) {
- if (blocksMap.numNodes(block) < this.minReplication) {
- return false;
- }
- }
- } else {
- //
- // check the penultimate block of this file
- //
- Block b = v.getPenultimateBlock();
- if (b != null) {
- if (blocksMap.numNodes(b) < this.minReplication) {
- return false;
- }
- }
- }
- return true;
- }
-
- /**
- * Adds block to list of blocks which will be invalidated on
- * specified datanode.
- */
- private void addToInvalidates(Block b, DatanodeInfo n) {
- Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
- if (invalidateSet == null) {
- invalidateSet = new ArrayList<Block>();
- recentInvalidateSets.put(n.getStorageID(), invalidateSet);
- }
- invalidateSet.add(b);
- }
-
- /**
- * dumps the contents of recentInvalidateSets
- */
- private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
- Collection<Collection<Block>> values = recentInvalidateSets.values();
- Iterator<Map.Entry<String,Collection<Block>>> it =
- recentInvalidateSets.entrySet().iterator();
- if (values.size() == 0) {
- out.println("Metasave: Blocks waiting deletion: 0");
- return;
- }
- out.println("Metasave: Blocks waiting deletion from " +
- values.size() + " datanodes.");
- while (it.hasNext()) {
- Map.Entry<String,Collection<Block>> entry = it.next();
- String storageId = entry.getKey();
- DatanodeDescriptor node = datanodeMap.get(storageId);
- Collection<Block> blklist = entry.getValue();
- if (blklist.size() > 0) {
- out.print(node.getName());
- for (Iterator jt = blklist.iterator(); jt.hasNext();) {
- Block block = (Block) jt.next();
- out.print(" " + block);
- }
- out.println("");
- }
- }
- }
-
- /**
- * Invalidates the given block on the given datanode.
- */
- public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
- throws IOException {
- NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
- + blk.getBlockName() + " on "
- + dn.getName());
- if (isInSafeMode()) {
- throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
- }
-
- // Check how many copies we have of the block. If we have at least one
- // copy on a live node, then we can delete it.
- int count = countNodes(blk).liveReplicas();
- if (count > 1) {
- addToInvalidates(blk, dn);
- removeStoredBlock(blk, getDatanode(dn));
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.invalidateBlocks: "
- + blk.getBlockName() + " on "
- + dn.getName() + " listed for deletion.");
- } else {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
- + blk.getBlockName() + " on "
- + dn.getName() + " is the only copy and was not deleted.");
- }
- }
-
- ////////////////////////////////////////////////////////////////
- // Here's how to handle block-copy failure during client write:
- // -- As usual, the client's write should result in a streaming
- // backup write to a k-machine sequence.
- // -- If one of the backup machines fails, no worries. Fail silently.
- // -- Before client is allowed to close and finalize file, make sure
- // that the blocks are backed up. Namenode may have to issue specific backup
- // commands to make up for earlier datanode failures. Once all copies
- // are made, edit namespace and return to client.
- ////////////////////////////////////////////////////////////////
-
- public boolean renameTo(String src, String dst) throws IOException {
- boolean status = renameToInternal(src, dst);
- getEditLog().logSync();
- return status;
- }
-
- /**
- * Change the indicated filename.
- */
- public synchronized boolean renameToInternal(String src, String dst) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst);
- if (isInSafeMode())
- throw new SafeModeException("Cannot rename " + src, safeMode);
- if (!isValidName(dst)) {
- throw new IOException("Invalid name: " + dst);
- }
- return dir.renameTo(src, dst);
- }
-
- /**
- * Remove the indicated filename from the namespace. This may
- * invalidate some blocks that make up the file.
- */
- public boolean delete(String src) throws IOException {
- boolean status = deleteInternal(src, true);
- getEditLog().logSync();
- return status;
- }
-
- /**
- * An internal delete function that does not enforce safe mode
- */
- boolean deleteInSafeMode(String src) throws IOException {
- boolean status = deleteInternal(src, false);
- getEditLog().logSync();
- return status;
- }
- /**
- * Remove the indicated filename from the namespace. This may
- * invalidate some blocks that make up the file.
- */
- private synchronized boolean deleteInternal(String src,
- boolean enforceSafeMode)
- throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
- if (enforceSafeMode && isInSafeMode())
- throw new SafeModeException("Cannot delete " + src, safeMode);
- Block deletedBlocks[] = dir.delete(src);
- if (deletedBlocks != null) {
- for (int i = 0; i < deletedBlocks.length; i++) {
- Block b = deletedBlocks[i];
-
- for (Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(b); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- addToInvalidates(b, node);
- NameNode.stateChangeLog.info("BLOCK* NameSystem.delete: "
- + b.getBlockName() + " is added to invalidSet of "
- + node.getName());
- }
- }
- }
-
- return (deletedBlocks != null);
- }
-
- /**
- * Return whether the given filename exists
- */
- public boolean exists(String src) {
- if (dir.getFileBlocks(src) != null || dir.isDir(src)) {
- return true;
- } else {
- return false;
- }
- }
-
- /**
- * Whether the given name is a directory
- */
- public boolean isDir(String src) {
- return dir.isDir(src);
- }
-
- /* Get the file info for a specific file.
- * @param src The string representation of the path to the file
- * @throws IOException if file does not exist
- * @return object containing information regarding the file
- */
- DFSFileInfo getFileInfo(String src) throws IOException {
- return dir.getFileInfo(src);
- }
-
- /**
- * Whether the pathname is valid. Currently prohibits relative paths,
- * and names which contain a ":" or "/"
- */
- static boolean isValidName(String src) {
-
- // Path must be absolute.
- if (!src.startsWith(Path.SEPARATOR)) {
- return false;
- }
-
- // Check for ".." "." ":" "/"
- StringTokenizer tokens = new StringTokenizer(src, Path.SEPARATOR);
- while(tokens.hasMoreTokens()) {
- String element = tokens.nextToken();
- if (element.equals("..") ||
- element.equals(".") ||
- (element.indexOf(":") >= 0) ||
- (element.indexOf("/") >= 0)) {
- return false;
- }
- }
- return true;
- }
- /**
- * Create all the necessary directories
- */
- public boolean mkdirs(String src) throws IOException {
- boolean status = mkdirsInternal(src);
- getEditLog().logSync();
- return status;
- }
-
- /**
- * Create all the necessary directories
- */
- private synchronized boolean mkdirsInternal(String src) throws IOException {
- boolean success;
- NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
- if (isInSafeMode())
- throw new SafeModeException("Cannot create directory " + src, safeMode);
- if (!isValidName(src)) {
- throw new IOException("Invalid directory name: " + src);
- }
- success = dir.mkdirs(src, now());
- if (!success) {
- throw new IOException("Invalid directory name: " + src);
- }
- return success;
- }
-
- /* Get the size of the specified directory subtree.
- * @param src The string representation of the path
- * @throws IOException if path does not exist
- * @return size in bytes
- */
- long getContentLength(String src) throws IOException {
- return dir.getContentLength(src);
- }
-
- /************************************************************
- * A Lease governs all the locks held by a single client.
- * For each client there's a corresponding lease, whose
- * timestamp is updated when the client periodically
- * checks in. If the client dies and allows its lease to
- * expire, all the corresponding locks can be released.
- *************************************************************/
- class Lease implements Comparable<Lease> {
- private StringBytesWritable holder;
- private long lastUpdate;
- private Collection<StringBytesWritable> locks = new TreeSet<StringBytesWritable>();
- private Collection<StringBytesWritable> creates = new TreeSet<StringBytesWritable>();
-
- public Lease(String holder) throws IOException {
- this.holder = new StringBytesWritable(holder);
- renew();
- }
- public void renew() {
- this.lastUpdate = now();
- }
- /**
- * Returns true if the Hard Limit Timer has expired
- */
- public boolean expiredHardLimit() {
- if (now() - lastUpdate > LEASE_HARDLIMIT_PERIOD) {
- return true;
- }
- return false;
- }
- /**
- * Returns true if the Soft Limit Timer has expired
- */
- public boolean expiredSoftLimit() {
- if (now() - lastUpdate > LEASE_SOFTLIMIT_PERIOD) {
- return true;
- }
- return false;
- }
- public void obtained(String src) throws IOException {
- locks.add(new StringBytesWritable(src));
- }
- public void released(String src) throws IOException {
- locks.remove(new StringBytesWritable(src));
- }
- public void startedCreate(String src) throws IOException {
- creates.add(new StringBytesWritable(src));
- }
- public boolean completedCreate(String src) throws IOException {
- return creates.remove(new StringBytesWritable(src));
- }
- public boolean hasLocks() {
- return (locks.size() + creates.size()) > 0;
- }
- public void releaseLocks() throws IOException {
- String holderStr = holder.getString();
- locks.clear();
- for (Iterator<StringBytesWritable> it = creates.iterator(); it.hasNext();)
- internalReleaseCreate(it.next().getString(), holderStr);
- creates.clear();
- }
-
- /**
- */
- public String toString() {
- return "[Lease. Holder: " + holder.toString() + ", heldlocks: " +
- locks.size() + ", pendingcreates: " + creates.size() + "]";
- }
-
- /**
- */
- public int compareTo(Lease o) {
- Lease l1 = this;
- Lease l2 = o;
- long lu1 = l1.lastUpdate;
- long lu2 = l2.lastUpdate;
- if (lu1 < lu2) {
- return -1;
- } else if (lu1 > lu2) {
- return 1;
- } else {
- return l1.holder.compareTo(l2.holder);
- }
- }
-
- public boolean equals(Object o) {
- if (!(o instanceof Lease)) {
- return false;
- }
- Lease obj = (Lease) o;
- if (lastUpdate == obj.lastUpdate &&
- holder.equals(obj.holder)) {
- return true;
- }
- return false;
- }
-
- public int hashCode() {
- return holder.hashCode();
- }
-
- String getHolder() throws IOException {
- return holder.getString();
- }
- }
-
- /******************************************************
- * LeaseMonitor checks for leases that have expired,
- * and disposes of them.
- ******************************************************/
- class LeaseMonitor implements Runnable {
- public void run() {
- try {
- while (fsRunning) {
- synchronized (FSNamesystem.this) {
- synchronized (sortedLeases) {
- Lease top;
- while ((sortedLeases.size() > 0) &&
- ((top = sortedLeases.first()) != null)) {
- if (top.expiredHardLimit()) {
- top.releaseLocks();
- leases.remove(top.holder);
- LOG.info("Removing lease " + top + ", leases remaining: " + sortedLeases.size());
- if (!sortedLeases.remove(top)) {
- LOG.info("Unknown failure trying to remove " + top + " from lease set.");
- }
- } else {
- break;
- }
- }
- }
- }
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ie) {
- }
- }
- } catch (Exception e) {
- FSNamesystem.LOG.error(StringUtils.stringifyException(e));
- }
- }
- }
-
- private Lease getLease(String holder) throws IOException {
- return leases.get(new StringBytesWritable(holder));
- }
-
- private void putLease(String holder, Lease lease) throws IOException {
- leases.put(new StringBytesWritable(holder), lease);
- }
-
- private void removeLease(String holder) throws IOException {
- leases.remove(new StringBytesWritable(holder));
- }
-
- /**
- * Move a file that is being written to be immutable.
- * @param src The filename
- * @param holder The datanode that was creating the file
- */
- private void internalReleaseCreate(String src, String holder) throws IOException {
- INodeFile iFile = dir.getFileINode(src);
- if (iFile == null) {
- NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
- + "attempt to release a create lock on "
- + src + " file does not exist.");
- return;
- }
- if (!iFile.isUnderConstruction()) {
- NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
- + "attempt to release a create lock on "
- + src + " but file is already closed.");
- return;
- }
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
-
- // The last block that was allocated migth not have been used by the
- // client. In this case, the size of the last block would be 0. A fsck
- // will report this block as a missing block because no datanodes have it.
- // Delete this block.
- Block[] blocks = pendingFile.getBlocks();
- if (blocks != null && blocks.length > 1) {
- Block last = blocks[blocks.length - 1];
- if (last.getNumBytes() == 0) {
- pendingFile.removeBlock(last);
- }
- }
-
- // The file is no longer pending.
- // Create permanent INode, update blockmap
- INodeFile newFile = pendingFile.convertToInodeFile();
- dir.replaceNode(src, pendingFile, newFile);
-
- // persist block allocations for this file
- dir.persistBlocks(src, newFile);
-
- NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " +
- src + " is no longer written to by " +
- holder);
- }
-
- /**
- * Renew the lease(s) held by the given client
- */
- public void renewLease(String holder) throws IOException {
- synchronized (sortedLeases) {
- if (isInSafeMode())
- throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
- Lease lease = getLease(holder);
- if (lease != null) {
- sortedLeases.remove(lease);
- lease.renew();
- sortedLeases.add(lease);
- }
- }
- }
-
- /**
- * Get a listing of all files at 'src'. The Object[] array
- * exists so we can return file attributes (soon to be implemented)
- */
- public DFSFileInfo[] getListing(String src) {
- return dir.getListing(src);
- }
-
- /////////////////////////////////////////////////////////
- //
- // These methods are called by datanodes
- //
- /////////////////////////////////////////////////////////
- /**
- * Register Datanode.
- * <p>
- * The purpose of registration is to identify whether the new datanode
- * serves a new data storage, and will report new data block copies,
- * which the namenode was not aware of; or the datanode is a replacement
- * node for the data storage that was previously served by a different
- * or the same (in terms of host:port) datanode.
- * The data storages are distinguished by their storageIDs. When a new
- * data storage is reported the namenode issues a new unique storageID.
- * <p>
- * Finally, the namenode returns its namespaceID as the registrationID
- * for the datanodes.
- * namespaceID is a persistent attribute of the name space.
- * The registrationID is checked every time the datanode is communicating
- * with the namenode.
- * Datanodes with inappropriate registrationID are rejected.
- * If the namenode stops, and then restarts it can restore its
- * namespaceID and will continue serving the datanodes that has previously
- * registered with the namenode without restarting the whole cluster.
- *
- * @see DataNode#register()
- */
- public synchronized void registerDatanode(DatanodeRegistration nodeReg,
- String networkLocation
- ) throws IOException {
-
- if (!verifyNodeRegistration(nodeReg)) {
- throw new DisallowedDatanodeException(nodeReg);
- }
-
- String dnAddress = Server.getRemoteAddress();
- if (dnAddress == null) {
- //Mostly not called inside an RPC.
- throw new IOException("Could not find remote address for " +
- "registration from " + nodeReg.getName());
- }
-
- String hostName = nodeReg.getHost();
-
- // update the datanode's name with ip:port
- DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
- nodeReg.getStorageID(),
- nodeReg.getInfoPort());
- nodeReg.updateRegInfo(dnReg);
-
- NameNode.stateChangeLog.info(
- "BLOCK* NameSystem.registerDatanode: "
- + "node registration from " + nodeReg.getName()
- + " storage " + nodeReg.getStorageID());
-
- DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
- DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName());
-
- if (nodeN != null && nodeN != nodeS) {
- NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
- + "node from name: " + nodeN.getName());
- // nodeN previously served a different data storage,
- // which is not served by anybody anymore.
- removeDatanode(nodeN);
- // physically remove node from datanodeMap
- wipeDatanode(nodeN);
- nodeN = null;
- }
-
- if (nodeS != null) {
- if (nodeN == nodeS) {
- // The same datanode has been just restarted to serve the same data
- // storage. We do not need to remove old data blocks, the delta will
- // be calculated on the next block report from the datanode
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
- + "node restarted.");
- } else {
- // nodeS is found
- /* The registering datanode is a replacement node for the existing
- data storage, which from now on will be served by a new node.
- If this message repeats, both nodes might have same storageID
- by (insanely rare) random chance. User needs to restart one of the
- nodes with its data cleared (or user can just remove the StorageID
- value in "VERSION" file under the data directory of the datanode,
- but this is might not work if VERSION file format has changed
- */
- NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
- + "node " + nodeS.getName()
- + " is replaced by " + nodeReg.getName() +
- " with the same storageID " +
- nodeReg.getStorageID());
- }
- // update cluster map
- clusterMap.remove(nodeS);
- nodeS.updateRegInfo(nodeReg);
- nodeS.setNetworkLocation(networkLocation);
- clusterMap.add(nodeS);
- nodeS.setHostName(hostName);
-
- // also treat the registration message as a heartbeat
- synchronized(heartbeats) {
- if( !heartbeats.contains(nodeS)) {
- heartbeats.add(nodeS);
- //update its timestamp
- nodeS.updateHeartbeat(0L, 0L, 0L, 0);
- nodeS.isAlive = true;
- }
- }
- return;
- }
-
- // this is a new datanode serving a new data storage
- if (nodeReg.getStorageID().equals("")) {
- // this data storage has never been registered
- // it is either empty or was created by pre-storageID version of DFS
- nodeReg.storageID = newStorageID();
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.registerDatanode: "
- + "new storageID " + nodeReg.getStorageID() + " assigned.");
- }
- // register new datanode
- DatanodeDescriptor nodeDescr
- = new DatanodeDescriptor(nodeReg, networkLocation, hostName);
- unprotectedAddDatanode(nodeDescr);
- clusterMap.add(nodeDescr);
-
- // also treat the registration message as a heartbeat
- synchronized(heartbeats) {
- heartbeats.add(nodeDescr);
- nodeDescr.isAlive = true;
- // no need to update its timestamp
- // because its is done when the descriptor is created
- }
- return;
- }
-
- /**
- * Get registrationID for datanodes based on the namespaceID.
- *
- * @see #registerDatanode(DatanodeRegistration,String)
- * @see FSImage#newNamespaceID()
- * @return registration ID
- */
- public String getRegistrationID() {
- return Storage.getRegistrationID(dir.fsImage);
- }
-
- /**
- * Generate new storage ID.
- *
- * @return unique storage ID
- *
- * Note: that collisions are still possible if somebody will try
- * to bring in a data storage from a different cluster.
- */
- private String newStorageID() {
- String newID = null;
- while(newID == null) {
- newID = "DS" + Integer.toString(r.nextInt());
- if (datanodeMap.get(newID) != null)
- newID = null;
- }
- return newID;
- }
-
- private boolean isDatanodeDead(DatanodeDescriptor node) {
- return (node.getLastUpdate() <
- (now() - heartbeatExpireInterval));
- }
-
- void setDatanodeDead(DatanodeID nodeID) throws IOException {
- DatanodeDescriptor node = getDatanode(nodeID);
- node.setLastUpdate(0);
- }
-
- /**
- * The given node has reported in. This method should:
- * 1) Record the heartbeat, so the datanode isn't timed out
- * 2) Adjust usage stats for future block allocation
- *
- * If a substantial amount of time passed since the last datanode
- * heartbeat then request an immediate block report.
- *
- * @return true if registration is required or false otherwise.
- * @throws IOException
- */
- public boolean gotHeartbeat(DatanodeID nodeID,
- long capacity,
- long dfsUsed,
- long remaining,
- int xceiverCount,
- int xmitsInProgress,
- Object[] xferResults,
- Object deleteList[]
- ) throws IOException {
- synchronized (heartbeats) {
- synchronized (datanodeMap) {
- DatanodeDescriptor nodeinfo;
- try {
- nodeinfo = getDatanode(nodeID);
- if (nodeinfo == null) {
- return true;
- }
- } catch(UnregisteredDatanodeException e) {
- return true;
- }
-
- // Check if this datanode should actually be shutdown instead.
- if (shouldNodeShutdown(nodeinfo)) {
- setDatanodeDead(nodeinfo);
- throw new DisallowedDatanodeException(nodeinfo);
- }
-
- if (!nodeinfo.isAlive) {
- return true;
- } else {
- updateStats(nodeinfo, false);
- nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
- updateStats(nodeinfo, true);
- //
- // Extract pending replication work or block invalidation
- // work from the datanode descriptor
- //
- nodeinfo.getReplicationSets(this.maxReplicationStreams -
- xmitsInProgress, xferResults);
- if (xferResults[0] == null) {
- nodeinfo.getInvalidateBlocks(FSConstants.BLOCK_INVALIDATE_CHUNK,
- deleteList);
- }
- return false;
- }
- }
- }
- }
-
- private void updateStats(DatanodeDescriptor node, boolean isAdded) {
- //
- // The statistics are protected by the heartbeat lock
- //
- assert(Thread.holdsLock(heartbeats));
- if (isAdded) {
- totalCapacity += node.getCapacity();
- totalUsed += node.getDfsUsed();
- totalRemaining += node.getRemaining();
- totalLoad += node.getXceiverCount();
- } else {
- totalCapacity -= node.getCapacity();
- totalUsed -= node.getDfsUsed();
- totalRemaining -= node.getRemaining();
- totalLoad -= node.getXceiverCount();
- }
- }
- /**
- * Periodically calls heartbeatCheck().
- */
- class HeartbeatMonitor implements Runnable {
- /**
- */
- public void run() {
- while (fsRunning) {
- try {
- heartbeatCheck();
- } catch (Exception e) {
- FSNamesystem.LOG.error(StringUtils.stringifyException(e));
- }
- try {
- Thread.sleep(heartbeatRecheckInterval);
- } catch (InterruptedException ie) {
- }
- }
- }
- }
-
- /**
- * Periodically calls computeReplicationWork().
- */
- class ReplicationMonitor implements Runnable {
- public void run() {
- while (fsRunning) {
- try {
- computeDatanodeWork();
- processPendingReplications();
- Thread.sleep(replicationRecheckInterval);
- } catch (InterruptedException ie) {
- } catch (IOException ie) {
- LOG.warn("ReplicationMonitor thread received exception. " + ie);
- } catch (Throwable t) {
- LOG.warn("ReplicationMonitor thread received Runtime exception. " + t);
- Runtime.getRuntime().exit(-1);
- }
- }
- }
- }
-
- /**
- * Look at a few datanodes and compute any replication work that
- * can be scheduled on them. The datanode will be informed of this
- * work at the next heartbeat.
- *
- * Datanodes are taken from the heartbeats list in round-robin order,
- * starting at replIndex. For each node, replication work is tried first;
- * block invalidation is considered only when no replication work was
- * scheduled for that node. The scan stops after heartbeats.size()
- * iterations, or earlier once work has been found for more than
- * REPL_WORK_PER_ITERATION percent of the nodes.
- *
- * @throws IOException propagated from the helpers that store the work
- * on the datanode
- */
- void computeDatanodeWork() throws IOException {
- // number of loop iterations started so far
- int numiter = 0;
- // number of datanodes for which work was scheduled in this call
- int foundwork = 0;
- // size of the heartbeats list as last observed under its lock
- int hsize = 0;
- // value of replIndex right after the most recent node that received
- // replication work; -1 while no replication work has been scheduled
- int lastReplIndex = -1;
-
- while (true) {
- DatanodeDescriptor node = null;
-
- //
- // pick the datanode that was the last one in the
- // previous invocation of this method.
- //
- synchronized (heartbeats) {
- hsize = heartbeats.size();
- // every heartbeat slot has been visited once: stop scanning
- if (numiter++ >= hsize) {
- // no change in replIndex.
- if (lastReplIndex >= 0) {
- //next time, start after where the last replication was scheduled
- replIndex = lastReplIndex;
- }
- break;
- }
- // wrap around when the index has run past the end of the list
- if (replIndex >= hsize) {
- replIndex = 0;
- }
- node = heartbeats.get(replIndex);
- replIndex++;
- }
-
- //
- // Is there replication work to be computed for this datanode?
- //
- int precomputed = node.getNumberOfBlocksToBeReplicated();
- // remaining capacity of this node, measured against maxReplicationStreams
- int needed = this.maxReplicationStreams - precomputed;
- boolean doReplication = false;
- boolean doInvalidation = false;
- if (needed > 0) {
- //
- // Compute replication work and store work into the datanode
- //
- // replsets[0] holds the blocks, replsets[1] the target nodes per block
- Object replsets[] = pendingTransfers(node, needed);
- if (replsets != null) {
- doReplication = true;
- addBlocksToBeReplicated(node, (Block[])replsets[0],
- (DatanodeDescriptor[][])replsets[1]);
- // replIndex was already advanced, so this is the slot after this node
- lastReplIndex = replIndex;
- }
- }
- if (!doReplication) {
- //
- // Determine if block deletion is pending for this datanode
- //
- Block blocklist[] = blocksToInvalidate(node);
- if (blocklist != null) {
- doInvalidation = true;
- addBlocksToBeInvalidated(node, blocklist);
- }
- }
- if (doReplication || doInvalidation) {
- //
- // If we have already computed work for a predefined
- // number of datanodes in this iteration, then relax
- //
- // NOTE: this early exit leaves replIndex as advanced by the scan;
- // it is not reset to lastReplIndex on this path.
- if (foundwork > ((hsize * REPL_WORK_PER_ITERATION)/100)) {
- break;
- }
- foundwork++;
- }
- }
- }
-
- /**
-  * Reaps replication requests that have timed out. Every block returned by
-  * pendingReplications.getTimedOutBlocks() is added back to
-  * neededReplications together with its current live and decommissioned
-  * replica counts and its expected replication.
-  */
- void processPendingReplications() {
-   Block[] expired = pendingReplications.getTimedOutBlocks();
-   if (expired == null) {
-     return;
-   }
-   // The replica counting and re-queueing happen under the namesystem lock.
-   synchronized (this) {
-     for (Block blk : expired) {
-       NumberReplicas counts = countNodes(blk);
-       neededReplications.add(blk,
-                              counts.liveReplicas(),
-                              counts.decommissionedReplicas(),
-                              getReplication(blk));
-     }
-   }
- }
-
- /**
-  * Hands additional replication work to a datanode.
-  *
-  * The descriptor is looked up again through getDatanode() while this
-  * method holds the FSNamesystem lock; when the lookup yields null the
-  * request is silently dropped.
-  *
-  * @param node      datanode that should perform the transfers
-  * @param blocklist blocks to be replicated
-  * @param targets   target datanodes, one array per block
-  * @throws IOException propagated from getDatanode()
-  */
- synchronized void addBlocksToBeReplicated(DatanodeDescriptor node,
-                                           Block[] blocklist,
-                                           DatanodeDescriptor[][] targets)
-   throws IOException {
-   DatanodeDescriptor registered = getDatanode(node);
-   if (registered == null) {
-     return;
-   }
-   registered.addBlocksToBeReplicated(blocklist, targets);
- }
-
- /**
-  * Hands additional block invalidation work to a datanode.
-  *
-  * The descriptor is looked up again through getDatanode() while this
-  * method holds the FSNamesystem lock; when the lookup yields null the
-  * request is silently dropped.
-  *
-  * @param node      datanode that should delete the blocks
-  * @param blocklist blocks to be invalidated
-  * @throws IOException propagated from getDatanode()
-  */
- synchronized void addBlocksToBeInvalidated(DatanodeDescriptor node,
-                                            Block[] blocklist) throws IOException {
-   DatanodeDescriptor registered = getDatanode(node);
-   if (registered == null) {
-     return;
-   }
-   registered.addBlocksToBeInvalidated(blocklist);
- }
-
- /**
-  * Removes the datanode descriptor that belongs to the given ID.
-  * A warning is written to the state change log when no descriptor is
-  * found for the ID.
-  *
-  * @param nodeID datanode ID
-  * @throws IOException propagated from getDatanode()
-  */
- synchronized public void removeDatanode(DatanodeID nodeID)
-   throws IOException {
-   DatanodeDescriptor descriptor = getDatanode(nodeID);
-   if (descriptor == null) {
-     NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
-                                  + nodeID.getName() + " does not exist");
-     return;
-   }
-   removeDatanode(descriptor);
- }
-
- /**
-  * Removes a datanode descriptor.
-  *
-  * A node still flagged alive is first taken out of the statistics and the
-  * heartbeats list and flagged as not alive. After that every block of the
-  * node is passed to removeStoredBlock(), the descriptor is reset through
-  * unprotectedRemoveDatanode(), and the node is removed from clusterMap.
-  *
-  * @param nodeInfo datanode descriptor
-  */
- private void removeDatanode(DatanodeDescriptor nodeInfo) {
-   synchronized (heartbeats) {
-     if (nodeInfo.isAlive) {
-       updateStats(nodeInfo, false);
-       heartbeats.remove(nodeInfo);
-       nodeInfo.isAlive = false;
-     }
-   }
-
-   Iterator<Block> stored = nodeInfo.getBlockIterator();
-   while (stored.hasNext()) {
-     Block blk = stored.next();
-     removeStoredBlock(blk, nodeInfo);
-   }
-   unprotectedRemoveDatanode(nodeInfo);
-   clusterMap.remove(nodeInfo);
- }
-
- /**
-  * Resets the block information of the descriptor and writes a debug
-  * message to the state change log.
-  *
-  * @param nodeDescr datanode descriptor taken out of service
-  */
- void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
-   nodeDescr.resetBlocks();
-   String message = "BLOCK* NameSystem.unprotectedRemoveDatanode: "
-     + nodeDescr.getName() + " is out of service now.";
-   NameNode.stateChangeLog.debug(message);
- }
-
- /**
-  * Registers the descriptor in datanodeMap under its storage ID and in
-  * host2DataNodeMap.
-  *
-  * @param nodeDescr datanode descriptor to add
-  */
- void unprotectedAddDatanode(DatanodeDescriptor nodeDescr) {
-   // datanodeMap.put() returns whatever was stored under this storage ID
-   // before. That entry is dropped from host2DataNodeMap first, so that
-   // both maps stay consistent once nodeDescr is added.
-   host2DataNodeMap.remove(
-     datanodeMap.put(nodeDescr.getStorageID(), nodeDescr));
-   host2DataNodeMap.add(nodeDescr);
-
-   String message = "BLOCK* NameSystem.unprotectedAddDatanode: "
-     + "node " + nodeDescr.getName() + " is added to datanodeMap.";
-   NameNode.stateChangeLog.debug(message);
- }
-
- /**
-  * Physically removes a node from datanodeMap, and removes the entry
-  * that was stored there from host2DataNodeMap as well.
-  *
-  * @param nodeID node whose storage ID selects the entry to remove
-  */
- void wipeDatanode(DatanodeID nodeID) throws IOException {
-   String storageKey = nodeID.getStorageID();
-   host2DataNodeMap.remove(datanodeMap.remove(storageKey));
-   String message = "BLOCK* NameSystem.wipeDatanode: "
-     + nodeID.getName() + " storage " + storageKey
-     + " is removed from datanodeMap.";
-   NameNode.stateChangeLog.debug(message);
- }
-
- /**
-  * @return the fsImage field of dir
-  */
- FSImage getFSImage() {
-   return this.dir.fsImage;
- }
-
- /**
-  * @return the edit log obtained from the image returned by getFSImage()
-  */
- FSEditLog getEditLog() {
-   FSImage image = getFSImage();
-   return image.getEditLog();
- }
-
- /**
-  * Looks for datanodes whose heartbeat has expired and removes them.
-  *
-  * Each round picks the first dead node found in the heartbeats list and
-  * removes at most that one node while holding the FSNamesystem,
-  * heartbeats and datanodeMap locks. Rounds repeat until a scan finds no
-  * dead node. Handling a single node per synchronized section avoids the
-  * cascading effect in which more datanodes would be declared dead.
-  */
- void heartbeatCheck() {
-   boolean sawDeadNode;
-   do {
-     sawDeadNode = false;
-     DatanodeID suspect = null;
-
-     // Scan under the heartbeats lock only; stop at the first dead node.
-     synchronized (heartbeats) {
-       for (DatanodeDescriptor candidate : heartbeats) {
-         if (isDatanodeDead(candidate)) {
-           sawDeadNode = true;
-           suspect = candidate;
-           break;
-         }
-       }
-     }
-
-     if (sawDeadNode) {
-       // Take the namesystem lock first, then the two inner locks, and
-       // re-check the node because the heartbeats lock was released in
-       // between.
-       synchronized (this) {
-         synchronized (heartbeats) {
-           synchronized (datanodeMap) {
-             DatanodeDescriptor nodeInfo = null;
-             try {
-               nodeInfo = getDatanode(suspect);
-             } catch (IOException e) {
-               nodeInfo = null;
-             }
-             if (nodeInfo != null && isDatanodeDead(nodeInfo)) {
-               NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
-                                            + "lost heartbeat from " + nodeInfo.getName());
-               removeDatanode(nodeInfo);
-             }
-           }
-         }
-       }
-     }
-   } while (sawDeadNode);
- }
-
- /**
-  * The given node is reporting all its blocks. Use this info to
-  * update the (machine-->blocklist) and (block-->machinelist) tables.
-  *
-  * @param nodeID    datanode sending the report
-  * @param newReport blocks the datanode reports as stored
-  * @return blocks the datanode is asked to delete; the array holds at
-  *         most FSConstants.BLOCK_INVALIDATE_CHUNK entries, further
-  *         invalid blocks are handed to addToInvalidates()
-  * @throws IOException if no datanode is found for nodeID; a
-  *         DisallowedDatanodeException is thrown when
-  *         shouldNodeShutdown() holds for the node
-  */
- public synchronized Block[] processReport(DatanodeID nodeID,
-                                           Block newReport[]
-                                           ) throws IOException {
-   if (NameNode.stateChangeLog.isDebugEnabled()) {
-     NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
-                                   +"from "+nodeID.getName()+" "+newReport.length+" blocks");
-   }
-   DatanodeDescriptor node = getDatanode(nodeID);
-   if (node == null) {
-     throw new IOException("ProcessReport from unregisterted node: "
-                           + nodeID.getName());
-   }
-
-   // Check if this datanode should actually be shutdown instead.
-   if (shouldNodeShutdown(node)) {
-     setDatanodeDead(node);
-     throw new DisallowedDatanodeException(node);
-   }
-
-   //
-   // Modify the (block-->datanode) map, according to the difference
-   // between the old and new block report.
-   //
-   Collection<Block> toAdd = new LinkedList<Block>();
-   Collection<Block> toRemove = new LinkedList<Block>();
-   node.reportDiff(blocksMap, newReport, toAdd, toRemove);
-
-   for (Block b : toRemove) {
-     removeStoredBlock(b, node);
-   }
-   for (Block b : toAdd) {
-     addStoredBlock(b, node, null);
-   }
-
-   //
-   // We've now completely updated the node's block report profile.
-   // We now go through all its blocks and find which ones are invalid,
-   // no longer pending, or over-replicated.
-   //
-   // (Note it's not enough to just invalidate blocks at lease expiry
-   // time; datanodes can go down before the client's lease on
-   // the failed file expires and miss the "expire" event.)
-   //
-   // This function considers every block on a datanode, and thus
-   // should only be invoked infrequently.
-   //
-   Collection<Block> obsolete = new ArrayList<Block>();
-   for (Iterator<Block> it = node.getBlockIterator(); it.hasNext();) {
-     Block b = it.next();
-
-     //
-     // A block report can only send BLOCK_INVALIDATE_CHUNK number of
-     // blocks to be deleted. If there are more blocks to be deleted,
-     // they are added to recentInvalidateSets and will be sent out
-     // through succeeding heartbeat responses.
-     //
-     if (!isValidBlock(b)) {
-       // ">=" keeps the reply at exactly BLOCK_INVALIDATE_CHUNK blocks;
-       // the former ">" let one extra block slip into the reply.
-       if (obsolete.size() >= FSConstants.BLOCK_INVALIDATE_CHUNK) {
-         addToInvalidates(b, node);
-       } else {
-         obsolete.add(b);
-       }
-       // Guarded like the debug call at the top of this method, so the
-       // message is not built for every invalid block when debug is off.
-       if (NameNode.stateChangeLog.isDebugEnabled()) {
-         NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
-                                       +"ask "+nodeID.getName()+" to delete "+b.getBlockName());
-       }
-     }
-   }
-   return obsolete.toArray(new Block[obsolete.size()]);
- }
-
- /**
-  * Modify (block-->datanode) map. Remove block from set of
-  * needed replications if this takes care of the problem.
-  *
-  * @param block       block reported as stored on the node
-  * @param node        datanode that stores the block
-  * @param delNodeHint passed on to proccessOverReplicatedBlock() when
-  *                    the block turns out to be over-replicated; may be null
-  * @return the block that is stored in blockMap.
-  */
- synchronized Block addStoredBlock(Block block,
-                                   DatanodeDescriptor node,
-                                   DatanodeDescriptor delNodeHint) {
-
-   INodeFile fileINode = blocksMap.getINode(block);
-   // Blocks without an inode are entered with the default replication.
-   int replication = (fileINode != null) ? fileINode.getReplication() :
-     defaultReplication;
-   boolean added = blocksMap.addNode(block, node, replication);
-
-   Block storedBlock = blocksMap.getStoredBlock(block); //extra look up!
-   // Reference comparison on purpose: only a distinct stored instance
-   // needs its size reconciled with the reported one.
-   if (storedBlock != null && block != storedBlock) {
-     if (block.getNumBytes() > 0) {
-       long cursize = storedBlock.getNumBytes();
-       if (cursize == 0) {
-         storedBlock.setNumBytes(block.getNumBytes());
-       } else if (cursize != block.getNumBytes()) {
-         LOG.warn("Inconsistent size for block " + block +
-                  " reported from " + node.getName() +
-                  " current size is " + cursize +
-                  " reported size is " + block.getNumBytes());
-         // Accept this block even if there is a problem with its
-         // size. Clients should detect data corruption because of
-         // CRC mismatch.
-       }
-     }
-     block = storedBlock;
-   }
-
-   int curReplicaDelta = 0;
-
-   if (added) {
-     curReplicaDelta = 1;
-     //
-     // At startup time, because too many new blocks come in
-     // they take up lots of space in the log file.
-     // So, we log only when namenode is out of safemode.
-     //
-     if (!isInSafeMode()) {
-       NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
-                                    +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName());
-     }
-   } else {
-     NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
-                                  + "Redundant addStoredBlock request received for "
-                                  + block.getBlockName() + " on " + node.getName());
-   }
-
-   //
-   // if file is being actively written to, then do not check
-   // replication-factor here. It will be checked when the file is closed.
-   //
-   if (fileINode == null || fileINode.isUnderConstruction()) {
-     return block;
-   }
-
-   // filter out containingNodes that are marked for decommission.
-   NumberReplicas num = countNodes(block);
-   // Replicas that are still pending count towards the current total.
-   int numCurrentReplica = num.liveReplicas()
-     + pendingReplications.getNumReplicas(block);
-
-   // check whether safe replication is reached for the block
-   // only if it is a part of a files
-   incrementSafeBlockCount(numCurrentReplica);
-
-   // handle underReplication/overReplication
-   short fileReplication = fileINode.getReplication();
-   if (numCurrentReplica >= fileReplication) {
-     // Use the accessor, as the other callers of NumberReplicas do,
-     // instead of reading the field directly.
-     neededReplications.remove(block, numCurrentReplica,
-                               num.decommissionedReplicas(), fileReplication);
-   } else {
-     updateNeededReplications(block, curReplicaDelta, 0);
-   }
-   if (numCurrentReplica > fileReplication) {
-     proccessOverReplicatedBlock(block, fileReplication, node, delNodeHint);
-   }
-   return block;
- }
-
- /**
-  * Collects the nodes holding the block that are not already recorded in
-  * excessReplicateMap for it and that are neither decommissioned nor being
-  * decommissioned, then passes them to chooseExcessReplicates() so that
-  * the extra replicas can be marked.
-  *
-  * @param block       over-replicated block
-  * @param replication replication wanted for the block
-  * @param addedNode   node on which the block was just added
-  * @param delNodeHint hint passed to chooseExcessReplicates(); replaced by
-  *                    null when it is the same object as addedNode
-  */
- private void proccessOverReplicatedBlock(Block block, short replication,
-     DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
-   DatanodeDescriptor hint = (addedNode == delNodeHint) ? null : delNodeHint;
-
-   Collection<DatanodeDescriptor> candidates = new ArrayList<DatanodeDescriptor>();
-   Iterator<DatanodeDescriptor> holders = blocksMap.nodeIterator(block);
-   while (holders.hasNext()) {
-     DatanodeDescriptor holder = holders.next();
-     Collection<Block> excess = excessReplicateMap.get(holder.getStorageID());
-     if (excess != null && excess.contains(block)) {
-       // this replica is already counted as excess
-       continue;
-     }
-     if (holder.isDecommissionInProgress() || holder.isDecommissioned()) {
-       continue;
-     }
-     candidates.add(holder);
-   }
-   chooseExcessReplicates(candidates, block, replication,
-                          addedNode, hint);
- }
-
- /**
- * We want "replication" replicates for the block, but we now have too many.
[... 5119 lines stripped ...]