You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/16 23:44:46 UTC
svn commit: r529410 [8/27] - in /lucene/hadoop/trunk: ./
src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/
src/contrib/abacus/src/java/org/apache/hadoop/abacus/
src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/c...
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Apr 16 14:44:35 2007
@@ -51,1932 +51,1932 @@
* 5) LRU cache of updated-heartbeat machines
***************************************************/
class FSNamesystem implements FSConstants {
- public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FSNamesystem");
+ public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FSNamesystem");
- //
- // Stores the correct file name hierarchy
- //
- FSDirectory dir;
-
- //
- // Stores the block-->datanode(s) map. Updated only in response
- // to client-sent information.
- // Mapping: Block -> { INode, datanodes, self ref }
- //
- BlocksMap blocksMap = new BlocksMap();
+ //
+ // Stores the correct file name hierarchy
+ //
+ FSDirectory dir;
+
+ //
+ // Stores the block-->datanode(s) map. Updated only in response
+ // to client-sent information.
+ // Mapping: Block -> { INode, datanodes, self ref }
+ //
+ BlocksMap blocksMap = new BlocksMap();
- /**
- * Stores the datanode -> block map.
- * <p>
- * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
- * storage id. In order to keep the storage map consistent it tracks
- * all storages ever registered with the namenode.
- * A descriptor corresponding to a specific storage id can be
- * <ul>
- * <li>added to the map if it is a new storage id;</li>
- * <li>updated with a new datanode started as a replacement for the old one
- * with the same storage id; and </li>
- * <li>removed if and only if an existing datanode is restarted to serve a
- * different storage id.</li>
- * </ul> <br>
- * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
- * in the namespace image file. Only the {@link DatanodeInfo} part is
- * persistent, the list of blocks is restored from the datanode block
- * reports.
- * <p>
- * Mapping: StorageID -> DatanodeDescriptor
- */
- Map<String, DatanodeDescriptor> datanodeMap =
- new TreeMap<String, DatanodeDescriptor>();
-
- //
- // Keeps a Collection for every named machine containing
- // blocks that have recently been invalidated and are thought to live
- // on the machine in question.
- // Mapping: StorageID -> ArrayList<Block>
- //
- private Map<String, Collection<Block>> recentInvalidateSets =
- new TreeMap<String, Collection<Block>>();
-
- //
- // Keeps a TreeSet for every named node. Each treeset contains
- // a list of the blocks that are "extra" at that location. We'll
- // eventually remove these extras.
- // Mapping: StorageID -> TreeSet<Block>
- //
- private Map<String, Collection<Block>> excessReplicateMap =
- new TreeMap<String, Collection<Block>>();
-
- //
- // Keeps track of files that are being created, plus the
- // blocks that make them up.
- // Mapping: fileName -> FileUnderConstruction
- //
- Map<UTF8, FileUnderConstruction> pendingCreates =
- new TreeMap<UTF8, FileUnderConstruction>();
-
- //
- // Keeps track of the blocks that are part of those pending creates
- // Set of: Block
- //
- Collection<Block> pendingCreateBlocks = new TreeSet<Block>();
-
- //
- // Stats on overall usage
- //
- long totalCapacity = 0, totalRemaining = 0;
+ /**
+ * Stores the datanode -> block map.
+ * <p>
+ * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
+ * storage id. In order to keep the storage map consistent it tracks
+ * all storages ever registered with the namenode.
+ * A descriptor corresponding to a specific storage id can be
+ * <ul>
+ * <li>added to the map if it is a new storage id;</li>
+ * <li>updated with a new datanode started as a replacement for the old one
+ * with the same storage id; and </li>
+ * <li>removed if and only if an existing datanode is restarted to serve a
+ * different storage id.</li>
+ * </ul> <br>
+ * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
+ * in the namespace image file. Only the {@link DatanodeInfo} part is
+ * persistent, the list of blocks is restored from the datanode block
+ * reports.
+ * <p>
+ * Mapping: StorageID -> DatanodeDescriptor
+ */
+ Map<String, DatanodeDescriptor> datanodeMap =
+ new TreeMap<String, DatanodeDescriptor>();
- // total number of connections per live datanode
- int totalLoad = 0;
+ //
+ // Keeps a Collection for every named machine containing
+ // blocks that have recently been invalidated and are thought to live
+ // on the machine in question.
+ // Mapping: StorageID -> ArrayList<Block>
+ //
+ private Map<String, Collection<Block>> recentInvalidateSets =
+ new TreeMap<String, Collection<Block>>();
+
+ //
+ // Keeps a TreeSet for every named node. Each treeset contains
+ // a list of the blocks that are "extra" at that location. We'll
+ // eventually remove these extras.
+ // Mapping: StorageID -> TreeSet<Block>
+ //
+ private Map<String, Collection<Block>> excessReplicateMap =
+ new TreeMap<String, Collection<Block>>();
+
+ //
+ // Keeps track of files that are being created, plus the
+ // blocks that make them up.
+ // Mapping: fileName -> FileUnderConstruction
+ //
+ Map<UTF8, FileUnderConstruction> pendingCreates =
+ new TreeMap<UTF8, FileUnderConstruction>();
+
+ //
+ // Keeps track of the blocks that are part of those pending creates
+ // Set of: Block
+ //
+ Collection<Block> pendingCreateBlocks = new TreeSet<Block>();
+
+ //
+ // Stats on overall usage
+ //
+ long totalCapacity = 0, totalRemaining = 0;
+
+ // total number of connections per live datanode
+ int totalLoad = 0;
+
+
+ //
+ // For the HTTP browsing interface
+ //
+ StatusHttpServer infoServer;
+ int infoPort;
+ String infoBindAddress;
+ Date startTime;
+
+ //
+ Random r = new Random();
+ /**
+ * Stores a set of DatanodeDescriptor objects.
+ * This is a subset of {@link #datanodeMap}, containing nodes that are
+ * considered alive.
+ * The {@link HeartbeatMonitor} periodically checks for outdated entries,
+ * and removes them from the list.
+ */
+ ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
- //
- // For the HTTP browsing interface
- //
- StatusHttpServer infoServer;
- int infoPort;
- String infoBindAddress;
- Date startTime;
-
- //
- Random r = new Random();
+ //
+ // Store set of Blocks that need to be replicated 1 or more times.
+ // We also store pending replication-orders.
+ // Set of: Block
+ //
+ private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
+ private PendingReplicationBlocks pendingReplications;
+
+ //
+ // Used for handling lock-leases
+ // Mapping: leaseHolder -> Lease
+ //
+ private Map<UTF8, Lease> leases = new TreeMap<UTF8, Lease>();
+ // Set of: Lease
+ private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
+
+ //
+ // Threaded object that checks to see if we have been
+ // getting heartbeats from all clients.
+ //
+ Daemon hbthread = null; // HeartbeatMonitor thread
+ Daemon lmthread = null; // LeaseMonitor thread
+ Daemon smmthread = null; // SafeModeMonitor thread
+ Daemon replthread = null; // Replication thread
+ volatile boolean fsRunning = true;
+ long systemStart = 0;
+
+ // The maximum number of replicates we should allow for a single block
+ private int maxReplication;
+ // How many outgoing replication streams a given node should have at one time
+ private int maxReplicationStreams;
+ // MIN_REPLICATION is how many copies we need in place or else we disallow the write
+ private int minReplication;
+ // Default replication
+ private int defaultReplication;
+ // heartbeatRecheckInterval is how often namenode checks for expired datanodes
+ private long heartbeatRecheckInterval;
+ // heartbeatExpireInterval is how long namenode waits for datanode to report
+ // heartbeat
+ private long heartbeatExpireInterval;
+ //replicationRecheckInterval is how often namenode checks for new replication work
+ private long replicationRecheckInterval;
+ static int replIndex = 0; // last datanode used for replication work
+ static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
+
+ public static FSNamesystem fsNamesystemObject;
+ private String localMachine;
+ private int port;
+ private SafeModeInfo safeMode; // safe mode information
+
+ // datanode network topology
+ NetworkTopology clusterMap = new NetworkTopology();
+ // for block replicas placement
+ ReplicationTargetChooser replicator;
- /**
- * Stores a set of DatanodeDescriptor objects.
- * This is a subset of {@link #datanodeMap}, containing nodes that are
- * considered alive.
- * The {@link HeartbeatMonitor} periodically checks for outdated entries,
- * and removes them from the list.
- */
- ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
-
- //
- // Store set of Blocks that need to be replicated 1 or more times.
- // We also store pending replication-orders.
- // Set of: Block
- //
- private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
- private PendingReplicationBlocks pendingReplications;
-
- //
- // Used for handling lock-leases
- // Mapping: leaseHolder -> Lease
- //
- private Map<UTF8, Lease> leases = new TreeMap<UTF8, Lease>();
- // Set of: Lease
- private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
-
- //
- // Threaded object that checks to see if we have been
- // getting heartbeats from all clients.
- //
- Daemon hbthread = null; // HeartbeatMonitor thread
- Daemon lmthread = null; // LeaseMonitor thread
- Daemon smmthread = null; // SafeModeMonitor thread
- Daemon replthread = null; // Replication thread
- volatile boolean fsRunning = true;
- long systemStart = 0;
-
- // The maximum number of replicates we should allow for a single block
- private int maxReplication;
- // How many outgoing replication streams a given node should have at one time
- private int maxReplicationStreams;
- // MIN_REPLICATION is how many copies we need in place or else we disallow the write
- private int minReplication;
- // Default replication
- private int defaultReplication;
- // heartbeatRecheckInterval is how often namenode checks for expired datanodes
- private long heartbeatRecheckInterval;
- // heartbeatExpireInterval is how long namenode waits for datanode to report
- // heartbeat
- private long heartbeatExpireInterval;
- //replicationRecheckInterval is how often namenode checks for new replication work
- private long replicationRecheckInterval;
- static int replIndex = 0; // last datanode used for replication work
- static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
-
- public static FSNamesystem fsNamesystemObject;
- private String localMachine;
- private int port;
- private SafeModeInfo safeMode; // safe mode information
-
- // datanode networktoplogy
- NetworkTopology clusterMap = new NetworkTopology();
- // for block replicas placement
- ReplicationTargetChooser replicator;
+ private HostsFileReader hostsReader;
+ private Daemon dnthread = null;
- private HostsFileReader hostsReader;
- private Daemon dnthread = null;
+ /**
+ * dirs is a list of directories where the filesystem directory state
+ * is stored
+ */
+ public FSNamesystem(String hostname,
+ int port,
+ NameNode nn, Configuration conf) throws IOException {
+ fsNamesystemObject = this;
+ this.replicator = new ReplicationTargetChooser(
+ conf.getBoolean("dfs.replication.considerLoad", true));
+ this.defaultReplication = conf.getInt("dfs.replication", 3);
+ this.maxReplication = conf.getInt("dfs.replication.max", 512);
+ this.minReplication = conf.getInt("dfs.replication.min", 1);
+ if( minReplication <= 0 )
+ throw new IOException(
+ "Unexpected configuration parameters: dfs.replication.min = "
+ + minReplication
+ + " must be greater than 0" );
+ if( maxReplication >= (int)Short.MAX_VALUE )
+ throw new IOException(
+ "Unexpected configuration parameters: dfs.replication.max = "
+ + maxReplication + " must be less than " + (Short.MAX_VALUE) );
+ if( maxReplication < minReplication )
+ throw new IOException(
+ "Unexpected configuration parameters: dfs.replication.min = "
+ + minReplication
+ + " must be less than dfs.replication.max = "
+ + maxReplication );
+ this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
+ long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
+ this.heartbeatRecheckInterval = 5 * 60 * 1000; // 5 minutes
+ this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
+ 10 * heartbeatInterval;
+ this.replicationRecheckInterval = 3 * 1000; // 3 second
+
+ this.localMachine = hostname;
+ this.port = port;
+ this.dir = new FSDirectory( this );
+ StartupOption startOpt = (StartupOption)conf.get(
+ "dfs.namenode.startup", StartupOption.REGULAR );
+ this.dir.loadFSImage( getNamespaceDirs(conf), startOpt );
+ this.safeMode = new SafeModeInfo( conf );
+ setBlockTotal();
+ pendingReplications = new PendingReplicationBlocks(LOG);
+ this.hbthread = new Daemon(new HeartbeatMonitor());
+ this.lmthread = new Daemon(new LeaseMonitor());
+ this.replthread = new Daemon(new ReplicationMonitor());
+ hbthread.start();
+ lmthread.start();
+ replthread.start();
+ this.systemStart = now();
+ this.startTime = new Date(systemStart);
+
+ this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
+ conf.get("dfs.hosts.exclude",""));
+ this.dnthread = new Daemon(new DecommissionedMonitor());
+ dnthread.start();
+
+ this.infoPort = conf.getInt("dfs.info.port", 50070);
+ this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
+ this.infoServer = new StatusHttpServer("dfs",infoBindAddress, infoPort, false);
+ this.infoServer.setAttribute("name.system", this);
+ this.infoServer.setAttribute("name.node", nn);
+ this.infoServer.setAttribute("name.conf", conf);
+ this.infoServer.addServlet("fsck", "/fsck", FsckServlet.class);
+ this.infoServer.addServlet("getimage", "/getimage", GetImageServlet.class);
+ this.infoServer.start();
+
+ // The web-server port can be ephemeral... ensure we have the correct info
+ this.infoPort = this.infoServer.getPort();
+ conf.set("dfs.info.port", this.infoPort);
+ LOG.info("Web-server up at: " + conf.get("dfs.info.port"));
+ }
- /**
- * dirs is a list oif directories where the filesystem directory state
- * is stored
- */
- public FSNamesystem(String hostname,
- int port,
- NameNode nn, Configuration conf) throws IOException {
- fsNamesystemObject = this;
- this.replicator = new ReplicationTargetChooser(
- conf.getBoolean("dfs.replication.considerLoad", true));
- this.defaultReplication = conf.getInt("dfs.replication", 3);
- this.maxReplication = conf.getInt("dfs.replication.max", 512);
- this.minReplication = conf.getInt("dfs.replication.min", 1);
- if( minReplication <= 0 )
- throw new IOException(
- "Unexpected configuration parameters: dfs.replication.min = "
- + minReplication
- + " must be greater than 0" );
- if( maxReplication >= (int)Short.MAX_VALUE )
- throw new IOException(
- "Unexpected configuration parameters: dfs.replication.max = "
- + maxReplication + " must be less than " + (Short.MAX_VALUE) );
- if( maxReplication < minReplication )
- throw new IOException(
- "Unexpected configuration parameters: dfs.replication.min = "
- + minReplication
- + " must be less than dfs.replication.max = "
- + maxReplication );
- this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
- long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
- this.heartbeatRecheckInterval = 5 * 60 * 1000; // 5 minutes
- this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
- 10 * heartbeatInterval;
- this.replicationRecheckInterval = 3 * 1000; // 3 second
-
- this.localMachine = hostname;
- this.port = port;
- this.dir = new FSDirectory( this );
- StartupOption startOpt = (StartupOption)conf.get(
- "dfs.namenode.startup", StartupOption.REGULAR );
- this.dir.loadFSImage( getNamespaceDirs(conf), startOpt );
- this.safeMode = new SafeModeInfo( conf );
- setBlockTotal();
- pendingReplications = new PendingReplicationBlocks(LOG);
- this.hbthread = new Daemon(new HeartbeatMonitor());
- this.lmthread = new Daemon(new LeaseMonitor());
- this.replthread = new Daemon(new ReplicationMonitor());
- hbthread.start();
- lmthread.start();
- replthread.start();
- this.systemStart = now();
- this.startTime = new Date(systemStart);
-
- this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
- conf.get("dfs.hosts.exclude",""));
- this.dnthread = new Daemon(new DecommissionedMonitor());
- dnthread.start();
-
- this.infoPort = conf.getInt("dfs.info.port", 50070);
- this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
- this.infoServer = new StatusHttpServer("dfs",infoBindAddress, infoPort, false);
- this.infoServer.setAttribute("name.system", this);
- this.infoServer.setAttribute("name.node", nn);
- this.infoServer.setAttribute("name.conf", conf);
- this.infoServer.addServlet("fsck", "/fsck", FsckServlet.class);
- this.infoServer.addServlet("getimage", "/getimage", GetImageServlet.class);
- this.infoServer.start();
-
- // The web-server port can be ephemeral... ensure we have the correct info
- this.infoPort = this.infoServer.getPort();
- conf.set("dfs.info.port", this.infoPort);
- LOG.info("Web-server up at: " + conf.get("dfs.info.port"));
- }
-
- static Collection<File> getNamespaceDirs(Configuration conf) {
- String[] dirNames = conf.getStrings("dfs.name.dir");
- if (dirNames == null)
- dirNames = new String[] {"/tmp/hadoop/dfs/name"};
- Collection<File> dirs = new ArrayList<File>( dirNames.length );
- for( int idx = 0; idx < dirNames.length; idx++ ) {
- dirs.add( new File(dirNames[idx] ));
- }
- return dirs;
+ static Collection<File> getNamespaceDirs(Configuration conf) {
+ String[] dirNames = conf.getStrings("dfs.name.dir");
+ if (dirNames == null)
+ dirNames = new String[] {"/tmp/hadoop/dfs/name"};
+ Collection<File> dirs = new ArrayList<File>( dirNames.length );
+ for( int idx = 0; idx < dirNames.length; idx++ ) {
+ dirs.add( new File(dirNames[idx] ));
}
+ return dirs;
+ }
- /**
- * dirs is a list of directories where the filesystem directory state
- * is stored
- */
- FSNamesystem(FSImage fsImage) throws IOException {
- fsNamesystemObject = this;
- this.dir = new FSDirectory(fsImage, this);
- }
+ /**
+ * dirs is a list of directories where the filesystem directory state
+ * is stored
+ */
+ FSNamesystem(FSImage fsImage) throws IOException {
+ fsNamesystemObject = this;
+ this.dir = new FSDirectory(fsImage, this);
+ }
- /** Return the FSNamesystem object
- *
- */
- public static FSNamesystem getFSNamesystem() {
- return fsNamesystemObject;
- }
-
- NamespaceInfo getNamespaceInfo() {
- return new NamespaceInfo( dir.fsImage.getNamespaceID(),
- dir.fsImage.getCTime() );
- }
+ /** Return the FSNamesystem object
+ *
+ */
+ public static FSNamesystem getFSNamesystem() {
+ return fsNamesystemObject;
+ }
+
+ NamespaceInfo getNamespaceInfo() {
+ return new NamespaceInfo( dir.fsImage.getNamespaceID(),
+ dir.fsImage.getCTime() );
+ }
- /** Close down this filesystem manager.
- * Causes heartbeat and lease daemons to stop; waits briefly for
- * them to finish, but a short timeout returns control back to caller.
- */
- public void close() {
- fsRunning = false;
+ /** Close down this filesystem manager.
+ * Causes heartbeat and lease daemons to stop; waits briefly for
+ * them to finish, but a short timeout returns control back to caller.
+ */
+ public void close() {
+ fsRunning = false;
+ try {
+ if (pendingReplications != null) pendingReplications.stop();
+ if (infoServer != null) infoServer.stop();
+ if (hbthread != null) hbthread.interrupt();
+ if (replthread != null) replthread.interrupt();
+ if (dnthread != null) dnthread.interrupt();
+ if (smmthread != null) smmthread.interrupt();
+ } catch (InterruptedException ie) {
+ } finally {
+ // using finally to ensure we also wait for lease daemon
+ try {
+ if (lmthread != null) {
+ lmthread.interrupt();
+ lmthread.join(3000);
+ }
+ } catch (InterruptedException ie) {
+ } finally {
try {
- if (pendingReplications != null) pendingReplications.stop();
- if (infoServer != null) infoServer.stop();
- if (hbthread != null) hbthread.interrupt();
- if (replthread != null) replthread.interrupt();
- if (dnthread != null) dnthread.interrupt();
- if (smmthread != null) smmthread.interrupt();
- } catch (InterruptedException ie) {
- } finally {
- // using finally to ensure we also wait for lease daemon
- try {
- if (lmthread != null) {
- lmthread.interrupt();
- lmthread.join(3000);
- }
- } catch (InterruptedException ie) {
- } finally {
- try {
- dir.close();
- } catch (IOException ex) {
- // do nothing
- }
- }
+ dir.close();
+ } catch (IOException ex) {
+ // do nothing
}
+ }
}
+ }
- /**
- * Dump all metadata into specified file
- */
- void metaSave(String filename) throws IOException {
- File file = new File(System.getProperty("hadoop.log.dir"),
- filename);
- PrintWriter out = new PrintWriter(new BufferedWriter(
- new FileWriter(file, true)));
+ /**
+ * Dump all metadata into specified file
+ */
+ void metaSave(String filename) throws IOException {
+ File file = new File(System.getProperty("hadoop.log.dir"),
+ filename);
+ PrintWriter out = new PrintWriter(new BufferedWriter(
+ new FileWriter(file, true)));
- //
- // Dump contents of neededReplication
- //
- synchronized (neededReplications) {
- out.println("Metasave: Blocks waiting for replication: " +
- neededReplications.size());
- if (neededReplications.size() > 0) {
- for (Iterator<Block> it = neededReplications.iterator();
- it.hasNext();) {
- Block block = it.next();
- out.print(block);
- for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
- jt.hasNext(); ) {
- DatanodeDescriptor node = jt.next();
- out.print(" " + node + " : " );
- }
- out.println("");
+ //
+ // Dump contents of neededReplication
+ //
+ synchronized (neededReplications) {
+ out.println("Metasave: Blocks waiting for replication: " +
+ neededReplications.size());
+ if (neededReplications.size() > 0) {
+ for (Iterator<Block> it = neededReplications.iterator();
+ it.hasNext();) {
+ Block block = it.next();
+ out.print(block);
+ for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
+ jt.hasNext(); ) {
+ DatanodeDescriptor node = jt.next();
+ out.print(" " + node + " : " );
}
+ out.println("");
}
}
+ }
- //
- // Dump blocks from pendingReplication
- //
- pendingReplications.metaSave(out);
+ //
+ // Dump blocks from pendingReplication
+ //
+ pendingReplications.metaSave(out);
- //
- // Dump blocks that are waiting to be deleted
- //
- dumpRecentInvalidateSets(out);
+ //
+ // Dump blocks that are waiting to be deleted
+ //
+ dumpRecentInvalidateSets(out);
- //
- // Dump all datanodes
- //
- datanodeDump(out);
+ //
+ // Dump all datanodes
+ //
+ datanodeDump(out);
- out.flush();
- out.close();
- }
+ out.flush();
+ out.close();
+ }
- /* get replication factor of a block */
- private int getReplication( Block block ) {
- FSDirectory.INode fileINode = blocksMap.getINode( block );
- if( fileINode == null ) { // block does not belong to any file
- return 0;
- } else {
- return fileINode.getReplication();
- }
+ /* get replication factor of a block */
+ private int getReplication( Block block ) {
+ FSDirectory.INode fileINode = blocksMap.getINode( block );
+ if( fileINode == null ) { // block does not belong to any file
+ return 0;
+ } else {
+ return fileINode.getReplication();
}
+ }
- /* Class for keeping track of under replication blocks
- * Blocks have replication priority, with priority 0 indicating the highest
- * Blocks have only one replicas has the highest
- */
- private class UnderReplicatedBlocks {
- private static final int LEVEL = 3;
- TreeSet<Block>[] priorityQueues = new TreeSet[LEVEL];
+ /* Class for keeping track of under replication blocks
+ * Blocks have replication priority, with priority 0 indicating the highest
+ * Blocks that have only one replica have the highest priority
+ */
+ private class UnderReplicatedBlocks {
+ private static final int LEVEL = 3;
+ TreeSet<Block>[] priorityQueues = new TreeSet[LEVEL];
- /* constructor */
- UnderReplicatedBlocks() {
- for(int i=0; i<LEVEL; i++) {
- priorityQueues[i] = new TreeSet<Block>();
- }
- }
+ /* constructor */
+ UnderReplicatedBlocks() {
+ for(int i=0; i<LEVEL; i++) {
+ priorityQueues[i] = new TreeSet<Block>();
+ }
+ }
- /* Return the total number of under replication blocks */
- synchronized int size() {
- int size = 0;
- for( int i=0; i<LEVEL; i++ ) {
- size += priorityQueues[i].size();
- }
- return size;
- }
+ /* Return the total number of under replication blocks */
+ synchronized int size() {
+ int size = 0;
+ for( int i=0; i<LEVEL; i++ ) {
+ size += priorityQueues[i].size();
+ }
+ return size;
+ }
- /* Check if a block is in the neededReplication queue */
- synchronized boolean contains(Block block) {
- for(TreeSet<Block> set:priorityQueues) {
- if(set.contains(block)) return true;
- }
- return false;
- }
+ /* Check if a block is in the neededReplication queue */
+ synchronized boolean contains(Block block) {
+ for(TreeSet<Block> set:priorityQueues) {
+ if(set.contains(block)) return true;
+ }
+ return false;
+ }
- /* Return the priority of a block
- * @param block a under replication block
- * @param curReplicas current number of replicas of the block
- * @param expectedReplicas expected number of replicas of the block
- */
- private int getPriority(Block block,
- int curReplicas, int expectedReplicas) {
- if (curReplicas<=0 || curReplicas>=expectedReplicas) {
- return LEVEL; // no need to replicate
- } else if(curReplicas==1) {
- return 0; // highest priority
- } else if(curReplicas*3<expectedReplicas) {
- return 1;
- } else {
- return 2;
- }
- }
+ /* Return the priority of a block
+ * @param block a under replication block
+ * @param curReplicas current number of replicas of the block
+ * @param expectedReplicas expected number of replicas of the block
+ */
+ private int getPriority(Block block,
+ int curReplicas, int expectedReplicas) {
+ if (curReplicas<=0 || curReplicas>=expectedReplicas) {
+ return LEVEL; // no need to replicate
+ } else if(curReplicas==1) {
+ return 0; // highest priority
+ } else if(curReplicas*3<expectedReplicas) {
+ return 1;
+ } else {
+ return 2;
+ }
+ }
- /* add a block to a under replication queue according to its priority
- * @param block a under replication block
- * @param curReplicas current number of replicas of the block
- * @param expectedReplicas expected number of replicas of the block
- */
- synchronized boolean add(
- Block block, int curReplicas, int expectedReplicas) {
- if(curReplicas<=0 || expectedReplicas <= curReplicas) {
- return false;
- }
- int priLevel = getPriority(block, curReplicas, expectedReplicas);
- if( priorityQueues[priLevel].add(block) ) {
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.UnderReplicationBlock.add:"
- + block.getBlockName()
- + " has only "+curReplicas
- + " replicas and need " + expectedReplicas
- + " replicas so is added to neededReplications"
- + " at priority level " + priLevel );
- return true;
- }
- return false;
- }
+ /* add a block to a under replication queue according to its priority
+ * @param block a under replication block
+ * @param curReplicas current number of replicas of the block
+ * @param expectedReplicas expected number of replicas of the block
+ */
+ synchronized boolean add(
+ Block block, int curReplicas, int expectedReplicas) {
+ if(curReplicas<=0 || expectedReplicas <= curReplicas) {
+ return false;
+ }
+ int priLevel = getPriority(block, curReplicas, expectedReplicas);
+ if( priorityQueues[priLevel].add(block) ) {
+ NameNode.stateChangeLog.debug(
+ "BLOCK* NameSystem.UnderReplicationBlock.add:"
+ + block.getBlockName()
+ + " has only "+curReplicas
+ + " replicas and need " + expectedReplicas
+ + " replicas so is added to neededReplications"
+ + " at priority level " + priLevel );
+ return true;
+ }
+ return false;
+ }
- /* add a block to a under replication queue */
- synchronized boolean add(Block block) {
- int expectedReplicas = getReplication(block);
- return add(block,
- countContainingNodes( block ),
- expectedReplicas);
- }
-
- /* remove a block from a under replication queue */
- synchronized boolean remove(Block block,
- int oldReplicas, int oldExpectedReplicas) {
- int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas);
- return remove(block, priLevel);
- }
-
- /* remove a block from a under replication queue given a priority*/
- private boolean remove(Block block, int priLevel ) {
- if( priLevel >= 0 && priLevel < LEVEL
- && priorityQueues[priLevel].remove(block) ) {
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.UnderReplicationBlock.remove: "
- + "Removing block " + block.getBlockName()
- + " from priority queue "+ priLevel );
- return true;
- } else {
- for(int i=0; i<LEVEL; i++) {
- if( i!=priLevel && priorityQueues[i].remove(block) ) {
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.UnderReplicationBlock.remove: "
- + "Removing block " + block.getBlockName()
- + " from priority queue "+ i );
- return true;
- }
- }
- }
- return false;
+ /* add a block to a under replication queue */
+ synchronized boolean add(Block block) {
+ int expectedReplicas = getReplication(block);
+ return add(block,
+ countContainingNodes( block ),
+ expectedReplicas);
+ }
+
+ /* remove a block from a under replication queue */
+ synchronized boolean remove(Block block,
+ int oldReplicas, int oldExpectedReplicas) {
+ int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas);
+ return remove(block, priLevel);
+ }
+
+ /* remove a block from a under replication queue given a priority*/
+ private boolean remove(Block block, int priLevel ) {
+ if( priLevel >= 0 && priLevel < LEVEL
+ && priorityQueues[priLevel].remove(block) ) {
+ NameNode.stateChangeLog.debug(
+ "BLOCK* NameSystem.UnderReplicationBlock.remove: "
+ + "Removing block " + block.getBlockName()
+ + " from priority queue "+ priLevel );
+ return true;
+ } else {
+ for(int i=0; i<LEVEL; i++) {
+ if( i!=priLevel && priorityQueues[i].remove(block) ) {
+ NameNode.stateChangeLog.debug(
+ "BLOCK* NameSystem.UnderReplicationBlock.remove: "
+ + "Removing block " + block.getBlockName()
+ + " from priority queue "+ i );
+ return true;
+ }
}
+ }
+ return false;
+ }
- /* remove a block from a under replication queue */
- synchronized boolean remove(Block block) {
- int curReplicas = countContainingNodes( block );
- int expectedReplicas = getReplication(block);
- return remove(block, curReplicas, expectedReplicas);
- }
-
- /* update the priority level of a block */
- synchronized void update(Block block,
- int curReplicasDelta, int expectedReplicasDelta) {
- int curReplicas = countContainingNodes( block );
- int curExpectedReplicas = getReplication(block);
- int oldReplicas = curReplicas-curReplicasDelta;
- int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
- int curPri = getPriority(block, curReplicas, curExpectedReplicas);
- int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
- NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
- block +
- " curReplicas " + curReplicas +
- " curExpectedReplicas " + curExpectedReplicas +
- " oldReplicas " + oldReplicas +
- " oldExpectedReplicas " + oldExpectedReplicas +
- " curPri " + curPri +
- " oldPri " + oldPri);
- if( oldPri != LEVEL && oldPri != curPri ) {
- remove(block, oldPri);
- }
- if( curPri != LEVEL && oldPri != curPri
- && priorityQueues[curPri].add(block)) {
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.UnderReplicationBlock.update:"
- + block.getBlockName()
- + " has only "+curReplicas
- + " replicas and need " + curExpectedReplicas
- + " replicas so is added to neededReplications"
- + " at priority level " + curPri );
- }
- }
+ /* remove a block from an under-replication queue */
+ synchronized boolean remove(Block block) {
+ int curReplicas = countContainingNodes( block );
+ int expectedReplicas = getReplication(block);
+ return remove(block, curReplicas, expectedReplicas);
+ }
+
+ /* update the priority level of a block */
+ synchronized void update(Block block,
+ int curReplicasDelta, int expectedReplicasDelta) {
+ int curReplicas = countContainingNodes( block );
+ int curExpectedReplicas = getReplication(block);
+ int oldReplicas = curReplicas-curReplicasDelta;
+ int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
+ int curPri = getPriority(block, curReplicas, curExpectedReplicas);
+ int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
+ NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
+ block +
+ " curReplicas " + curReplicas +
+ " curExpectedReplicas " + curExpectedReplicas +
+ " oldReplicas " + oldReplicas +
+ " oldExpectedReplicas " + oldExpectedReplicas +
+ " curPri " + curPri +
+ " oldPri " + oldPri);
+ if( oldPri != LEVEL && oldPri != curPri ) {
+ remove(block, oldPri);
+ }
+ if( curPri != LEVEL && oldPri != curPri
+ && priorityQueues[curPri].add(block)) {
+ NameNode.stateChangeLog.debug(
+ "BLOCK* NameSystem.UnderReplicationBlock.update:"
+ + block.getBlockName()
+ + " has only "+curReplicas
+ + " replicas and need " + curExpectedReplicas
+ + " replicas so is added to neededReplications"
+ + " at priority level " + curPri );
+ }
+ }
- /* return a iterator of all the under replication blocks */
- synchronized Iterator<Block> iterator() {
- return new Iterator<Block>() {
- int level;
- Iterator<Block>[] iterator = new Iterator[LEVEL];
+ /* return an iterator over all the under-replicated blocks */
+ synchronized Iterator<Block> iterator() {
+ return new Iterator<Block>() {
+ int level;
+ Iterator<Block>[] iterator = new Iterator[LEVEL];
- {
- level=0;
- for(int i=0; i<LEVEL; i++) {
- iterator[i] = priorityQueues[i].iterator();
- }
- }
+ {
+ level=0;
+ for(int i=0; i<LEVEL; i++) {
+ iterator[i] = priorityQueues[i].iterator();
+ }
+ }
- private void update() {
- while( level< LEVEL-1 && !iterator[level].hasNext() ) {
- level++;
- }
- }
+ private void update() {
+ while( level< LEVEL-1 && !iterator[level].hasNext() ) {
+ level++;
+ }
+ }
- public Block next() {
- update();
- return iterator[level].next();
- }
+ public Block next() {
+ update();
+ return iterator[level].next();
+ }
- public boolean hasNext() {
- update();
- return iterator[level].hasNext();
- }
+ public boolean hasNext() {
+ update();
+ return iterator[level].hasNext();
+ }
- public void remove() {
- iterator[level].remove();
- }
- };
+ public void remove() {
+ iterator[level].remove();
}
+ };
}
+ }
- /////////////////////////////////////////////////////////
- //
- // These methods are called by HadoopFS clients
- //
- /////////////////////////////////////////////////////////
- /**
- * The client wants to open the given filename. Return a
- * list of (block,machineArray) pairs. The sequence of unique blocks
- * in the list indicates all the blocks that make up the filename.
- *
- * The client should choose one of the machines from the machineArray
- * at random.
- */
- public Object[] open(String clientMachine, UTF8 src) {
- Object results[] = null;
- Block blocks[] = dir.getFile(src);
- if (blocks != null) {
- results = new Object[2];
- DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
- DatanodeDescriptor client =
- host2DataNodeMap.getDatanodeByHost(clientMachine);
-
- for (int i = 0; i < blocks.length; i++) {
- int numNodes = blocksMap.numNodes( blocks[i] );
- if ( numNodes <= 0 ) {
- machineSets[i] = new DatanodeDescriptor[0];
- } else {
- machineSets[i] = new DatanodeDescriptor[ numNodes ];
- numNodes = 0;
- for( Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) {
- machineSets[i][ numNodes++ ] = it.next();
- }
- clusterMap.sortByDistance( client, machineSets[i] );
- }
- }
-
- results[0] = blocks;
- results[1] = machineSets;
+ /////////////////////////////////////////////////////////
+ //
+ // These methods are called by HadoopFS clients
+ //
+ /////////////////////////////////////////////////////////
+ /**
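+ * The client wants to open the given filename. Return a two-element
+ * array: element 0 is the array of the file's blocks and element 1 is a
+ * parallel array of machineArrays, together forming the
+ * (block,machineArray) pairs. The sequence of unique blocks
+ * indicates all the blocks that make up the filename.
+ * Returns null when dir.getFile() finds no blocks for the filename.
+ *
+ * Each machineArray is ordered by clusterMap.sortByDistance() relative to
+ * the client's datanode. The client should choose one of the machines
+ * from the machineArray at random.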
+ */
+ public Object[] open(String clientMachine, UTF8 src) {
+ Object results[] = null;
+ Block blocks[] = dir.getFile(src);
+ if (blocks != null) {
+ results = new Object[2];
+ DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
+ DatanodeDescriptor client =
+ host2DataNodeMap.getDatanodeByHost(clientMachine);
+
+ for (int i = 0; i < blocks.length; i++) {
+ int numNodes = blocksMap.numNodes( blocks[i] );
+ if ( numNodes <= 0 ) {
+ machineSets[i] = new DatanodeDescriptor[0];
+ } else {
+ machineSets[i] = new DatanodeDescriptor[ numNodes ];
+ numNodes = 0;
+ for( Iterator<DatanodeDescriptor> it =
+ blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) {
+ machineSets[i][ numNodes++ ] = it.next();
+ }
+ clusterMap.sortByDistance( client, machineSets[i] );
}
- return results;
+ }
+
+ results[0] = blocks;
+ results[1] = machineSets;
}
+ return results;
+ }
- /**
- * Set replication for an existing file.
- *
- * The NameNode sets new replication and schedules either replication of
- * under-replicated data blocks or removal of the eccessive block copies
- * if the blocks are over-replicated.
- *
- * @see ClientProtocol#setReplication(String, short)
- * @param src file name
- * @param replication new replication
- * @return true if successful;
- * false if file does not exist or is a directory
- * @author shv
- */
- public synchronized boolean setReplication(String src,
- short replication
- ) throws IOException {
- if( isInSafeMode() )
- throw new SafeModeException( "Cannot set replication for " + src, safeMode );
- verifyReplication(src, replication, null );
+ /**
+ * Set replication for an existing file.
+ *
+ * The NameNode sets new replication and schedules either replication of
+ * under-replicated data blocks or removal of the excessive block copies
+ * if the blocks are over-replicated.
+ *
+ * @see ClientProtocol#setReplication(String, short)
+ * @param src file name
+ * @param replication new replication
+ * @return true if successful;
+ * false if file does not exist or is a directory
+ * @author shv
+ */
+ public synchronized boolean setReplication(String src,
+ short replication
+ ) throws IOException {
+ if( isInSafeMode() )
+ throw new SafeModeException( "Cannot set replication for " + src, safeMode );
+ verifyReplication(src, replication, null );
- Vector<Integer> oldReplication = new Vector<Integer>();
- Block[] fileBlocks;
- fileBlocks = dir.setReplication( src, replication, oldReplication );
- if( fileBlocks == null ) // file not found or is a directory
- return false;
- int oldRepl = oldReplication.elementAt(0).intValue();
- if( oldRepl == replication ) // the same replication
- return true;
+ Vector<Integer> oldReplication = new Vector<Integer>();
+ Block[] fileBlocks;
+ fileBlocks = dir.setReplication( src, replication, oldReplication );
+ if( fileBlocks == null ) // file not found or is a directory
+ return false;
+ int oldRepl = oldReplication.elementAt(0).intValue();
+ if( oldRepl == replication ) // the same replication
+ return true;
- // update needReplication priority queues
- LOG.info("Increasing replication for file " + src
- + ". New replication is " + replication );
+ // update neededReplications priority queues
+ LOG.info("Increasing replication for file " + src
+ + ". New replication is " + replication );
+ for( int idx = 0; idx < fileBlocks.length; idx++ )
+ neededReplications.update( fileBlocks[idx], 0, replication-oldRepl );
+
+ if( oldRepl > replication ) {
+ // old replication > the new one; need to remove copies
+ LOG.info("Reducing replication for file " + src
+ + ". New replication is " + replication );
for( int idx = 0; idx < fileBlocks.length; idx++ )
- neededReplications.update( fileBlocks[idx], 0, replication-oldRepl );
-
- if( oldRepl > replication ) {
- // old replication > the new one; need to remove copies
- LOG.info("Reducing replication for file " + src
- + ". New replication is " + replication );
- for( int idx = 0; idx < fileBlocks.length; idx++ )
- proccessOverReplicatedBlock( fileBlocks[idx], replication );
- }
- return true;
+ proccessOverReplicatedBlock( fileBlocks[idx], replication );
}
+ return true;
+ }
- public long getBlockSize(String filename) throws IOException {
- return dir.getBlockSize(filename);
- }
+ public long getBlockSize(String filename) throws IOException {
+ return dir.getBlockSize(filename);
+ }
- /**
- * Check whether the replication parameter is within the range
- * determined by system configuration.
- */
- private void verifyReplication( String src,
- short replication,
- UTF8 clientName
+ /**
+ * Check whether the replication parameter is within the range
+ * determined by system configuration.
+ */
+ private void verifyReplication( String src,
+ short replication,
+ UTF8 clientName
) throws IOException {
- String text = "file " + src
- + ((clientName != null) ? " on client " + clientName : "")
- + ".\n"
- + "Requested replication " + replication;
-
- if( replication > maxReplication )
- throw new IOException( text + " exceeds maximum " + maxReplication );
-
- if( replication < minReplication )
- throw new IOException(
- text + " is less than the required minimum " + minReplication );
- }
+ String text = "file " + src
+ + ((clientName != null) ? " on client " + clientName : "")
+ + ".\n"
+ + "Requested replication " + replication;
+
+ if( replication > maxReplication )
+ throw new IOException( text + " exceeds maximum " + maxReplication );
+
+ if( replication < minReplication )
+ throw new IOException(
+ text + " is less than the required minimum " + minReplication );
+ }
- /**
- * The client would like to create a new block for the indicated
- * filename. Return an array that consists of the block, plus a set
- * of machines. The first on this list should be where the client
- * writes data. Subsequent items in the list must be provided in
- * the connection to the first datanode.
- * @return Return an array that consists of the block, plus a set
- * of machines
- * @throws IOException if the filename is invalid
- * {@link FSDirectory#isValidToCreate(UTF8)}.
- */
- public synchronized Object[] startFile( UTF8 src,
- UTF8 holder,
- UTF8 clientMachine,
- boolean overwrite,
- short replication,
- long blockSize
+ /**
+ * The client would like to create a new block for the indicated
+ * filename. Return an array that consists of the block, plus a set
+ * of machines. The first on this list should be where the client
+ * writes data. Subsequent items in the list must be provided in
+ * the connection to the first datanode.
+ * @return Return an array that consists of the block, plus a set
+ * of machines
+ * @throws IOException if the filename is invalid
+ * {@link FSDirectory#isValidToCreate(UTF8)}.
+ */
+ public synchronized Object[] startFile( UTF8 src,
+ UTF8 holder,
+ UTF8 clientMachine,
+ boolean overwrite,
+ short replication,
+ long blockSize
) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file "
- +src+" for "+holder+" at "+clientMachine);
- if( isInSafeMode() )
- throw new SafeModeException( "Cannot create file" + src, safeMode );
- if (!isValidName(src.toString())) {
- throw new IOException("Invalid file name: " + src);
- }
- try {
- FileUnderConstruction pendingFile = pendingCreates.get(src);
- if (pendingFile != null) {
- //
- // If the file exists in pendingCreate, then it must be in our
- // leases. Find the appropriate lease record.
- //
- Lease lease = leases.get(holder);
- //
- // We found the lease for this file. And surprisingly the original
- // holder is trying to recreate this file. This should never occur.
- //
- if (lease != null) {
- throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because current leaseholder is trying to recreate file.");
- }
- //
- // Find the original holder.
- //
- UTF8 oldholder = pendingFile.getClientName();
- lease = leases.get(oldholder);
- if (lease == null) {
- throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because pendingCreates is non-null but no leases found.");
- }
- //
- // If the original holder has not renewed in the last SOFTLIMIT
- // period, then reclaim all resources and allow this request
- // to proceed. Otherwise, prevent this request from creating file.
- //
- if (lease.expiredSoftLimit()) {
- lease.releaseLocks();
- leases.remove(lease.holder);
- LOG.info("Removing lease " + lease + " ");
- if (!sortedLeases.remove(lease)) {
- LOG.error("Unknown failure trying to remove " + lease +
- " from lease set.");
- }
- } else {
- throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- ", because this file is already being created by " +
- pendingFile.getClientName() +
- " on " + pendingFile.getClientMachine());
- }
+ NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file "
+ +src+" for "+holder+" at "+clientMachine);
+ if( isInSafeMode() )
+ throw new SafeModeException( "Cannot create file" + src, safeMode );
+ if (!isValidName(src.toString())) {
+ throw new IOException("Invalid file name: " + src);
+ }
+ try {
+ FileUnderConstruction pendingFile = pendingCreates.get(src);
+ if (pendingFile != null) {
+ //
+ // If the file exists in pendingCreates, then it must be in our
+ // leases. Find the appropriate lease record.
+ //
+ Lease lease = leases.get(holder);
+ //
+ // We found the lease for this file. And surprisingly the original
+ // holder is trying to recreate this file. This should never occur.
+ //
+ if (lease != null) {
+ throw new AlreadyBeingCreatedException(
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because current leaseholder is trying to recreate file.");
}
-
- try {
- verifyReplication(src.toString(), replication, clientMachine );
- } catch( IOException e) {
- throw new IOException( "failed to create "+e.getMessage());
- }
- if (!dir.isValidToCreate(src)) {
- if (overwrite) {
- delete(src);
- } else {
- throw new IOException("failed to create file " + src
- +" on client " + clientMachine
- +" either because the filename is invalid or the file exists");
- }
+ //
+ // Find the original holder.
+ //
+ UTF8 oldholder = pendingFile.getClientName();
+ lease = leases.get(oldholder);
+ if (lease == null) {
+ throw new AlreadyBeingCreatedException(
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because pendingCreates is non-null but no leases found.");
}
-
- // Get the array of replication targets
- DatanodeDescriptor clientNode =
- host2DataNodeMap.getDatanodeByHost(clientMachine.toString());
- DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
- clientNode, null, blockSize);
- if (targets.length < this.minReplication) {
- if (clusterMap.getNumOfLeaves() == 0) {
- throw new IOException("Failed to create file "+src
- + " on client " + clientMachine
- + " because this cluster has no datanodes.");
- }
- throw new IOException("Failed to create file "+src
- + " on client " + clientMachine
- + " because there were not enough datanodes available. "
- + "Found " + targets.length
- + " datanodes but MIN_REPLICATION for the cluster is "
- + "configured to be "
- + this.minReplication
- + ".");
- }
-
- // Reserve space for this pending file
- pendingCreates.put(src,
- new FileUnderConstruction(replication,
- blockSize,
- holder,
- clientMachine,
- clientNode));
- NameNode.stateChangeLog.debug( "DIR* NameSystem.startFile: "
- +"add "+src+" to pendingCreates for "+holder );
- synchronized (leases) {
- Lease lease = leases.get(holder);
- if (lease == null) {
- lease = new Lease(holder);
- leases.put(holder, lease);
- sortedLeases.add(lease);
- } else {
- sortedLeases.remove(lease);
- lease.renew();
- sortedLeases.add(lease);
- }
- lease.startedCreate(src);
+ //
+ // If the original holder has not renewed in the last SOFTLIMIT
+ // period, then reclaim all resources and allow this request
+ // to proceed. Otherwise, prevent this request from creating file.
+ //
+ if (lease.expiredSoftLimit()) {
+ lease.releaseLocks();
+ leases.remove(lease.holder);
+ LOG.info("Removing lease " + lease + " ");
+ if (!sortedLeases.remove(lease)) {
+ LOG.error("Unknown failure trying to remove " + lease +
+ " from lease set.");
+ }
+ } else {
+ throw new AlreadyBeingCreatedException(
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ ", because this file is already being created by " +
+ pendingFile.getClientName() +
+ " on " + pendingFile.getClientMachine());
}
-
- // Create next block
- Object results[] = new Object[2];
- results[0] = allocateBlock(src);
- results[1] = targets;
- return results;
- } catch (IOException ie) {
- NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
- +ie.getMessage());
- throw ie;
}
- }
- /**
- * The client would like to obtain an additional block for the indicated
- * filename (which is being written-to). Return an array that consists
- * of the block, plus a set of machines. The first on this list should
- * be where the client writes data. Subsequent items in the list must
- * be provided in the connection to the first datanode.
- *
- * Make sure the previous blocks have been reported by datanodes and
- * are replicated. Will return an empty 2-elt array if we want the
- * client to "try again later".
- */
- public synchronized Object[] getAdditionalBlock(UTF8 src,
- UTF8 clientName
- ) throws IOException {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
- +src+" for "+clientName);
- if( isInSafeMode() )
- throw new SafeModeException( "Cannot add block to " + src, safeMode );
- FileUnderConstruction pendingFile = pendingCreates.get(src);
- // make sure that we still have the lease on this file
- if (pendingFile == null) {
- throw new LeaseExpiredException("No lease on " + src);
- }
- if (!pendingFile.getClientName().equals(clientName)) {
- throw new LeaseExpiredException("Lease mismatch on " + src +
- " owned by " + pendingFile.getClientName() +
- " and appended by " + clientName);
+ try {
+ verifyReplication(src.toString(), replication, clientMachine );
+ } catch( IOException e) {
+ throw new IOException( "failed to create "+e.getMessage());
+ }
+ if (!dir.isValidToCreate(src)) {
+ if (overwrite) {
+ delete(src);
+ } else {
+ throw new IOException("failed to create file " + src
+ +" on client " + clientMachine
+ +" either because the filename is invalid or the file exists");
}
+ }
- //
- // If we fail this, bad things happen!
- //
- if (!checkFileProgress(pendingFile, false)) {
- throw new NotReplicatedYetException("Not replicated yet:" + src);
+ // Get the array of replication targets
+ DatanodeDescriptor clientNode =
+ host2DataNodeMap.getDatanodeByHost(clientMachine.toString());
+ DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
+ clientNode, null, blockSize);
+ if (targets.length < this.minReplication) {
+ if (clusterMap.getNumOfLeaves() == 0) {
+ throw new IOException("Failed to create file "+src
+ + " on client " + clientMachine
+ + " because this cluster has no datanodes.");
+ }
+ throw new IOException("Failed to create file "+src
+ + " on client " + clientMachine
+ + " because there were not enough datanodes available. "
+ + "Found " + targets.length
+ + " datanodes but MIN_REPLICATION for the cluster is "
+ + "configured to be "
+ + this.minReplication
+ + ".");
+ }
+
+ // Reserve space for this pending file
+ pendingCreates.put(src,
+ new FileUnderConstruction(replication,
+ blockSize,
+ holder,
+ clientMachine,
+ clientNode));
+ NameNode.stateChangeLog.debug( "DIR* NameSystem.startFile: "
+ +"add "+src+" to pendingCreates for "+holder );
+ synchronized (leases) {
+ Lease lease = leases.get(holder);
+ if (lease == null) {
+ lease = new Lease(holder);
+ leases.put(holder, lease);
+ sortedLeases.add(lease);
+ } else {
+ sortedLeases.remove(lease);
+ lease.renew();
+ sortedLeases.add(lease);
}
+ lease.startedCreate(src);
+ }
- // Get the array of replication targets
- DatanodeDescriptor clientNode = pendingFile.getClientNode();
- DatanodeDescriptor targets[] = replicator.chooseTarget(
- (int)(pendingFile.getReplication()),
- clientNode,
- null,
- pendingFile.getBlockSize());
- if (targets.length < this.minReplication) {
- throw new IOException("File " + src + " could only be replicated to " +
- targets.length + " nodes, instead of " +
- minReplication);
- }
-
- // Create next block
- return new Object[]{allocateBlock(src), targets};
+ // Create next block
+ Object results[] = new Object[2];
+ results[0] = allocateBlock(src);
+ results[1] = targets;
+ return results;
+ } catch (IOException ie) {
+ NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
+ +ie.getMessage());
+ throw ie;
}
+ }
- /**
- * The client would like to let go of the given block
- */
- public synchronized boolean abandonBlock(Block b, UTF8 src) {
- //
- // Remove the block from the pending creates list
- //
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
- +b.getBlockName()+"of file "+src );
- FileUnderConstruction pendingFile = pendingCreates.get(src);
- if (pendingFile != null) {
- Collection<Block> pendingVector = pendingFile.getBlocks();
- for (Iterator<Block> it = pendingVector.iterator(); it.hasNext(); ) {
- Block cur = it.next();
- if (cur.compareTo(b) == 0) {
- pendingCreateBlocks.remove(cur);
- it.remove();
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.abandonBlock: "
- +b.getBlockName()
- +" is removed from pendingCreateBlock and pendingCreates");
- return true;
- }
- }
+ /**
+ * The client would like to obtain an additional block for the indicated
+ * filename (which is being written-to). Return an array that consists
+ * of the block, plus a set of machines. The first on this list should
+ * be where the client writes data. Subsequent items in the list must
+ * be provided in the connection to the first datanode.
+ *
+ * Make sure the previous blocks have been reported by datanodes and
+ * are replicated. If they are not, a NotReplicatedYetException is
+ * thrown so that the client can "try again later".
+ */
+ public synchronized Object[] getAdditionalBlock(UTF8 src,
+ UTF8 clientName
+ ) throws IOException {
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
+ +src+" for "+clientName);
+ if( isInSafeMode() )
+ throw new SafeModeException( "Cannot add block to " + src, safeMode );
+ FileUnderConstruction pendingFile = pendingCreates.get(src);
+ // make sure that we still have the lease on this file
+ if (pendingFile == null) {
+ throw new LeaseExpiredException("No lease on " + src);
+ }
+ if (!pendingFile.getClientName().equals(clientName)) {
+ throw new LeaseExpiredException("Lease mismatch on " + src +
+ " owned by " + pendingFile.getClientName() +
+ " and appended by " + clientName);
+ }
+
+ //
+ // If we fail this, bad things happen!
+ //
+ if (!checkFileProgress(pendingFile, false)) {
+ throw new NotReplicatedYetException("Not replicated yet:" + src);
+ }
+
+ // Get the array of replication targets
+ DatanodeDescriptor clientNode = pendingFile.getClientNode();
+ DatanodeDescriptor targets[] = replicator.chooseTarget(
+ (int)(pendingFile.getReplication()),
+ clientNode,
+ null,
+ pendingFile.getBlockSize());
+ if (targets.length < this.minReplication) {
+ throw new IOException("File " + src + " could only be replicated to " +
+ targets.length + " nodes, instead of " +
+ minReplication);
+ }
+
+ // Create next block
+ return new Object[]{allocateBlock(src), targets};
+ }
+
+ /**
+ * The client would like to let go of the given block
+ */
+ public synchronized boolean abandonBlock(Block b, UTF8 src) {
+ //
+ // Remove the block from the pending creates list
+ //
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ +b.getBlockName()+"of file "+src );
+ FileUnderConstruction pendingFile = pendingCreates.get(src);
+ if (pendingFile != null) {
+ Collection<Block> pendingVector = pendingFile.getBlocks();
+ for (Iterator<Block> it = pendingVector.iterator(); it.hasNext(); ) {
+ Block cur = it.next();
+ if (cur.compareTo(b) == 0) {
+ pendingCreateBlocks.remove(cur);
+ it.remove();
+ NameNode.stateChangeLog.debug(
+ "BLOCK* NameSystem.abandonBlock: "
+ +b.getBlockName()
+ +" is removed from pendingCreateBlock and pendingCreates");
+ return true;
}
- return false;
+ }
}
+ return false;
+ }
- /**
- * Abandon the entire file in progress
- */
- public synchronized void abandonFileInProgress(UTF8 src,
- UTF8 holder
- ) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" + src );
- synchronized (leases) {
- // find the lease
- Lease lease = leases.get(holder);
- if (lease != null) {
- // remove the file from the lease
- if (lease.completedCreate(src)) {
- // if we found the file in the lease, remove it from pendingCreates
- internalReleaseCreate(src, holder);
- } else {
- LOG.info("Attempt by " + holder.toString() +
- " to release someone else's create lock on " +
- src.toString());
- }
+ /**
+ * Abandon the entire file in progress
+ */
+ public synchronized void abandonFileInProgress(UTF8 src,
+ UTF8 holder
+ ) throws IOException {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" + src );
+ synchronized (leases) {
+ // find the lease
+ Lease lease = leases.get(holder);
+ if (lease != null) {
+ // remove the file from the lease
+ if (lease.completedCreate(src)) {
+ // if we found the file in the lease, remove it from pendingCreates
+ internalReleaseCreate(src, holder);
} else {
- LOG.info("Attempt to release a lock from an unknown lease holder "
- + holder.toString() + " for " + src.toString());
+ LOG.info("Attempt by " + holder.toString() +
+ " to release someone else's create lock on " +
+ src.toString());
}
+ } else {
+ LOG.info("Attempt to release a lock from an unknown lease holder "
+ + holder.toString() + " for " + src.toString());
}
}
+ }
- /**
- * Finalize the created file and make it world-accessible. The
- * FSNamesystem will already know the blocks that make up the file.
- * Before we return, we make sure that all the file's blocks have
- * been reported by datanodes and are replicated correctly.
- */
- public synchronized int completeFile( UTF8 src,
- UTF8 holder) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder );
- if( isInSafeMode() )
- throw new SafeModeException( "Cannot complete file " + src, safeMode );
- FileUnderConstruction pendingFile = pendingCreates.get(src);
-
- if (dir.getFile(src) != null || pendingFile == null) {
- NameNode.stateChangeLog.warn( "DIR* NameSystem.completeFile: "
- + "failed to complete " + src
- + " because dir.getFile()==" + dir.getFile(src)
- + " and " + pendingFile);
- return OPERATION_FAILED;
- } else if (! checkFileProgress(pendingFile, true)) {
- return STILL_WAITING;
- }
-
- Collection<Block> blocks = pendingFile.getBlocks();
- int nrBlocks = blocks.size();
- Block pendingBlocks[] = blocks.toArray(new Block[nrBlocks]);
+ /**
+ * Finalize the created file and make it world-accessible. The
+ * FSNamesystem will already know the blocks that make up the file.
+ * Before we return, we make sure that all the file's blocks have
+ * been reported by datanodes and are replicated correctly.
+ */
+ public synchronized int completeFile( UTF8 src,
+ UTF8 holder) throws IOException {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder );
+ if( isInSafeMode() )
+ throw new SafeModeException( "Cannot complete file " + src, safeMode );
+ FileUnderConstruction pendingFile = pendingCreates.get(src);
- //
- // We have the pending blocks, but they won't have
- // length info in them (as they were allocated before
- // data-write took place). Find the block stored in
- // node descriptor.
- //
- for (int i = 0; i < nrBlocks; i++) {
- Block b = pendingBlocks[i];
- Block storedBlock = blocksMap.getStoredBlock( b );
- if ( storedBlock != null ) {
- pendingBlocks[i] = storedBlock;
- }
- }
-
- //
- // Now we can add the (name,blocks) tuple to the filesystem
- //
- if ( ! dir.addFile(src, pendingBlocks, pendingFile.getReplication())) {
- return OPERATION_FAILED;
- }
+ if (dir.getFile(src) != null || pendingFile == null) {
+ NameNode.stateChangeLog.warn( "DIR* NameSystem.completeFile: "
+ + "failed to complete " + src
+ + " because dir.getFile()==" + dir.getFile(src)
+ + " and " + pendingFile);
+ return OPERATION_FAILED;
+ } else if (! checkFileProgress(pendingFile, true)) {
+ return STILL_WAITING;
+ }
+
+ Collection<Block> blocks = pendingFile.getBlocks();
+ int nrBlocks = blocks.size();
+ Block pendingBlocks[] = blocks.toArray(new Block[nrBlocks]);
- // The file is no longer pending
- pendingCreates.remove(src);
- NameNode.stateChangeLog.debug(
- "DIR* NameSystem.completeFile: " + src
- + " is removed from pendingCreates");
- for (int i = 0; i < nrBlocks; i++) {
- pendingCreateBlocks.remove(pendingBlocks[i]);
- }
+ //
+ // We have the pending blocks, but they won't have
+ // length info in them (as they were allocated before
+ // data-write took place). Find the block stored in
+ // node descriptor.
+ //
+ for (int i = 0; i < nrBlocks; i++) {
+ Block b = pendingBlocks[i];
+ Block storedBlock = blocksMap.getStoredBlock( b );
+ if ( storedBlock != null ) {
+ pendingBlocks[i] = storedBlock;
+ }
+ }
+
+ //
+ // Now we can add the (name,blocks) tuple to the filesystem
+ //
+ if ( ! dir.addFile(src, pendingBlocks, pendingFile.getReplication())) {
+ return OPERATION_FAILED;
+ }
- synchronized (leases) {
- Lease lease = leases.get(holder);
- if (lease != null) {
- lease.completedCreate(src);
- if (! lease.hasLocks()) {
- leases.remove(holder);
- sortedLeases.remove(lease);
- }
- }
+ // The file is no longer pending
+ pendingCreates.remove(src);
+ NameNode.stateChangeLog.debug(
+ "DIR* NameSystem.completeFile: " + src
+ + " is removed from pendingCreates");
+ for (int i = 0; i < nrBlocks; i++) {
+ pendingCreateBlocks.remove(pendingBlocks[i]);
+ }
+
+ synchronized (leases) {
+ Lease lease = leases.get(holder);
+ if (lease != null) {
+ lease.completedCreate(src);
+ if (! lease.hasLocks()) {
+ leases.remove(holder);
+ sortedLeases.remove(lease);
}
+ }
+ }
- //
- // REMIND - mjc - this should be done only after we wait a few secs.
- // The namenode isn't giving datanodes enough time to report the
- // replicated blocks that are automatically done as part of a client
- // write.
- //
+ //
+ // REMIND - mjc - this should be done only after we wait a few secs.
+ // The namenode isn't giving datanodes enough time to report the
+ // replicated blocks that are automatically done as part of a client
+ // write.
+ //
- // Now that the file is real, we need to be sure to replicate
- // the blocks.
- int numExpectedReplicas = pendingFile.getReplication();
- for (int i = 0; i < nrBlocks; i++) {
- // filter out containingNodes that are marked for decommission.
- int numCurrentReplica = countContainingNodes( pendingBlocks[i] );
- if (numCurrentReplica < numExpectedReplicas) {
- neededReplications.add(
- pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
- }
- }
- return COMPLETE_SUCCESS;
+ // Now that the file is real, we need to be sure to replicate
+ // the blocks.
+ int numExpectedReplicas = pendingFile.getReplication();
+ for (int i = 0; i < nrBlocks; i++) {
+ // filter out containingNodes that are marked for decommission.
+ int numCurrentReplica = countContainingNodes( pendingBlocks[i] );
+ if (numCurrentReplica < numExpectedReplicas) {
+ neededReplications.add(
+ pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
+ }
}
+ return COMPLETE_SUCCESS;
+ }
- static Random randBlockId = new Random();
+ static Random randBlockId = new Random();
- /**
- * Allocate a block at the given pending filename
- */
- synchronized Block allocateBlock(UTF8 src) {
- Block b = null;
- do {
- b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
- } while ( isValidBlock(b) );
- FileUnderConstruction v = pendingCreates.get(src);
- v.getBlocks().add(b);
- pendingCreateBlocks.add(b);
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.allocateBlock: "
- +src+ ". "+b.getBlockName()+
- " is created and added to pendingCreates and pendingCreateBlocks" );
- return b;
- }
+ /**
+ * Allocate a block at the given pending filename
+ */
+ synchronized Block allocateBlock(UTF8 src) {
+ Block b = null;
+ do {
+ b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
+ } while ( isValidBlock(b) );
+ FileUnderConstruction v = pendingCreates.get(src);
+ v.getBlocks().add(b);
+ pendingCreateBlocks.add(b);
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.allocateBlock: "
+ +src+ ". "+b.getBlockName()+
+ " is created and added to pendingCreates and pendingCreateBlocks" );
+ return b;
+ }
- /**
- * Check that the indicated file's blocks are present and
- * replicated. If not, return false. If checkall is true, then check
- * all blocks, otherwise check only penultimate block.
- */
- synchronized boolean checkFileProgress(FileUnderConstruction v, boolean checkall) {
- if (checkall) {
- //
- // check all blocks of the file.
- //
- for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
- if ( blocksMap.numNodes(it.next()) < this.minReplication ) {
- return false;
- }
- }
- } else {
- //
- // check the penultimate block of this file
- //
- Block b = v.getPenultimateBlock();
- if (b != null) {
- if (blocksMap.numNodes(b) < this.minReplication) {
- return false;
- }
- }
+ /**
+ * Check that the indicated file's blocks are present and
+ * replicated. If not, return false. If checkall is true, then check
+ * all blocks, otherwise check only penultimate block.
+ */
+ synchronized boolean checkFileProgress(FileUnderConstruction v, boolean checkall) {
+ if (checkall) {
+ //
+ // check all blocks of the file.
+ //
+ for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
+ if ( blocksMap.numNodes(it.next()) < this.minReplication ) {
+ return false;
}
- return true;
+ }
+ } else {
+ //
+ // check the penultimate block of this file
+ //
+ Block b = v.getPenultimateBlock();
+ if (b != null) {
+ if (blocksMap.numNodes(b) < this.minReplication) {
+ return false;
+ }
+ }
}
+ return true;
+ }
- /**
- * Adds block to list of blocks which will be invalidated on
- * specified datanode.
- */
- private void addToInvalidates(Block b, DatanodeInfo n) {
- Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
- if (invalidateSet == null) {
- invalidateSet = new ArrayList<Block>();
- recentInvalidateSets.put(n.getStorageID(), invalidateSet);
- }
- invalidateSet.add(b);
+ /**
+ * Adds block to list of blocks which will be invalidated on
+ * specified datanode.
+ */
+ private void addToInvalidates(Block b, DatanodeInfo n) {
+ Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
+ if (invalidateSet == null) {
+ invalidateSet = new ArrayList<Block>();
+ recentInvalidateSets.put(n.getStorageID(), invalidateSet);
}
+ invalidateSet.add(b);
+ }
- /**
- * dumps the contents of recentInvalidateSets
- */
- private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
- Collection<Collection<Block>> values = recentInvalidateSets.values();
- Iterator<Map.Entry<String,Collection<Block>>> it =
- recentInvalidateSets.entrySet().iterator();
- if (values.size() == 0) {
- out.println("Metasave: Blocks waiting deletion: 0");
- return;
- }
- out.println("Metasave: Blocks waiting deletion from " +
- values.size() + " datanodes.");
- while (it.hasNext()) {
- Map.Entry<String,Collection<Block>> entry = it.next();
- String storageId = (String) entry.getKey();
- DatanodeDescriptor node = datanodeMap.get(storageId);
- Collection<Block> blklist = entry.getValue();
- if (blklist.size() > 0) {
- out.print(node.getName());
- for (Iterator jt = blklist.iterator(); jt.hasNext();) {
- Block block = (Block) jt.next();
- out.print(" " + block);
- }
- out.println("");
+ /**
+ * dumps the contents of recentInvalidateSets
+ */
+ private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
+ Collection<Collection<Block>> values = recentInvalidateSets.values();
+ Iterator<Map.Entry<String,Collection<Block>>> it =
+ recentInvalidateSets.entrySet().iterator();
+ if (values.size() == 0) {
+ out.println("Metasave: Blocks waiting deletion: 0");
+ return;
+ }
+ out.println("Metasave: Blocks waiting deletion from " +
+ values.size() + " datanodes.");
+ while (it.hasNext()) {
+ Map.Entry<String,Collection<Block>> entry = it.next();
+ String storageId = (String) entry.getKey();
+ DatanodeDescriptor node = datanodeMap.get(storageId);
+ Collection<Block> blklist = entry.getValue();
+ if (blklist.size() > 0) {
+ out.print(node.getName());
+ for (Iterator jt = blklist.iterator(); jt.hasNext();) {
+ Block block = (Block) jt.next();
+ out.print(" " + block);
}
+ out.println("");
}
}
+ }
- /**
- * Invalidates the given block on the given datanode.
- */
- public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
- throws IOException {
- NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
- + blk.getBlockName() + " on "
- + dn.getName());
- if (isInSafeMode()) {
- throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
- }
-
- // Check how many copies we have of the block. If we have at least one
- // copy on a live node, then we can delete it.
- int count = countContainingNodes( blk );
- if ( (count > 1) || ( (count == 1) && ( dn.isDecommissionInProgress() ||
- dn.isDecommissioned() ))) {
- addToInvalidates(blk, dn);
- removeStoredBlock(blk, getDatanode(dn));
- NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
- + blk.getBlockName() + " on "
- + dn.getName() + " listed for deletion.");
- } else {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
- + blk.getBlockName() + " on "
- + dn.getName() + " is the only copy and was not deleted.");
- }
+ /**
+ * Invalidates the given block on the given datanode.
+ */
+ public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
+ throws IOException {
+ NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
+ + blk.getBlockName() + " on "
+ + dn.getName());
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
+ }
+
+ // Check how many copies we have of the block. If we have at least one
+ // copy on a live node, then we can delete it.
+ int count = countContainingNodes( blk );
+ if ( (count > 1) || ( (count == 1) && ( dn.isDecommissionInProgress() ||
+ dn.isDecommissioned() ))) {
+ addToInvalidates(blk, dn);
+ removeStoredBlock(blk, getDatanode(dn));
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+ + blk.getBlockName() + " on "
[... 5658 lines stripped ...]