Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2012/10/12 02:15:37 UTC
svn commit: r1397387 [3/5] - in
/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project:
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/
hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/
hadoop-hdfs/ hadoop-hdfs...
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Fri Oct 12 00:15:22 2012
@@ -64,23 +64,20 @@ public class BlockPlacementPolicyWithNod
* @return the chosen node
*/
@Override
- protected DatanodeDescriptor chooseLocalNode(
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
+ protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
// otherwise try local machine first
Node oldNode = excludedNodes.put(localMachine, localMachine);
if (oldNode == null) { // was not in the excluded list
if (isGoodTarget(localMachine, blocksize,
- maxNodesPerRack, false, results)) {
+ maxNodesPerRack, false, results, avoidStaleNodes)) {
results.add(localMachine);
// Nodes under same nodegroup should be excluded.
addNodeGroupToExcludedNodes(excludedNodes,
@@ -92,13 +89,13 @@ public class BlockPlacementPolicyWithNod
// try a node on local node group
DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
(NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
if (chosenNode != null) {
return chosenNode;
}
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
@Override
@@ -119,17 +116,15 @@ public class BlockPlacementPolicyWithNod
}
@Override
- protected DatanodeDescriptor chooseLocalRack(
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
+ protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+ throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results,
+ avoidStaleNodes);
}
// choose one from the local rack, but off-nodegroup
@@ -137,7 +132,8 @@ public class BlockPlacementPolicyWithNod
return chooseRandom(NetworkTopology.getFirstHalf(
localMachine.getNetworkLocation()),
excludedNodes, blocksize,
- maxNodesPerRack, results);
+ maxNodesPerRack, results,
+ avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
@@ -151,39 +147,39 @@ public class BlockPlacementPolicyWithNod
}
if (newLocal != null) {
try {
- return chooseRandom(clusterMap.getRack(newLocal.getNetworkLocation()),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ return chooseRandom(
+ clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
}
}
@Override
protected void chooseRemoteRack(int numOfReplicas,
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxReplicasPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
+ DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes,
+ long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes) throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
- chooseRandom(numOfReplicas, "~"+NetworkTopology.getFirstHalf(
- localMachine.getNetworkLocation()),
- excludedNodes, blocksize, maxReplicasPerRack, results);
+ chooseRandom(
+ numOfReplicas,
+ "~" + NetworkTopology.getFirstHalf(localMachine.getNetworkLocation()),
+ excludedNodes, blocksize, maxReplicasPerRack, results,
+ avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
- chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
- localMachine.getNetworkLocation(), excludedNodes, blocksize,
- maxReplicasPerRack, results);
+ chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
+ localMachine.getNetworkLocation(), excludedNodes, blocksize,
+ maxReplicasPerRack, results, avoidStaleNodes);
}
}
@@ -193,19 +189,22 @@ public class BlockPlacementPolicyWithNod
* if still no such node is available, choose a random node in the cluster.
* @return the chosen node
*/
- private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
- DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes, long blocksize,
- int maxNodesPerRack, List<DatanodeDescriptor> results) throws NotEnoughReplicasException {
+ private DatanodeDescriptor chooseLocalNodeGroup(
+ NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+ throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
// choose one from the local node group
try {
- return chooseRandom(clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ return chooseRandom(
+ clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
+ excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
@@ -219,17 +218,19 @@ public class BlockPlacementPolicyWithNod
}
if (newLocal != null) {
try {
- return chooseRandom(clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
- excludedNodes, blocksize, maxNodesPerRack, results);
+ return chooseRandom(
+ clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
+ excludedNodes, blocksize, maxNodesPerRack, results,
+ avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes);
}
}
}
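The hunks above thread a new avoidStaleNodes flag through every chooser in BlockPlacementPolicyWithNodeGroup so that stale DataNodes can be skipped as write targets. Below is a minimal, self-contained sketch of the gate this flag enables; DatanodeDescriptor.isStale(long) is the real check used later in this commit, while the Candidate class and the standalone isGoodTarget here are hypothetical stand-ins.

    public class StaleGateSketch {
      /** Hypothetical stand-in for DatanodeDescriptor. */
      static class Candidate {
        final long lastUpdateMs; // wall-clock time of the last heartbeat
        Candidate(long lastUpdateMs) { this.lastUpdateMs = lastUpdateMs; }
        /** Mirrors DatanodeDescriptor#isStale(staleInterval). */
        boolean isStale(long staleIntervalMs) {
          return System.currentTimeMillis() - lastUpdateMs > staleIntervalMs;
        }
      }

      /** Reject a stale candidate only when avoidStaleNodes is set. */
      static boolean isGoodTarget(Candidate c, boolean avoidStaleNodes,
          long staleIntervalMs) {
        if (avoidStaleNodes && c.isStale(staleIntervalMs)) {
          return false; // caller falls through to chooseRandom and retries
        }
        return true;    // other checks (load, free space) would follow
      }
    }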
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Oct 12 00:15:22 2012
@@ -76,6 +76,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
/**
@@ -88,8 +89,8 @@ public class DatanodeManager {
private final Namesystem namesystem;
private final BlockManager blockManager;
-
private final HeartbeatManager heartbeatManager;
+ private Daemon decommissionthread = null;
/**
* Stores the datanode -> block map.
@@ -127,28 +128,33 @@ public class DatanodeManager {
/** Ask Datanode only up to this many blocks to delete. */
final int blockInvalidateLimit;
+ /** Whether or not to check stale DataNodes for read/write */
+ private final boolean checkForStaleDataNodes;
+
+ /** The interval for judging stale DataNodes for read/write */
+ private final long staleInterval;
+
+ /** Whether or not to avoid using stale DataNodes for writing */
+ private volatile boolean avoidStaleDataNodesForWrite;
+
+ /** The number of stale DataNodes */
+ private volatile int numStaleNodes;
+
/**
* Whether or not this cluster has ever consisted of more than 1 rack,
* according to the NetworkTopology.
*/
private boolean hasClusterEverBeenMultiRack = false;
- /** Whether or not to check the stale datanodes */
- private volatile boolean checkForStaleNodes;
- /** The time interval for detecting stale datanodes */
- private volatile long staleInterval;
-
- DatanodeManager(final BlockManager blockManager,
- final Namesystem namesystem, final Configuration conf
- ) throws IOException {
+ DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
+ final Configuration conf) throws IOException {
this.namesystem = namesystem;
this.blockManager = blockManager;
Class<? extends NetworkTopology> networkTopologyClass =
conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
NetworkTopology.class, NetworkTopology.class);
- networktopology = (NetworkTopology) ReflectionUtils.newInstance(
- networkTopologyClass, conf);
+ networktopology = ReflectionUtils.newInstance(networkTopologyClass, conf);
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
@@ -181,25 +187,69 @@ public class DatanodeManager {
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
+ "=" + this.blockInvalidateLimit);
- // set the value of stale interval based on configuration
- this.checkForStaleNodes = conf.getBoolean(
+
+ checkForStaleDataNodes = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
- if (this.checkForStaleNodes) {
- this.staleInterval = conf.getLong(
- DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
- DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT);
- if (this.staleInterval < DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT) {
- LOG.warn("The given interval for marking stale datanode = "
- + this.staleInterval + ", which is smaller than the default value "
- + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT
- + ".");
- }
+
+ staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
+ avoidStaleDataNodesForWrite = getAvoidStaleForWriteFromConf(conf,
+ checkForStaleDataNodes);
+ }
+
+ private static long getStaleIntervalFromConf(Configuration conf,
+ long heartbeatExpireInterval) {
+ long staleInterval = conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
+ Preconditions.checkArgument(staleInterval > 0,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY +
+ " = '" + staleInterval + "' is invalid. " +
+ "It should be a positive non-zero value.");
+
+ final long heartbeatIntervalSeconds = conf.getLong(
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+ // The stale interval value cannot be smaller than
+ // 3 times of heartbeat interval
+ final long minStaleInterval = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT)
+ * heartbeatIntervalSeconds * 1000;
+ if (staleInterval < minStaleInterval) {
+ LOG.warn("The given interval for marking stale datanode = "
+ + staleInterval + ", which is less than "
+ + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT
+ + " heartbeat intervals. This may cause too frequent changes of "
+ + "stale states of DataNodes since a heartbeat msg may be missing "
+ + "due to temporary short-term failures. Reset stale interval to "
+ + minStaleInterval + ".");
+ staleInterval = minStaleInterval;
+ }
+ if (staleInterval > heartbeatExpireInterval) {
+ LOG.warn("The given interval for marking stale datanode = "
+ + staleInterval + ", which is larger than heartbeat expire interval "
+ + heartbeatExpireInterval + ".");
}
+ return staleInterval;
}
-
- private Daemon decommissionthread = null;
-
+
+ static boolean getAvoidStaleForWriteFromConf(Configuration conf,
+ boolean checkForStale) {
+ boolean avoid = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
+ DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
+ boolean avoidStaleDataNodesForWrite = checkForStale && avoid;
+ if (!checkForStale && avoid) {
+ LOG.warn("Cannot set "
+ + DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY
+ + " as false while setting "
+ + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY
+ + " as true.");
+ }
+ return avoidStaleDataNodesForWrite;
+ }
+
void activate(final Configuration conf) {
final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
this.decommissionthread = new Daemon(dm.new Monitor(
@@ -253,9 +303,10 @@ public class DatanodeManager {
client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost);
}
- Comparator<DatanodeInfo> comparator = checkForStaleNodes ?
- new DFSUtil.DecomStaleComparator(staleInterval) :
- DFSUtil.DECOM_COMPARATOR;
+ Comparator<DatanodeInfo> comparator = checkForStaleDataNodes ?
+ new DFSUtil.DecomStaleComparator(staleInterval) :
+ DFSUtil.DECOM_COMPARATOR;
+
for (LocatedBlock b : locatedblocks) {
networktopology.pseudoSortByDistance(client, b.getLocations());
// Move decommissioned/stale datanodes to the bottom
@@ -612,7 +663,8 @@ public class DatanodeManager {
+ " storage " + nodeReg.getStorageID());
DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
- DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getXferAddr());
+ DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
+ nodeReg.getIpAddr(), nodeReg.getXferPort());
if (nodeN != null && nodeN != nodeS) {
NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
@@ -722,7 +774,7 @@ public class DatanodeManager {
* 3. Added to exclude --> start decommission.
* 4. Removed from exclude --> stop decommission.
*/
- private void refreshDatanodes() throws IOException {
+ private void refreshDatanodes() {
for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include.
if (!inHostsList(node)) {
@@ -781,7 +833,61 @@ public class DatanodeManager {
namesystem.readUnlock();
}
}
+
+ /* Getter and Setter for stale DataNodes related attributes */
+
+ /**
+ * @return whether or not to avoid writing to stale datanodes
+ */
+ public boolean isAvoidingStaleDataNodesForWrite() {
+ return avoidStaleDataNodesForWrite;
+ }
+ /**
+ * Set the value of {@link DatanodeManager#avoidStaleDataNodesForWrite}.
+ * The HeartbeatManager disables avoidStaleDataNodesForWrite when more than
+ * half of the DataNodes are marked as stale.
+ *
+ * @param avoidStaleDataNodesForWrite
+ * The value to set to
+ * {@link DatanodeManager#avoidStaleDataNodesForWrite}
+ */
+ void setAvoidStaleDataNodesForWrite(boolean avoidStaleDataNodesForWrite) {
+ this.avoidStaleDataNodesForWrite = avoidStaleDataNodesForWrite;
+ }
+
+ /**
+ * @return Whether or not to check stale DataNodes for R/W
+ */
+ boolean isCheckingForStaleDataNodes() {
+ return checkForStaleDataNodes;
+ }
+
+ /**
+ * @return The time interval used to mark DataNodes as stale.
+ */
+ long getStaleInterval() {
+ return staleInterval;
+ }
+
+ /**
+ * Set the current number of stale DataNodes. The HeartbeatManager derives
+ * this number from DataNodes' heartbeats.
+ *
+ * @param numStaleNodes
+ * The number of stale DataNodes to be set.
+ */
+ void setNumStaleNodes(int numStaleNodes) {
+ this.numStaleNodes = numStaleNodes;
+ }
+
+ /**
+ * @return Return the current number of stale DataNodes (detected by
+ * HeartbeatManager).
+ */
+ int getNumStaleNodes() {
+ return this.numStaleNodes;
+ }
/** Fetch live and dead datanodes. */
public void fetchDatanodes(final List<DatanodeDescriptor> live,
@@ -960,7 +1066,7 @@ public class DatanodeManager {
return nodes;
}
- private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
+ private void setDatanodeDead(DatanodeDescriptor node) {
node.setLastUpdate(0);
}
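getStaleIntervalFromConf above clamps the configured stale interval to a minimum number of heartbeat intervals and warns when it exceeds the heartbeat expiry threshold. Here is a runnable sketch of that arithmetic with the configuration plumbing stripped out; the constants are illustrative values, not the actual DFSConfigKeys defaults.

    public class StaleIntervalSketch {
      public static void main(String[] args) {
        long heartbeatIntervalSec = 3;     // illustrative heartbeat interval
        long minMultiplier = 3;            // minimum # of heartbeat intervals
        long configured = 5000L;           // requested stale interval, in ms
        long heartbeatExpireMs = 630000L;  // illustrative dead-node threshold

        long minStaleMs = minMultiplier * heartbeatIntervalSec * 1000; // 9000
        long staleMs = configured;
        if (staleMs < minStaleMs) {
          // Too small: one missed heartbeat would flap the stale state, so
          // the value is reset to the minimum (the patch logs a warning).
          staleMs = minStaleMs;
        }
        if (staleMs > heartbeatExpireMs) {
          // Larger than the expiry threshold: nodes would be declared dead
          // before ever being marked stale; the patch only warns here.
        }
        System.out.println("effective stale interval = " + staleMs + " ms");
      }
    }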
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Fri Oct 12 00:15:22 2012
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
+import com.google.common.base.Preconditions;
+
/**
* Manage the heartbeats received from datanodes.
* The datanode list and statistics are synchronized
@@ -54,18 +56,48 @@ class HeartbeatManager implements Datano
private final long heartbeatRecheckInterval;
/** Heartbeat monitor thread */
private final Daemon heartbeatThread = new Daemon(new Monitor());
-
+ /**
+ * The end user's initial setting, which indicates whether or not to avoid
+ * writing to stale datanodes.
+ */
+ private final boolean initialAvoidWriteStaleNodes;
+ /**
+ * When the ratio of stale datanodes reaches this number, stop avoiding
+ * writing to stale datanodes, i.e., continue using stale nodes for writing.
+ */
+ private final float ratioUseStaleDataNodesForWrite;
+
final Namesystem namesystem;
final BlockManager blockManager;
- HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager,
- final Configuration conf) {
- this.heartbeatRecheckInterval = conf.getInt(
- DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
- DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
-
+ HeartbeatManager(final Namesystem namesystem,
+ final BlockManager blockManager, final Configuration conf) {
this.namesystem = namesystem;
this.blockManager = blockManager;
+ boolean checkStaleNodes = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
+ DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
+ long recheckInterval = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
+ long staleInterval = conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
+ this.initialAvoidWriteStaleNodes = DatanodeManager
+ .getAvoidStaleForWriteFromConf(conf, checkStaleNodes);
+ this.ratioUseStaleDataNodesForWrite = conf.getFloat(
+ DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
+ DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
+ Preconditions.checkArgument(
+ (ratioUseStaleDataNodesForWrite > 0 &&
+ ratioUseStaleDataNodesForWrite <= 1.0f),
+ DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
+ " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
+ "It should be a positive non-zero float value, not greater than 1.0f.");
+
+ this.heartbeatRecheckInterval = (checkStaleNodes
+ && initialAvoidWriteStaleNodes
+ && staleInterval < recheckInterval) ? staleInterval : recheckInterval;
}
void activate(Configuration conf) {
@@ -210,16 +242,39 @@ class HeartbeatManager implements Datano
if (namesystem.isInSafeMode()) {
return;
}
+ boolean checkStaleNodes = dm.isCheckingForStaleDataNodes();
boolean allAlive = false;
while (!allAlive) {
// locate the first dead node.
DatanodeID dead = null;
+ // check the number of stale nodes
+ int numOfStaleNodes = 0;
synchronized(this) {
for (DatanodeDescriptor d : datanodes) {
- if (dm.isDatanodeDead(d)) {
+ if (dead == null && dm.isDatanodeDead(d)) {
stats.incrExpiredHeartbeats();
dead = d;
- break;
+ if (!checkStaleNodes) {
+ break;
+ }
+ }
+ if (checkStaleNodes &&
+ d.isStale(dm.getStaleInterval())) {
+ numOfStaleNodes++;
+ }
+ }
+
+ // Change whether to avoid using stale datanodes for writing
+ // based on proportion of stale datanodes
+ if (checkStaleNodes) {
+ dm.setNumStaleNodes(numOfStaleNodes);
+ if (numOfStaleNodes >
+ datanodes.size() * ratioUseStaleDataNodesForWrite) {
+ dm.setAvoidStaleDataNodesForWrite(false);
+ } else {
+ if (this.initialAvoidWriteStaleNodes) {
+ dm.setAvoidStaleDataNodesForWrite(true);
+ }
}
}
}
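The monitor loop above now counts stale nodes on every pass and flips write avoidance off once the stale fraction crosses ratioUseStaleDataNodesForWrite, restoring the user's setting when the fraction drops back. A minimal sketch of that toggle follows; the AtomicBoolean and the method itself are hypothetical stand-ins for the DatanodeManager setters.

    import java.util.concurrent.atomic.AtomicBoolean;

    public class StaleRatioToggleSketch {
      static void updateWriteAvoidance(int numStaleNodes, int totalNodes,
          float ratioUseStaleDataNodesForWrite,
          boolean initialAvoidWriteStaleNodes, AtomicBoolean avoidForWrite) {
        if (numStaleNodes > totalNodes * ratioUseStaleDataNodesForWrite) {
          // Too many stale nodes: avoiding them all would leave too few
          // write targets, so fall back to using stale nodes.
          avoidForWrite.set(false);
        } else if (initialAvoidWriteStaleNodes) {
          // Back under the threshold: restore the configured behavior.
          avoidForWrite.set(true);
        }
      }
    }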
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java Fri Oct 12 00:15:22 2012
@@ -159,6 +159,35 @@ class Host2NodesMap {
}
}
+ /**
+ * Find data node by its transfer address
+ *
+ * @return DatanodeDescriptor if found or null otherwise
+ */
+ public DatanodeDescriptor getDatanodeByXferAddr(String ipAddr,
+ int xferPort) {
+ if (ipAddr==null) {
+ return null;
+ }
+
+ hostmapLock.readLock().lock();
+ try {
+ DatanodeDescriptor[] nodes = map.get(ipAddr);
+ // no entry
+ if (nodes== null) {
+ return null;
+ }
+ for(DatanodeDescriptor containedNode:nodes) {
+ if (xferPort == containedNode.getXferPort()) {
+ return containedNode;
+ }
+ }
+ return null;
+ } finally {
+ hostmapLock.readLock().unlock();
+ }
+ }
+
@Override
public String toString() {
final StringBuilder b = new StringBuilder(getClass().getSimpleName())
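getDatanodeByXferAddr pairs a read-locked host lookup with a linear scan for the matching transfer port, which is what lets the DatanodeManager.registerDatanode hunk above distinguish two DataNodes sharing one IP. A self-contained sketch of the same pattern; the Node class is a stand-in for DatanodeDescriptor.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class XferLookupSketch {
      static class Node {
        final String ip; final int xferPort;
        Node(String ip, int xferPort) { this.ip = ip; this.xferPort = xferPort; }
      }

      private final Map<String, Node[]> map = new HashMap<String, Node[]>();
      private final ReadWriteLock lock = new ReentrantReadWriteLock();

      Node getByXferAddr(String ipAddr, int xferPort) {
        if (ipAddr == null) {
          return null;
        }
        lock.readLock().lock();
        try {
          Node[] nodes = map.get(ipAddr);
          if (nodes == null) {
            return null;            // no DataNode registered on this host
          }
          for (Node n : nodes) {
            if (n.xferPort == xferPort) {
              return n;             // host and transfer port both match
            }
          }
          return null;              // host known, port not
        } finally {
          lock.readLock().unlock();
        }
      }
    }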
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Fri Oct 12 00:15:22 2012
@@ -39,7 +39,8 @@ public final class HdfsServerConstants {
*/
static public enum NodeType {
NAME_NODE,
- DATA_NODE;
+ DATA_NODE,
+ JOURNAL_NODE;
}
/** Startup options */
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Fri Oct 12 00:15:22 2012
@@ -276,6 +276,9 @@ public class JspHelper {
FIELD_PERCENT_REMAINING = 9,
FIELD_ADMIN_STATE = 10,
FIELD_DECOMMISSIONED = 11,
+ FIELD_BLOCKPOOL_USED = 12,
+ FIELD_PERBLOCKPOOL_USED = 13,
+ FIELD_FAILED_VOLUMES = 14,
SORT_ORDER_ASC = 1,
SORT_ORDER_DSC = 2;
@@ -303,6 +306,12 @@ public class JspHelper {
sortField = FIELD_ADMIN_STATE;
} else if (field.equals("decommissioned")) {
sortField = FIELD_DECOMMISSIONED;
+ } else if (field.equals("bpused")) {
+ sortField = FIELD_BLOCKPOOL_USED;
+ } else if (field.equals("pcbpused")) {
+ sortField = FIELD_PERBLOCKPOOL_USED;
+ } else if (field.equals("volfails")) {
+ sortField = FIELD_FAILED_VOLUMES;
} else {
sortField = FIELD_NAME;
}
@@ -361,6 +370,18 @@ public class JspHelper {
case FIELD_NAME:
ret = d1.getHostName().compareTo(d2.getHostName());
break;
+ case FIELD_BLOCKPOOL_USED:
+ dlong = d1.getBlockPoolUsed() - d2.getBlockPoolUsed();
+ ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0);
+ break;
+ case FIELD_PERBLOCKPOOL_USED:
+ ddbl = d1.getBlockPoolUsedPercent() - d2.getBlockPoolUsedPercent();
+ ret = (ddbl < 0) ? -1 : ((ddbl > 0) ? 1 : 0);
+ break;
+ case FIELD_FAILED_VOLUMES:
+ int dint = d1.getVolumeFailures() - d2.getVolumeFailures();
+ ret = (dint < 0) ? -1 : ((dint > 0) ? 1 : 0);
+ break;
}
return (sortOrder == SORT_ORDER_DSC) ? -ret : ret;
}
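The new sort fields follow the file's existing subtract-then-sign idiom for three-way comparison. A tiny runnable sketch of that idiom next to the overflow-proof Long.compare equivalent (available since Java 7); the values are made up.

    public class ThreeWayCompareSketch {
      public static void main(String[] args) {
        long bp1 = 4096L, bp2 = 1024L;           // hypothetical bpused values
        long dlong = bp1 - bp2;                  // the patch's idiom:
        int ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0);
        int retSafe = Long.compare(bp1, bp2);    // overflow-proof equivalent
        System.out.println(ret + " " + retSafe); // both print 1
      }
    }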
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Fri Oct 12 00:15:22 2012
@@ -42,6 +42,8 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.VersionInfo;
+import com.google.common.base.Preconditions;
+
/**
@@ -76,7 +78,7 @@ public abstract class Storage extends St
/** Layout versions of 0.20.203 release */
public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
- private static final String STORAGE_FILE_LOCK = "in_use.lock";
+ public static final String STORAGE_FILE_LOCK = "in_use.lock";
protected static final String STORAGE_FILE_VERSION = "VERSION";
public static final String STORAGE_DIR_CURRENT = "current";
public static final String STORAGE_DIR_PREVIOUS = "previous";
@@ -752,6 +754,15 @@ public abstract class Storage extends St
return storageDirs.get(idx);
}
+ /**
+ * @return the storage directory, with the precondition that this storage
+ * has exactly one storage directory
+ */
+ public StorageDirectory getSingularStorageDir() {
+ Preconditions.checkState(storageDirs.size() == 1);
+ return storageDirs.get(0);
+ }
+
protected void addStorageDir(StorageDirectory sd) {
storageDirs.add(sd);
}
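getSingularStorageDir turns a silent single-directory assumption into a fail-fast check via Guava's Preconditions. A minimal generic sketch of the same guard, with hypothetical names:

    import java.util.List;
    import com.google.common.base.Preconditions;

    public class SingularDirSketch {
      static <T> T getSingular(List<T> dirs) {
        // Throws IllegalStateException instead of silently returning an
        // arbitrary element when the one-directory assumption is violated.
        Preconditions.checkState(dirs.size() == 1,
            "expected exactly one storage dir, found %s", dirs.size());
        return dirs.get(0);
      }
    }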
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Oct 12 00:15:22 2012
@@ -251,7 +251,7 @@ public class DataNode extends Configured
Daemon dataXceiverServer = null;
ThreadGroup threadGroup = null;
private DNConf dnConf;
- private boolean heartbeatsDisabledForTests = false;
+ private volatile boolean heartbeatsDisabledForTests = false;
private DataStorage storage = null;
private HttpServer infoServer = null;
DataNodeMetrics metrics;
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Fri Oct 12 00:15:22 2012
@@ -114,7 +114,7 @@ class EditLogBackupOutputStream extends
}
@Override // EditLogOutputStream
- protected void flushAndSync() throws IOException {
+ protected void flushAndSync(boolean durable) throws IOException {
assert out.getLength() == 0 : "Output buffer is not empty";
int numReadyTxns = doubleBuf.countReadyTxns();
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Fri Oct 12 00:15:22 2012
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -367,30 +368,36 @@ public class EditLogFileInputStream exte
@Override
public InputStream getInputStream() throws IOException {
- HttpURLConnection connection = (HttpURLConnection)
- SecurityUtil.openSecureHttpConnection(url);
-
- if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
- throw new HttpGetFailedException(
- "Fetch of " + url +
- " failed with status code " + connection.getResponseCode() +
- "\nResponse message:\n" + connection.getResponseMessage(),
- connection);
- }
-
- String contentLength = connection.getHeaderField(CONTENT_LENGTH);
- if (contentLength != null) {
- advertisedSize = Long.parseLong(contentLength);
- if (advertisedSize <= 0) {
- throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
- contentLength);
- }
- } else {
- throw new IOException(CONTENT_LENGTH + " header is not provided " +
- "by the server when trying to fetch " + url);
- }
-
- return connection.getInputStream();
+ return SecurityUtil.doAsCurrentUser(
+ new PrivilegedExceptionAction<InputStream>() {
+ @Override
+ public InputStream run() throws IOException {
+ HttpURLConnection connection = (HttpURLConnection)
+ SecurityUtil.openSecureHttpConnection(url);
+
+ if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+ throw new HttpGetFailedException(
+ "Fetch of " + url +
+ " failed with status code " + connection.getResponseCode() +
+ "\nResponse message:\n" + connection.getResponseMessage(),
+ connection);
+ }
+
+ String contentLength = connection.getHeaderField(CONTENT_LENGTH);
+ if (contentLength != null) {
+ advertisedSize = Long.parseLong(contentLength);
+ if (advertisedSize <= 0) {
+ throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
+ contentLength);
+ }
+ } else {
+ throw new IOException(CONTENT_LENGTH + " header is not provided " +
+ "by the server when trying to fetch " + url);
+ }
+
+ return connection.getInputStream();
+ }
+ });
}
@Override
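The hunk above moves the HTTP fetch inside SecurityUtil.doAsCurrentUser so it runs with the caller's Kerberos credentials. The underlying JAAS pattern, sketched with the standard library only (the empty Subject and the string payload are stand-ins):

    import java.security.PrivilegedExceptionAction;
    import javax.security.auth.Subject;

    public class PrivilegedFetchSketch {
      public static void main(String[] args) throws Exception {
        // Checked exceptions thrown inside run() surface wrapped in a
        // PrivilegedActionException, which the caller can unwrap.
        String result = Subject.doAs(new Subject(),
            new PrivilegedExceptionAction<String>() {
              @Override
              public String run() throws Exception {
                return "fetched under the caller's security context";
              }
            });
        System.out.println(result);
      }
    }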
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Fri Oct 12 00:15:22 2012
@@ -176,7 +176,7 @@ public class EditLogFileOutputStream ext
* accumulates new log records while readyBuffer will be flushed and synced.
*/
@Override
- public void flushAndSync() throws IOException {
+ public void flushAndSync(boolean durable) throws IOException {
if (fp == null) {
throw new IOException("Trying to use aborted output stream");
}
@@ -186,7 +186,7 @@ public class EditLogFileOutputStream ext
}
preallocate(); // preallocate file if necessary
doubleBuf.flushTo(fp);
- if (!shouldSkipFsyncForTests) {
+ if (durable && !shouldSkipFsyncForTests) {
fc.force(false); // metadata updates not needed
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Fri Oct 12 00:15:22 2012
@@ -24,6 +24,7 @@ import static org.apache.hadoop.util.Tim
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.jasper.compiler.JspUtil;
/**
* A generic abstract class to support journaling of edits logs into
@@ -92,18 +93,24 @@ public abstract class EditLogOutputStrea
/**
* Flush and sync all data that is ready to be flushed
* {@link #setReadyToFlush()} into underlying persistent store.
+ * @param durable if true, the edits should be made truly durable before
+ * returning
* @throws IOException
*/
- abstract protected void flushAndSync() throws IOException;
+ abstract protected void flushAndSync(boolean durable) throws IOException;
/**
* Flush data to persistent store.
* Collect sync metrics.
*/
public void flush() throws IOException {
+ flush(true);
+ }
+
+ public void flush(boolean durable) throws IOException {
numSync++;
long start = now();
- flushAndSync();
+ flushAndSync(durable);
long end = now();
totalTimeSync += (end - start);
}
@@ -132,4 +139,12 @@ public abstract class EditLogOutputStrea
protected long getNumSync() {
return numSync;
}
+
+ /**
+ * @return a short HTML snippet suitable for describing the current
+ * status of the stream
+ */
+ public String generateHtmlReport() {
+ return JspUtil.escapeXml(this.toString());
+ }
}
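Together with the EditLogFileOutputStream hunk earlier, this gives the stream a two-level flush contract: flush() keeps its old always-durable meaning, while flush(false) pushes bytes out without paying for an fsync. A self-contained sketch of that contract; the class is a stand-in, but FileChannel.force(false) mirrors the real fc.force call.

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;

    public class DurableFlushSketch {
      private final FileChannel fc;

      DurableFlushSketch(RandomAccessFile f) {
        this.fc = f.getChannel();
      }

      void flush() throws IOException {
        flush(true);              // preserves the old, durable behavior
      }

      void flush(boolean durable) throws IOException {
        // ... drain the ready buffer to fc here ...
        if (durable) {
          fc.force(false);        // sync data; metadata updates not needed
        }
      }
    }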
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java Fri Oct 12 00:15:22 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -34,7 +35,8 @@ import com.google.common.base.Preconditi
* to progress concurrently to flushes without allocating new buffers each
* time.
*/
-class EditsDoubleBuffer {
+@InterfaceAudience.Private
+public class EditsDoubleBuffer {
private TxnBuffer bufCurrent; // current buffer for writing
private TxnBuffer bufReady; // buffer ready for flushing
@@ -51,11 +53,11 @@ class EditsDoubleBuffer {
bufCurrent.writeOp(op);
}
- void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+ public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
bufCurrent.write(bytes, offset, length);
}
- void close() throws IOException {
+ public void close() throws IOException {
Preconditions.checkNotNull(bufCurrent);
Preconditions.checkNotNull(bufReady);
@@ -69,7 +71,7 @@ class EditsDoubleBuffer {
bufCurrent = bufReady = null;
}
- void setReadyToFlush() {
+ public void setReadyToFlush() {
assert isFlushed() : "previous data not flushed yet";
TxnBuffer tmp = bufReady;
bufReady = bufCurrent;
@@ -80,12 +82,12 @@ class EditsDoubleBuffer {
* Writes the content of the "ready" buffer to the given output stream,
* and resets it. Does not swap any buffers.
*/
- void flushTo(OutputStream out) throws IOException {
+ public void flushTo(OutputStream out) throws IOException {
bufReady.writeTo(out); // write data to file
bufReady.reset(); // erase all data in the buffer
}
- boolean shouldForceSync() {
+ public boolean shouldForceSync() {
return bufCurrent.size() >= initBufferSize;
}
@@ -120,6 +122,12 @@ class EditsDoubleBuffer {
return bufReady.numTxns;
}
+ /**
+ * @return the number of bytes that are ready to be flushed
+ */
+ public int countReadyBytes() {
+ return bufReady.size();
+ }
private static class TxnBuffer extends DataOutputBuffer {
long firstTxId;
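EditsDoubleBuffer (now public for reuse) is a classic double buffer: writers append to bufCurrent while a swapped-out bufReady drains to disk, so logging proceeds concurrently with flushing. A self-contained sketch of the idea; ByteArrayOutputStream stands in for TxnBuffer, and unlike the real class (which is externally synchronized) the sketch synchronizes internally.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    public class DoubleBufferSketch {
      private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
      private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();

      synchronized void write(byte[] b) throws IOException {
        bufCurrent.write(b);                  // writers never block on disk I/O
      }

      synchronized void setReadyToFlush() {
        assert bufReady.size() == 0 : "previous data not flushed yet";
        ByteArrayOutputStream tmp = bufReady; // swap the two roles
        bufReady = bufCurrent;
        bufCurrent = tmp;
      }

      synchronized void flushTo(OutputStream out) throws IOException {
        bufReady.writeTo(out);                // drain the ready buffer
        bufReady.reset();                     // erase the flushed data
      }
    }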
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Fri Oct 12 00:15:22 2012
@@ -32,8 +32,16 @@ public interface FSClusterStats {
* @return a count of the total number of block transfers and block
* writes that are currently occurring on the cluster.
*/
-
- public int getTotalLoad() ;
+ public int getTotalLoad();
+
+ /**
+ * Indicate whether or not the cluster is currently avoiding
+ * the use of stale DataNodes for writing.
+ *
+ * @return true if the cluster is currently avoiding the use of stale
+ * DataNodes as write targets, and false otherwise.
+ */
+ public boolean isAvoidingStaleDataNodesForWrite();
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Oct 12 00:15:22 2012
@@ -1171,6 +1171,7 @@ public class FSEditLog implements LogsPu
journalSet.recoverUnfinalizedSegments();
} catch (IOException ex) {
// All journals have failed, it is handled in logSync.
+ // TODO: are we sure this is OK?
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Oct 12 00:15:22 2012
@@ -4059,7 +4059,10 @@ public class FSNamesystem implements Nam
return "Safe mode is OFF.";
String leaveMsg = "";
if (areResourcesLow()) {
- leaveMsg = "Resources are low on NN. Safe mode must be turned off manually";
+ leaveMsg = "Resources are low on NN. "
+ + "Please add or free up more resources then turn off safe mode manually. "
+ + "NOTE: If you turn off safe mode before adding resources, "
+ + "the NN will immediately return to safe mode.";
} else {
leaveMsg = "Safe mode will be turned off automatically";
}
@@ -5536,4 +5539,10 @@ public class FSNamesystem implements Nam
public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
this.nnResourceChecker = nnResourceChecker;
}
+
+ @Override
+ public boolean isAvoidingStaleDataNodesForWrite() {
+ return this.blockManager.getDatanodeManager()
+ .isAvoidingStaleDataNodesForWrite();
+ }
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Fri Oct 12 00:15:22 2012
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.ComparisonChain;
@@ -51,7 +53,8 @@ import com.google.common.collect.Compari
* Note: this class is not thread-safe and should be externally
* synchronized.
*/
-class FileJournalManager implements JournalManager {
+@InterfaceAudience.Private
+public class FileJournalManager implements JournalManager {
private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
private final StorageDirectory sd;
@@ -164,7 +167,7 @@ class FileJournalManager implements Jour
* @return a list of remote edit logs
* @throws IOException if edit logs cannot be listed.
*/
- List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
+ public List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
File currentDir = sd.getCurrentDir();
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
@@ -182,6 +185,8 @@ class FileJournalManager implements Jour
}
}
+ Collections.sort(ret);
+
return ret;
}
@@ -195,7 +200,7 @@ class FileJournalManager implements Jour
* @throws IOException
* IOException thrown for invalid logDir
*/
- static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
+ public static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
return matchEditLogs(FileUtil.listFiles(logDir));
}
@@ -223,7 +228,7 @@ class FileJournalManager implements Jour
try {
long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
ret.add(
- new EditLogFile(f, startTxId, startTxId, true));
+ new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true));
} catch (NumberFormatException nfe) {
LOG.error("In-progress edits file " + f + " has improperly " +
"formatted transaction ID");
@@ -237,15 +242,8 @@ class FileJournalManager implements Jour
@Override
synchronized public void selectInputStreams(
Collection<EditLogInputStream> streams, long fromTxId,
- boolean inProgressOk) {
- List<EditLogFile> elfs;
- try {
- elfs = matchEditLogs(sd.getCurrentDir());
- } catch (IOException e) {
- LOG.error("error listing files in " + this + ". " +
- "Skipping all edit logs in this directory.", e);
- return;
- }
+ boolean inProgressOk) throws IOException {
+ List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
LOG.debug(this + ": selecting input streams starting at " + fromTxId +
(inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
"from among " + elfs.size() + " candidate file(s)");
@@ -321,7 +319,7 @@ class FileJournalManager implements Jour
}
}
- List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
+ public List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
File currentDir = sd.getCurrentDir();
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
List<EditLogFile> logFiles = Lists.newArrayList();
@@ -337,6 +335,32 @@ class FileJournalManager implements Jour
return logFiles;
}
+
+ public EditLogFile getLogFile(long startTxId) throws IOException {
+ return getLogFile(sd.getCurrentDir(), startTxId);
+ }
+
+ public static EditLogFile getLogFile(File dir, long startTxId)
+ throws IOException {
+ List<EditLogFile> files = matchEditLogs(dir);
+ List<EditLogFile> ret = Lists.newLinkedList();
+ for (EditLogFile elf : files) {
+ if (elf.getFirstTxId() == startTxId) {
+ ret.add(elf);
+ }
+ }
+
+ if (ret.isEmpty()) {
+ // no matches
+ return null;
+ } else if (ret.size() == 1) {
+ return ret.get(0);
+ } else {
+ throw new IllegalStateException("More than one log segment in " +
+ dir + " starting at txid " + startTxId + ": " +
+ Joiner.on(", ").join(ret));
+ }
+ }
@Override
public String toString() {
@@ -346,7 +370,8 @@ class FileJournalManager implements Jour
/**
* Record of an edit log that has been located and had its filename parsed.
*/
- static class EditLogFile {
+ @InterfaceAudience.Private
+ public static class EditLogFile {
private File file;
private final long firstTxId;
private long lastTxId;
@@ -379,17 +404,20 @@ class FileJournalManager implements Jour
assert (firstTxId > 0) || (firstTxId == HdfsConstants.INVALID_TXID);
assert file != null;
+ Preconditions.checkArgument(!isInProgress ||
+ lastTxId == HdfsConstants.INVALID_TXID);
+
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.file = file;
this.isInProgress = isInProgress;
}
- long getFirstTxId() {
+ public long getFirstTxId() {
return firstTxId;
}
- long getLastTxId() {
+ public long getLastTxId() {
return lastTxId;
}
@@ -402,17 +430,17 @@ class FileJournalManager implements Jour
* This will update the lastTxId of the EditLogFile or
* mark it as corrupt if it is.
*/
- void validateLog() throws IOException {
+ public void validateLog() throws IOException {
EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
this.lastTxId = val.getEndTxId();
this.hasCorruptHeader = val.hasCorruptHeader();
}
- boolean isInProgress() {
+ public boolean isInProgress() {
return isInProgress;
}
- File getFile() {
+ public File getFile() {
return file;
}
@@ -425,7 +453,7 @@ class FileJournalManager implements Jour
renameSelf(".corrupt");
}
- void moveAsideEmptyFile() throws IOException {
+ public void moveAsideEmptyFile() throws IOException {
assert lastTxId == HdfsConstants.INVALID_TXID;
renameSelf(".empty");
}
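The new getLogFile enforces a uniqueness invariant: zero segments starting at a txid yield null, exactly one is returned, and two or more is corruption surfaced as IllegalStateException. A minimal sketch of that contract, with Segment standing in for EditLogFile:

    import java.util.ArrayList;
    import java.util.List;

    public class UniqueSegmentSketch {
      static class Segment {
        final long firstTxId;
        Segment(long firstTxId) { this.firstTxId = firstTxId; }
      }

      static Segment findByStartTxId(List<Segment> all, long startTxId) {
        List<Segment> hits = new ArrayList<Segment>();
        for (Segment s : all) {
          if (s.firstTxId == startTxId) {
            hits.add(s);
          }
        }
        if (hits.isEmpty()) {
          return null;                       // no such segment
        } else if (hits.size() == 1) {
          return hits.get(0);                // the unique match
        }
        throw new IllegalStateException(     // duplicate start txid
            "More than one log segment starting at txid " + startTxId);
      }
    }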
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Fri Oct 12 00:15:22 2012
@@ -65,9 +65,11 @@ public interface JournalManager extends
* @param inProgressOk whether or not in-progress streams should be returned
*
* @return a list of streams
+ * @throws IOException if the underlying storage has an error or is otherwise
+ * inaccessible
*/
void selectInputStreams(Collection<EditLogInputStream> streams,
- long fromTxnId, boolean inProgressOk);
+ long fromTxnId, boolean inProgressOk) throws IOException;
/**
* Set the amount of memory that this stream should use to buffer edits
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java Fri Oct 12 00:15:22 2012
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.SortedSet;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -147,7 +148,7 @@ public class JournalSet implements Journ
return journal;
}
- private boolean isDisabled() {
+ boolean isDisabled() {
return disabled;
}
@@ -165,8 +166,12 @@ public class JournalSet implements Journ
return required;
}
}
-
- private List<JournalAndStream> journals = Lists.newArrayList();
+
+ // COW implementation is necessary since some users (eg the web ui) call
+ // getAllJournalStreams() and then iterate. Since this is rarely
+ // mutated, there is no performance concern.
+ private List<JournalAndStream> journals =
+ new CopyOnWriteArrayList<JournalSet.JournalAndStream>();
final int minimumRedundantJournals;
JournalSet(int minimumRedundantResources) {
@@ -242,8 +247,20 @@ public class JournalSet implements Journ
LOG.info("Skipping jas " + jas + " since it's disabled");
continue;
}
- jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
+ try {
+ jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
+ } catch (IOException ioe) {
+ LOG.warn("Unable to determine input streams from " + jas.getManager() +
+ ". Skipping.", ioe);
+ }
}
+ chainAndMakeRedundantStreams(streams, allStreams, fromTxId, inProgressOk);
+ }
+
+ public static void chainAndMakeRedundantStreams(
+ Collection<EditLogInputStream> outStreams,
+ PriorityQueue<EditLogInputStream> allStreams,
+ long fromTxId, boolean inProgressOk) {
// We want to group together all the streams that start on the same start
// transaction ID. To do this, we maintain an accumulator (acc) of all
// the streams we've seen at a given start transaction ID. When we see a
@@ -261,7 +278,7 @@ public class JournalSet implements Journ
if (accFirstTxId == elis.getFirstTxId()) {
acc.add(elis);
} else if (accFirstTxId < elis.getFirstTxId()) {
- streams.add(new RedundantEditLogInputStream(acc, fromTxId));
+ outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
acc.add(elis);
} else if (accFirstTxId > elis.getFirstTxId()) {
@@ -272,7 +289,7 @@ public class JournalSet implements Journ
}
}
if (!acc.isEmpty()) {
- streams.add(new RedundantEditLogInputStream(acc, fromTxId));
+ outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
}
}
@@ -454,12 +471,12 @@ public class JournalSet implements Journ
}
@Override
- protected void flushAndSync() throws IOException {
+ protected void flushAndSync(final boolean durable) throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
- jas.getCurrentStream().flushAndSync();
+ jas.getCurrentStream().flushAndSync(durable);
}
}
}, "flushAndSync");
@@ -512,7 +529,6 @@ public class JournalSet implements Journ
}
}
- @VisibleForTesting
List<JournalAndStream> getAllJournalStreams() {
return journals;
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Oct 12 00:15:22 2012
@@ -722,6 +722,12 @@ public class NameNode {
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
initializeGenericKeys(conf, nsId, namenodeId);
checkAllowFormat(conf);
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ InetSocketAddress socAddr = getAddress(conf);
+ SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
+ DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
+ }
Collection<URI> nameDirsToFormat = FSNamesystem.getNamespaceDirs(conf);
List<URI> sharedDirs = FSNamesystem.getSharedEditsDirs(conf);
@@ -763,13 +769,13 @@ public class NameNode {
}
@VisibleForTesting
- public static boolean initializeSharedEdits(Configuration conf) {
+ public static boolean initializeSharedEdits(Configuration conf) throws IOException {
return initializeSharedEdits(conf, true);
}
@VisibleForTesting
public static boolean initializeSharedEdits(Configuration conf,
- boolean force) {
+ boolean force) throws IOException {
return initializeSharedEdits(conf, force, false);
}
@@ -783,7 +789,7 @@ public class NameNode {
* @return true if the command aborts, false otherwise
*/
private static boolean initializeSharedEdits(Configuration conf,
- boolean force, boolean interactive) {
+ boolean force, boolean interactive) throws IOException {
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
initializeGenericKeys(conf, nsId, namenodeId);
@@ -794,6 +800,12 @@ public class NameNode {
return false;
}
+ if (UserGroupInformation.isSecurityEnabled()) {
+ InetSocketAddress socAddr = getAddress(conf);
+ SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
+ DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
+ }
+
NNStorage existingStorage = null;
try {
Configuration confWithoutShared = new Configuration(conf);
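Both hunks above add the same guarded login so that format and
initializeSharedEdits authenticate before touching storage on a secure
cluster. The shape of the pattern as a standalone sketch; the helper name is
hypothetical, and the literal key strings are the standard values of the
DFS_NAMENODE_* constants referenced in the diff:

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.UserGroupInformation;

    public class SecureToolLoginSketch {
      // Hypothetical helper, not part of the real NameNode API.
      static void loginAsNameNodeIfSecure(Configuration conf,
          InetSocketAddress socAddr) throws IOException {
        // No-op on insecure clusters; on secure clusters, log in from
        // the NameNode keytab before any RPC or storage access.
        if (UserGroupInformation.isSecurityEnabled()) {
          SecurityUtil.login(conf, "dfs.namenode.keytab.file",
              "dfs.namenode.kerberos.principal", socAddr.getHostName());
        }
      }
    }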
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java Fri Oct 12 00:15:22 2012
@@ -107,6 +107,10 @@ public class NameNodeHttpServer {
DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
SecurityUtil.getServerPrincipal(principalInConf,
bindAddress.getHostName()));
+ } else if (UserGroupInformation.isSecurityEnabled()) {
+ LOG.error("WebHDFS and security are enabled, but configuration property '" +
+ DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY +
+ "' is not set.");
}
String httpKeytab = conf.get(
DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY);
@@ -117,6 +121,10 @@ public class NameNodeHttpServer {
params.put(
DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY,
httpKeytab);
+ } else if (UserGroupInformation.isSecurityEnabled()) {
+ LOG.error("WebHDFS and security are enabled, but configuration property '" +
+ DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY +
+ "' is not set.");
}
return params;
}
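The two new error branches fire when security is enabled but the WebHDFS
Kerberos settings are absent. Satisfying the check amounts to setting both
properties; a sketch with placeholder values (the key strings are the
standard values of the constants named above, the principal and keytab path
are illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class WebHdfsSecurityConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Substitute the cluster's SPNEGO principal and keytab path.
        conf.set("dfs.web.authentication.kerberos.principal",
            "HTTP/_HOST@EXAMPLE.COM");
        conf.set("dfs.web.authentication.kerberos.keytab",
            "/etc/security/keytabs/spnego.service.keytab");
      }
    }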
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java Fri Oct 12 00:15:22 2012
@@ -41,6 +41,14 @@ final class NameNodeResourcePolicy {
static boolean areResourcesAvailable(
Collection<? extends CheckableNameNodeResource> resources,
int minimumRedundantResources) {
+
+ // TODO: workaround: during startup, if there are no edits dirs on
+ // disk, areResourcesAvailable() is called with no dirs at all, which
+ // previously caused the NN to enter safemode.
+ if (resources.isEmpty()) {
+ return true;
+ }
int requiredResourceCount = 0;
int redundantResourceCount = 0;
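The early return changes the policy's answer only for the degenerate
no-resources case described in the TODO. A test-style sketch of the new
behavior; it must live in the same package since both the class and the
method are package-private:

    // package org.apache.hadoop.hdfs.server.namenode;
    import java.util.Collections;

    class ResourcePolicySketch {
      static void demo() {
        // With the guard above, an empty resource collection is treated
        // as "nothing to check" and reports available, instead of
        // tripping startup safemode as before.
        boolean ok = NameNodeResourcePolicy.areResourcesAvailable(
            Collections.<CheckableNameNodeResource>emptyList(), 1);
        assert ok;
      }
    }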
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Fri Oct 12 00:15:22 2012
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.blo
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.Text;
@@ -60,6 +61,8 @@ import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.znerd.xmlenc.XMLOutputter;
+import com.google.common.base.Preconditions;
+
class NamenodeJspHelper {
static String getSafeModeText(FSNamesystem fsn) {
if (!fsn.isInSafeMode())
@@ -212,6 +215,52 @@ class NamenodeJspHelper {
out.print("</table></div>\n");
}
+
+ /**
+ * Generate an HTML report containing the current status of the HDFS
+ * journals.
+ */
+ void generateJournalReport(JspWriter out, NameNode nn,
+ HttpServletRequest request) throws IOException {
+ FSEditLog log = nn.getFSImage().getEditLog();
+ Preconditions.checkArgument(log != null, "no edit log set in %s", nn);
+
+ out.println("<h3> " + nn.getRole() + " Journal Status: </h3>");
+
+ out.println("<b>Current transaction ID:</b> " +
+ nn.getFSImage().getLastAppliedOrWrittenTxId() + "<br/>");
+
+
+ boolean openForWrite = log.isOpenForWrite();
+
+ out.println("<div class=\"dfstable\">");
+ out.println("<table class=\"storage\" title=\"NameNode Journals\">\n"
+ + "<thead><tr><td><b>Journal Manager</b></td><td><b>State</b></td></tr></thead>");
+ for (JournalAndStream jas : log.getJournals()) {
+ out.print("<tr>");
+ out.print("<td>" + jas.getManager());
+ if (jas.isRequired()) {
+ out.print(" [required]");
+ }
+ out.print("</td><td>");
+
+ if (jas.isDisabled()) {
+ out.print("<span class=\"failed\">Failed</span>");
+ } else if (openForWrite) {
+ EditLogOutputStream elos = jas.getCurrentStream();
+ if (elos != null) {
+ out.println(elos.generateHtmlReport());
+ } else {
+ out.println("not currently writing");
+ }
+ } else {
+ out.println("open for read");
+ }
+ out.println("</td></tr>");
+ }
+
+ out.println("</table></div>");
+ }
void generateHealthReport(JspWriter out, NameNode nn,
HttpServletRequest request) throws IOException {
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Fri Oct 12 00:15:22 2012
@@ -78,6 +78,7 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
/**********************************************************
@@ -122,6 +123,8 @@ public class SecondaryNameNode implement
private CheckpointConf checkpointConf;
private FSNamesystem namesystem;
+ private Thread checkpointThread;
+
@Override
public String toString() {
@@ -277,6 +280,15 @@ public class SecondaryNameNode implement
*/
public void shutdown() {
shouldRun = false;
+ if (checkpointThread != null) {
+ checkpointThread.interrupt();
+ try {
+ checkpointThread.join(10000);
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted waiting to join on checkpointer thread");
+ Thread.currentThread().interrupt(); // maintain status
+ }
+ }
try {
if (infoServer != null) infoServer.stop();
} catch (Exception e) {
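The shutdown sequence added above is the standard interrupt-then-bounded-join
idiom, including re-asserting the interrupt flag when the join itself is
interrupted. A self-contained illustration of the same idiom in plain Java
(class and field names are illustrative):

    public class BoundedJoinShutdownSketch {
      private volatile boolean shouldRun = true;
      private Thread worker;

      void start() {
        worker = new Thread(new Runnable() {
          public void run() {
            while (shouldRun) {
              try {
                Thread.sleep(1000); // stand-in for checkpoint work
              } catch (InterruptedException e) {
                // fall through and re-check shouldRun
              }
            }
          }
        });
        worker.start();
      }

      void shutdown() {
        shouldRun = false;        // ask the loop to exit...
        if (worker != null) {
          worker.interrupt();     // ...and break it out of any sleep
          try {
            worker.join(10000);   // bounded wait, as in shutdown() above
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve status
          }
        }
      }

      public static void main(String[] args) throws Exception {
        BoundedJoinShutdownSketch s = new BoundedJoinShutdownSketch();
        s.start();
        Thread.sleep(100);
        s.shutdown();
      }
    }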
@@ -586,12 +598,20 @@ public class SecondaryNameNode implement
terminate(ret);
}
- // Create a never ending deamon
- Daemon checkpointThread = new Daemon(secondary);
- checkpointThread.start();
+ secondary.startCheckpointThread();
}
+ public void startCheckpointThread() {
+ Preconditions.checkState(checkpointThread == null,
+ "Should not already have a thread");
+ Preconditions.checkState(shouldRun, "shouldRun should be true");
+
+ checkpointThread = new Daemon(this);
+ checkpointThread.start();
+ }
+
+
/**
* Container for parsed command-line options.
*/
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java Fri Oct 12 00:15:22 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.na
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import org.apache.commons.logging.Log;
@@ -172,11 +173,20 @@ public class EditLogTailer {
Preconditions.checkState(tailerThread == null ||
!tailerThread.isAlive(),
"Tailer thread should not be running once failover starts");
- try {
- doTailEdits();
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
+ // Important to do tailing as the login user, in case the shared
+ // edits storage is implemented by a JournalManager that depends
+ // on security credentials to access the logs (e.g. QuorumJournalManager).
+ SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ try {
+ doTailEdits();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ return null;
+ }
+ });
}
@VisibleForTesting
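doAsLoginUser runs the action under the service's own Kerberos credentials,
and any checked exception thrown inside the PrivilegedExceptionAction is
rethrown to the caller. A sketch of the wrapping pattern; doSomeIo is a
placeholder, not a real method:

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.security.SecurityUtil;

    public class DoAsLoginUserSketch {
      static void doSomeIo() throws IOException, InterruptedException {
        // placeholder for work needing the service's own credentials,
        // e.g. reading shared edits from a QuorumJournalManager
      }

      static void runPrivileged() throws IOException {
        SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            try {
              doSomeIo();
            } catch (InterruptedException e) {
              // convert to IOException, mirroring EditLogTailer above
              throw new IOException(e);
            }
            return null;
          }
        });
      }
    }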
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java Fri Oct 12 00:15:22 2012
@@ -17,18 +17,15 @@
*/
package org.apache.hadoop.hdfs.server.protocol;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.io.Writable;
import com.google.common.base.Function;
import com.google.common.collect.ComparisonChain;
-public class RemoteEditLog implements Writable, Comparable<RemoteEditLog> {
+public class RemoteEditLog implements Comparable<RemoteEditLog> {
private long startTxId = HdfsConstants.INVALID_TXID;
private long endTxId = HdfsConstants.INVALID_TXID;
+ private boolean isInProgress = false;
public RemoteEditLog() {
}
@@ -36,6 +33,13 @@ public class RemoteEditLog implements Wr
public RemoteEditLog(long startTxId, long endTxId) {
this.startTxId = startTxId;
this.endTxId = endTxId;
+ this.isInProgress = (endTxId == HdfsConstants.INVALID_TXID);
+ }
+
+ public RemoteEditLog(long startTxId, long endTxId, boolean inProgress) {
+ this.startTxId = startTxId;
+ this.endTxId = endTxId;
+ this.isInProgress = inProgress;
}
public long getStartTxId() {
@@ -45,22 +49,18 @@ public class RemoteEditLog implements Wr
public long getEndTxId() {
return endTxId;
}
-
- @Override
- public String toString() {
- return "[" + startTxId + "," + endTxId + "]";
- }
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(startTxId);
- out.writeLong(endTxId);
+ public boolean isInProgress() {
+ return isInProgress;
}
@Override
- public void readFields(DataInput in) throws IOException {
- startTxId = in.readLong();
- endTxId = in.readLong();
+ public String toString() {
+ if (!isInProgress) {
+ return "[" + startTxId + "," + endTxId + "]";
+ } else {
+ return "[" + startTxId + "-? (in-progress)]";
+ }
}
@Override
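With the Writable methods removed, RemoteEditLog is a plain value type
(serialization is presumably handled elsewhere in this change set), and the
new isInProgress flag is inferred when the end txid is INVALID_TXID. A usage
sketch grounded in the constructors and toString() shown above:

    import org.apache.hadoop.hdfs.protocol.HdfsConstants;
    import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;

    public class RemoteEditLogSketch {
      public static void main(String[] args) {
        // Finalized segment: end txid known, flag inferred as false.
        RemoteEditLog done = new RemoteEditLog(1, 100);
        System.out.println(done + " inProgress=" + done.isInProgress());
        // prints: [1,100] inProgress=false

        // In-progress segment: INVALID_TXID end marks it in progress.
        RemoteEditLog open =
            new RemoteEditLog(101, HdfsConstants.INVALID_TXID);
        System.out.println(open + " inProgress=" + open.isInProgress());
        // prints: [101-? (in-progress)] inProgress=true
      }
    }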
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java Fri Oct 12 00:15:22 2012
@@ -40,8 +40,8 @@ public class RemoteEditLogManifest {
/**
- * Check that the logs are contiguous and non-overlapping
- * sequences of transactions, in sorted order
+ * Check that the logs are non-overlapping sequences of transactions,
+ * in sorted order. They do not need to be contiguous.
* @throws IllegalStateException if incorrect
*/
private void checkState() {
@@ -50,8 +50,10 @@ public class RemoteEditLogManifest {
RemoteEditLog prev = null;
for (RemoteEditLog log : logs) {
if (prev != null) {
- if (log.getStartTxId() != prev.getEndTxId() + 1) {
- throw new IllegalStateException("Invalid log manifest:" + this);
+ if (log.getStartTxId() <= prev.getEndTxId()) {
+ throw new IllegalStateException(
+ "Invalid log manifest (log " + log + " overlaps " + prev + ")\n"
+ + this);
}
}
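The relaxed invariant rejects overlap but now tolerates gaps between
segments. A small sketch of which manifests pass, assuming (as in the Hadoop
source) that the list-taking constructor runs checkState() on construction:

    import com.google.common.collect.ImmutableList;
    import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
    import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;

    public class ManifestInvariantSketch {
      public static void main(String[] args) {
        // Gap between txids 10 and 15: accepted after this change.
        new RemoteEditLogManifest(ImmutableList.of(
            new RemoteEditLog(1, 10), new RemoteEditLog(15, 20)));

        // Overlap at txid 10: still rejected.
        try {
          new RemoteEditLogManifest(ImmutableList.of(
              new RemoteEditLog(1, 10), new RemoteEditLog(10, 20)));
        } catch (IllegalStateException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }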
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Fri Oct 12 00:15:22 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.tools;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -53,6 +54,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.util.StringUtils;
@@ -80,7 +82,7 @@ public class DFSAdmin extends FsShell {
super(fs.getConf());
if (!(fs instanceof DistributedFileSystem)) {
throw new IllegalArgumentException("FileSystem " + fs.getUri() +
- " is not a distributed file system");
+ " is not an HDFS file system");
}
this.dfs = (DistributedFileSystem)fs;
}
@@ -284,7 +286,7 @@ public class DFSAdmin extends FsShell {
FileSystem fs = getFS();
if (!(fs instanceof DistributedFileSystem)) {
throw new IllegalArgumentException("FileSystem " + fs.getUri() +
- " is not a distributed file system");
+ " is not an HDFS file system");
}
return (DistributedFileSystem)fs;
}
@@ -511,11 +513,17 @@ public class DFSAdmin extends FsShell {
* @return an exit code indicating success or failure.
* @throws IOException
*/
- public int fetchImage(String[] argv, int idx) throws IOException {
- String infoServer = DFSUtil.getInfoServer(
+ public int fetchImage(final String[] argv, final int idx) throws IOException {
+ final String infoServer = DFSUtil.getInfoServer(
HAUtil.getAddressOfActive(getDFS()), getConf(), false);
- TransferFsImage.downloadMostRecentImageToDirectory(infoServer,
- new File(argv[idx]));
+ SecurityUtil.doAsCurrentUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ TransferFsImage.downloadMostRecentImageToDirectory(infoServer,
+ new File(argv[idx]));
+ return null;
+ }
+ });
return 0;
}
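Note the choice of doAsCurrentUser here, in contrast to doAsLoginUser in
EditLogTailer above: an admin command should authenticate as the invoking
user, not as the service. A minimal sketch of the pattern; fetchSomething is
a placeholder:

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.security.SecurityUtil;

    public class DoAsCurrentUserSketch {
      static void fetchSomething() throws IOException {
        // placeholder for an HTTP transfer authenticated as the caller
      }

      static void run() throws IOException {
        SecurityUtil.doAsCurrentUser(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            fetchSomething();
            return null;
          }
        });
      }
    }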
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java Fri Oct 12 00:15:22 2012
@@ -56,7 +56,7 @@ public class BinaryEditsVisitor implemen
@Override
public void close(Throwable error) throws IOException {
elfos.setReadyToFlush();
- elfos.flushAndSync();
+ elfos.flushAndSync(true);
elfos.close();
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java Fri Oct 12 00:15:22 2012
@@ -57,7 +57,9 @@ public class PersistentLongFile {
}
public void set(long newVal) throws IOException {
- writeFile(file, newVal);
+ if (value != newVal || !loaded) {
+ writeFile(file, newVal);
+ }
value = newVal;
loaded = true;
}
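set() now elides the disk write when the value is unchanged and is already
known to be persisted (loaded). A self-contained sketch of the same guard in
plain Java, with a print standing in for the fsync'd file write:

    public class SkipRedundantWriteSketch {
      private long value;
      private boolean loaded; // true once 'value' reflects disk state

      /** Write-through setter that skips redundant disk writes. */
      public void set(long newVal) throws java.io.IOException {
        // Write only if the cached value is stale or never persisted.
        if (value != newVal || !loaded) {
          writeFile(newVal);
        }
        value = newVal;
        loaded = true;
      }

      private void writeFile(long v) throws java.io.IOException {
        System.out.println("writing " + v + " to disk"); // stand-in
      }

      public static void main(String[] args) throws Exception {
        SkipRedundantWriteSketch f = new SkipRedundantWriteSketch();
        f.set(42); // writes: not yet loaded
        f.set(42); // skipped: same value, already persisted
        f.set(43); // writes: value changed
      }
    }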
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1390763-1397380
Merged /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1363593-1396941
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c Fri Oct 12 00:15:22 2012
@@ -67,6 +67,25 @@ static const struct ExceptionInfo gExcep
};
+void getExceptionInfo(const char *excName, int noPrintFlags,
+ int *excErrno, int *shouldPrint)
+{
+ int i;
+
+ for (i = 0; i < EXCEPTION_INFO_LEN; i++) {
+ if (strstr(gExceptionInfo[i].name, excName)) {
+ break;
+ }
+ }
+ if (i < EXCEPTION_INFO_LEN) {
+ *shouldPrint = !(gExceptionInfo[i].noPrintFlag & noPrintFlags);
+ *excErrno = gExceptionInfo[i].excErrno;
+ } else {
+ *shouldPrint = 1;
+ *excErrno = EINTERNAL;
+ }
+}
+
int printExceptionAndFreeV(JNIEnv *env, jthrowable exc, int noPrintFlags,
const char *fmt, va_list ap)
{