Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2008/09/17 07:07:01 UTC
svn commit: r696149 - in /hadoop/core/trunk: CHANGES.txt
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
Author: szetszwo
Date: Tue Sep 16 22:07:00 2008
New Revision: 696149
URL: http://svn.apache.org/viewvc?rev=696149&view=rev
Log:
HADOOP-3623. Refactor LeaseManager. (szetszwo)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=696149&r1=696148&r2=696149&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Sep 16 22:07:00 2008
@@ -577,9 +577,11 @@
HADOOP-4139. Optimize Hive multi group-by.
(Namin Jain via dhruba)
- HADOOP-3911. Add a check to fsck options to make sure -files is not
- the first option to resolve conflicts with GenericOptionsParser
- (lohit)
+ HADOOP-3911. Add a check to fsck options to make sure -files is not
+ the first option to resolve conflicts with GenericOptionsParser
+ (lohit)
+
+ HADOOP-3623. Refactor LeaseManager. (szetszwo)
Release 0.18.1 - 2008-09-17
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=696149&r1=696148&r2=696149&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Sep 16 22:07:00 2008
@@ -33,8 +33,11 @@
import java.security.SecureRandom;
import java.util.AbstractList;
import java.util.ArrayList;
-import java.util.Formatter;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.Semaphore;
@@ -50,11 +53,11 @@
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@@ -142,6 +145,8 @@
volatile boolean shouldRun = true;
private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
+ /** list of blocks being recovered */
+ private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
private LinkedList<String> delHints = new LinkedList<String>();
public final static String EMPTY_DEL_HINT = "";
int xmitsInProgress = 0;
@@ -1319,8 +1324,15 @@
public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
Daemon d = new Daemon(threadGroup, new Runnable() {
+ /** Recover a list of blocks. It is run by the primary datanode. */
public void run() {
- LeaseManager.recoverBlocks(blocks, targets, DataNode.this, namenode, getConf());
+ for(int i = 0; i < blocks.length; i++) {
+ try {
+ recoverBlock(blocks[i], targets[i], true);
+ } catch (IOException e) {
+ LOG.warn("recoverBlocks, i=" + i, e);
+ }
+ }
}
});
d.start();
@@ -1353,6 +1365,129 @@
+ ": " + protocol);
}
+ /** A convenient class used in lease recovery */
+ private static class BlockRecord {
+ final DatanodeID id;
+ final InterDatanodeProtocol datanode;
+ final Block block;
+
+ BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
+ this.id = id;
+ this.datanode = datanode;
+ this.block = block;
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return "block:" + block + " node:" + id;
+ }
+ }
+
+ /** Recover a block */
+ private LocatedBlock recoverBlock(Block block, DatanodeID[] datanodeids,
+ boolean closeFile) throws IOException {
+
+ // If the block is already being recovered, then skip recovering it.
+ // This can happen if the namenode and client start recovering the same
+ // file at the same time.
+ synchronized (ongoingRecovery) {
+ Block tmp = new Block();
+ tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
+ if (ongoingRecovery.get(tmp) != null) {
+ String msg = "Block " + block + " is already being recovered, " +
+ " ignoring this request to recover it.";
+ LOG.info(msg);
+ throw new IOException(msg);
+ }
+ ongoingRecovery.put(block, block);
+ }
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("block=" + block
+ + ", datanodeids=" + Arrays.asList(datanodeids));
+ }
+ List<BlockRecord> syncList = new ArrayList<BlockRecord>();
+ long minlength = Long.MAX_VALUE;
+ int errorCount = 0;
+
+ //check generation stamps
+ for(DatanodeID id : datanodeids) {
+ try {
+ InterDatanodeProtocol datanode = dnRegistration.equals(id)?
+ this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
+ BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
+ if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
+ syncList.add(new BlockRecord(id, datanode, new Block(info)));
+ if (info.getNumBytes() < minlength) {
+ minlength = info.getNumBytes();
+ }
+ }
+ } catch (IOException e) {
+ ++errorCount;
+ InterDatanodeProtocol.LOG.warn(
+ "Failed to getBlockMetaDataInfo for block (=" + block
+ + ") from datanode (=" + id + ")", e);
+ }
+ }
+
+ if (syncList.isEmpty() && errorCount > 0) {
+ throw new IOException("All datanodes failed: block=" + block
+ + ", datanodeids=" + Arrays.asList(datanodeids));
+ }
+ return syncBlock(block, minlength, syncList, closeFile);
+ } finally {
+ synchronized (ongoingRecovery) {
+ ongoingRecovery.remove(block);
+ }
+ }
+ }
+
+ /** Block synchronization */
+ private LocatedBlock syncBlock(Block block, long minlength,
+ List<BlockRecord> syncList, boolean closeFile) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("block=" + block + ", minlength=" + minlength
+ + ", syncList=" + syncList + ", closeFile=" + closeFile);
+ }
+
+ //syncList.isEmpty() means that no datanode has the block,
+ //so the block can be deleted.
+ if (syncList.isEmpty()) {
+ namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
+ DatanodeID.EMPTY_ARRAY);
+ return null;
+ }
+
+ List<DatanodeID> successList = new ArrayList<DatanodeID>();
+
+ long generationstamp = namenode.nextGenerationStamp(block);
+ Block newblock = new Block(block.getBlockId(), minlength, generationstamp);
+
+ for(BlockRecord r : syncList) {
+ try {
+ r.datanode.updateBlock(r.block, newblock, closeFile);
+ successList.add(r.id);
+ } catch (IOException e) {
+ InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ + newblock + ", datanode=" + r.id + ")", e);
+ }
+ }
+
+ if (!successList.isEmpty()) {
+ DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
+
+ namenode.commitBlockSynchronization(block,
+ newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
+ nlist);
+ DatanodeInfo[] info = new DatanodeInfo[nlist.length];
+ for (int i = 0; i < nlist.length; i++) {
+ info[i] = new DatanodeInfo(nlist[i]);
+ }
+ return new LocatedBlock(newblock, info); // success
+ }
+ return null; // failed
+ }
+
// ClientDataNodeProtocol implementation
/** {@inheritDoc} */
public LocatedBlock recoverBlock(Block block, DatanodeInfo[] targets
@@ -1366,7 +1501,6 @@
}
LOG.info("Client invoking recoverBlock for block " + block +
" on datanodes " + msg.toString());
- return LeaseManager.recoverBlock(block, targets, this, namenode,
- getConf(), false);
+ return recoverBlock(block, targets, false);
}
}
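
A note on the ongoingRecovery map introduced above: recoverBlock now refuses to start a
second recovery of a block that is already in flight, which can happen when the namenode
and a client try to recover the same file at the same time. Below is a minimal,
self-contained sketch of that guard pattern, assuming nothing beyond the JDK; the
RecoveryGuard class and its method names are illustrative only and are not part of the
Hadoop source.

  import java.io.IOException;
  import java.util.HashMap;
  import java.util.Map;

  /** Illustrative sketch of the ongoingRecovery guard; not part of the Hadoop code base. */
  class RecoveryGuard {
    private final Map<Long, Long> ongoingRecovery = new HashMap<Long, Long>();

    /** Marks blockId as being recovered; fails if a recovery is already in flight. */
    void begin(long blockId) throws IOException {
      synchronized (ongoingRecovery) {
        if (ongoingRecovery.containsKey(blockId)) {
          throw new IOException("Block " + blockId + " is already being recovered");
        }
        ongoingRecovery.put(blockId, blockId);
      }
    }

    /** Clears the marker; the real code does this from a finally block so failures also clean up. */
    void end(long blockId) {
      synchronized (ongoingRecovery) {
        ongoingRecovery.remove(blockId);
      }
    }
  }
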
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=696149&r1=696148&r2=696149&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Tue Sep 16 22:07:00 2008
@@ -18,23 +18,20 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
/**
* LeaseManager does the lease housekeeping for writing on files.
@@ -91,10 +88,6 @@
/** @return the lease containing src */
public Lease getLeaseByPath(String src) {return sortedLeasesByPath.get(src);}
- /** list of blocks being recovered */
- private static Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
-
-
/** @return the number of leases currently in the system */
public synchronized int countLease() {return sortedLeases.size();}
@@ -390,146 +383,4 @@
+ "\n sortedLeasesByPath=" + sortedLeasesByPath
+ "\n}";
}
-
- /*
- * The following codes provides useful static methods for lease recovery.
- */
- /** A convenient class used in lease recovery */
- private static class BlockRecord {
- final DatanodeID id;
- final InterDatanodeProtocol datanode;
- final Block block;
-
- BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
- this.id = id;
- this.datanode = datanode;
- this.block = block;
- }
-
- public String toString() {
- return "block:" + block + " node:" + id;
- }
- }
-
- /**
- * Recover a list of blocks.
- * This method is invoked by the primary datanode.
- */
- public static void recoverBlocks(Block[] blocks, DatanodeID[][] targets,
- DataNode primary, DatanodeProtocol namenode, Configuration conf) {
- for(int i = 0; i < blocks.length; i++) {
- try {
- recoverBlock(blocks[i], targets[i], primary, namenode, conf, true);
- } catch (IOException e) {
- LOG.warn("recoverBlocks, i=" + i, e);
- }
- }
- }
-
- /** Recover a block */
- public static LocatedBlock recoverBlock(Block block, DatanodeID[] datanodeids,
- DataNode primary, DatanodeProtocol namenode, Configuration conf,
- boolean closeFile) throws IOException {
-
- // If the block is already being recovered, then skip recovering it.
- // This can happen if the namenode and client start recovering the same
- // file at the same time.
- synchronized (ongoingRecovery) {
- Block tmp = new Block();
- tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
- if (ongoingRecovery.get(tmp) != null) {
- String msg = "Block " + block + " is already being recovered, " +
- " ignoring this request to recover it.";
- LOG.info(msg);
- throw new IOException(msg);
- }
- ongoingRecovery.put(block, block);
- }
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("block=" + block
- + ", datanodeids=" + Arrays.asList(datanodeids));
- }
- List<BlockRecord> syncList = new ArrayList<BlockRecord>();
- long minlength = Long.MAX_VALUE;
- int errorCount = 0;
-
- //check generation stamps
- for(DatanodeID id : datanodeids) {
- try {
- InterDatanodeProtocol datanode = primary.dnRegistration.equals(id)?
- primary: DataNode.createInterDataNodeProtocolProxy(id, conf);
- BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
- if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
- if (info.getNumBytes() < minlength) {
- minlength = info.getNumBytes();
- }
- }
- } catch (IOException e) {
- ++errorCount;
- InterDatanodeProtocol.LOG.warn(
- "Failed to getBlockMetaDataInfo for block (=" + block
- + ") from datanode (=" + id + ")", e);
- }
- }
-
- if (syncList.isEmpty() && errorCount > 0) {
- throw new IOException("All datanodes failed: block=" + block
- + ", datanodeids=" + Arrays.asList(datanodeids));
- }
- return syncBlock(block, minlength, syncList, namenode, closeFile);
- } finally {
- synchronized (ongoingRecovery) {
- ongoingRecovery.remove(block);
- }
- }
- }
-
- /** Block synchronization */
- private static LocatedBlock syncBlock(Block block, long minlength,
- List<BlockRecord> syncList, DatanodeProtocol namenode,
- boolean closeFile) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("block=" + block + ", minlength=" + minlength
- + ", syncList=" + syncList + ", closeFile=" + closeFile);
- }
-
- //syncList.isEmpty() that all datanodes do not have the block
- //so the block can be deleted.
- if (syncList.isEmpty()) {
- namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
- DatanodeID.EMPTY_ARRAY);
- return null;
- }
-
- List<DatanodeID> successList = new ArrayList<DatanodeID>();
-
- long generationstamp = namenode.nextGenerationStamp(block);
- Block newblock = new Block(block.getBlockId(), minlength, generationstamp);
-
- for(BlockRecord r : syncList) {
- try {
- r.datanode.updateBlock(r.block, newblock, closeFile);
- successList.add(r.id);
- } catch (IOException e) {
- InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
- + newblock + ", datanode=" + r.id + ")", e);
- }
- }
-
- if (!successList.isEmpty()) {
- DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
-
- namenode.commitBlockSynchronization(block,
- newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
- nlist);
- DatanodeInfo[] info = new DatanodeInfo[nlist.length];
- for (int i = 0; i < nlist.length; i++) {
- info[i] = new DatanodeInfo(nlist[i]);
- }
- return new LocatedBlock(newblock, info); // success
- }
- return null; // failed
- }
}
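
For readers following the refactor: the lease-recovery helpers removed from LeaseManager
above now run as instance methods on DataNode, but the decision syncBlock makes is
unchanged. Once replica metadata is gathered, an empty syncList means no datanode has the
block and it is deleted through commitBlockSynchronization; otherwise every surviving
replica is truncated to the shortest reported length under a fresh generation stamp. A
minimal, self-contained sketch of that decision follows; the Replica class, datanode names,
and printed messages are hypothetical stand-ins, not Hadoop APIs.

  import java.util.ArrayList;
  import java.util.List;

  /** Illustrative sketch of the syncBlock decision; Replica is a hypothetical stand-in. */
  class SyncDecisionExample {

    static class Replica {
      final String datanode;
      final long numBytes;
      Replica(String datanode, long numBytes) { this.datanode = datanode; this.numBytes = numBytes; }
    }

    public static void main(String[] args) {
      List<Replica> syncList = new ArrayList<Replica>();
      syncList.add(new Replica("dn1", 8192L));
      syncList.add(new Replica("dn2", 4096L));
      syncList.add(new Replica("dn3", 6144L));

      if (syncList.isEmpty()) {
        // No datanode has a usable replica: ask the namenode to delete the block.
        System.out.println("commitBlockSynchronization(deleteblock=true)");
        return;
      }

      // Every surviving replica is truncated to the shortest reported length,
      // then re-stamped with a fresh generation stamp from the namenode.
      long minlength = Long.MAX_VALUE;
      for (Replica r : syncList) {
        minlength = Math.min(minlength, r.numBytes);
      }
      System.out.println("updateBlock to " + minlength + " bytes on " + syncList.size() + " datanodes");
    }
  }
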