Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/06 20:32:07 UTC
svn commit: r1143523 [2/4] - in /hadoop/common/branches/HDFS-1073/hdfs: ./
src/c++/libhdfs/ src/contrib/hdfsproxy/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/protocol/datatransfe...
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Jul 6 18:32:04 2011
@@ -85,7 +85,10 @@ class DataXceiver extends Receiver imple
private long opStartTime; //the start time of receiving an Op
public DataXceiver(Socket s, DataNode datanode,
- DataXceiverServer dataXceiverServer) {
+ DataXceiverServer dataXceiverServer) throws IOException {
+ super(new DataInputStream(new BufferedInputStream(
+ NetUtils.getInputStream(s), FSConstants.SMALL_BUFFER_SIZE)));
+
this.s = s;
this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
this.datanode = datanode;
@@ -122,18 +125,14 @@ class DataXceiver extends Receiver imple
DataNode getDataNode() {return datanode;}
/**
- * Read/write data from/to the DataXceiveServer.
+ * Read/write data from/to the DataXceiverServer.
*/
public void run() {
updateCurrentThreadName("Waiting for operation");
- DataInputStream in=null;
int opsProcessed = 0;
Op op = null;
try {
- in = new DataInputStream(
- new BufferedInputStream(NetUtils.getInputStream(s),
- SMALL_BUFFER_SIZE));
int stdTimeout = s.getSoTimeout();
// We process requests in a loop, and stay around for a short timeout.
@@ -145,7 +144,7 @@ class DataXceiver extends Receiver imple
assert socketKeepaliveTimeout > 0;
s.setSoTimeout(socketKeepaliveTimeout);
}
- op = readOp(in);
+ op = readOp();
} catch (InterruptedIOException ignored) {
// Time out while we wait for client rpc
break;
@@ -176,7 +175,7 @@ class DataXceiver extends Receiver imple
}
opStartTime = now();
- processOp(op, in);
+ processOp(op);
++opsProcessed;
} while (!s.isClosed() && socketKeepaliveTimeout > 0);
} catch (Throwable t) {
@@ -196,13 +195,12 @@ class DataXceiver extends Receiver imple
}
}
- /**
- * Read a block from the disk.
- */
@Override
- protected void opReadBlock(DataInputStream in, ExtendedBlock block,
- long startOffset, long length, String clientName,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
+ public void readBlock(final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final long blockOffset,
+ final long length) throws IOException {
OutputStream baseStream = NetUtils.getOutputStream(s,
datanode.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(
@@ -225,7 +223,7 @@ class DataXceiver extends Receiver imple
updateCurrentThreadName("Sending block " + block);
try {
try {
- blockSender = new BlockSender(block, startOffset, length,
+ blockSender = new BlockSender(block, blockOffset, length,
true, true, false, datanode, clientTraceFmt);
} catch(IOException e) {
LOG.info("opReadBlock " + block + " received exception " + e);
@@ -284,16 +282,17 @@ class DataXceiver extends Receiver imple
datanode.metrics.incrReadsFromClient(isLocal);
}
- /**
- * Write a block to disk.
- */
@Override
- protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block,
- final int pipelineSize, final BlockConstructionStage stage,
- final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
- final String clientname, final DatanodeInfo srcDataNode,
- final DatanodeInfo[] targets, final Token<BlockTokenIdentifier> blockToken
- ) throws IOException {
+ public void writeBlock(final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientname,
+ final DatanodeInfo[] targets,
+ final DatanodeInfo srcDataNode,
+ final BlockConstructionStage stage,
+ final int pipelineSize,
+ final long minBytesRcvd,
+ final long maxBytesRcvd,
+ final long latestGenerationStamp) throws IOException {
updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
final boolean isDatanode = clientname.length() == 0;
final boolean isClient = !isDatanode;
@@ -308,7 +307,7 @@ class DataXceiver extends Receiver imple
if (LOG.isDebugEnabled()) {
LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname
- + "\n block =" + block + ", newGs=" + newGs
+ + "\n block =" + block + ", newGs=" + latestGenerationStamp
+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
+ "\n targets=" + Arrays.asList(targets)
+ "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
@@ -351,10 +350,10 @@ class DataXceiver extends Receiver imple
blockReceiver = new BlockReceiver(block, in,
s.getRemoteSocketAddress().toString(),
s.getLocalSocketAddress().toString(),
- stage, newGs, minBytesRcvd, maxBytesRcvd,
+ stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode);
} else {
- datanode.data.recoverClose(block, newGs, minBytesRcvd);
+ datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
}
//
@@ -380,9 +379,9 @@ class DataXceiver extends Receiver imple
SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
- Sender.opWriteBlock(mirrorOut, originalBlock,
- pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname,
- srcDataNode, targets, blockToken);
+ new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
+ clientname, targets, srcDataNode, stage, pipelineSize,
+ minBytesRcvd, maxBytesRcvd, latestGenerationStamp);
if (blockReceiver != null) { // send checksum header
blockReceiver.writeChecksumHeader(mirrorOut);
@@ -464,7 +463,7 @@ class DataXceiver extends Receiver imple
// update its generation stamp
if (isClient &&
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
- block.setGenerationStamp(newGs);
+ block.setGenerationStamp(latestGenerationStamp);
block.setNumBytes(minBytesRcvd);
}
@@ -499,10 +498,10 @@ class DataXceiver extends Receiver imple
}
@Override
- protected void opTransferBlock(final DataInputStream in,
- final ExtendedBlock blk, final String client,
- final DatanodeInfo[] targets,
- final Token<BlockTokenIdentifier> blockToken) throws IOException {
+ public void transferBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final DatanodeInfo[] targets) throws IOException {
checkAccess(null, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
@@ -511,19 +510,16 @@ class DataXceiver extends Receiver imple
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
try {
- datanode.transferReplicaForPipelineRecovery(blk, targets, client);
+ datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
writeResponse(Status.SUCCESS, out);
} finally {
IOUtils.closeStream(out);
}
}
- /**
- * Get block checksum (MD5 of CRC32).
- */
@Override
- protected void opBlockChecksum(DataInputStream in, ExtendedBlock block,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
+ public void blockChecksum(final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException {
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
checkAccess(out, true, block, blockToken,
@@ -572,12 +568,9 @@ class DataXceiver extends Receiver imple
datanode.metrics.addBlockChecksumOp(elapsed());
}
- /**
- * Read a block from the disk and then sends it to a destination.
- */
@Override
- protected void opCopyBlock(DataInputStream in, ExtendedBlock block,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
+ public void copyBlock(final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException {
updateCurrentThreadName("Copying block " + block);
// Read in the header
if (datanode.isBlockTokenEnabled) {
@@ -647,15 +640,12 @@ class DataXceiver extends Receiver imple
datanode.metrics.addCopyBlockOp(elapsed());
}
- /**
- * Receive a block and write it to disk, it then notifies the namenode to
- * remove the copy from the source.
- */
@Override
- protected void opReplaceBlock(DataInputStream in,
- ExtendedBlock block, String sourceID, DatanodeInfo proxySource,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
- updateCurrentThreadName("Replacing block " + block + " from " + sourceID);
+ public void replaceBlock(final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String delHint,
+ final DatanodeInfo proxySource) throws IOException {
+ updateCurrentThreadName("Replacing block " + block + " from " + delHint);
/* read header */
block.setNumBytes(dataXceiverServer.estimateBlockSize);
@@ -699,7 +689,7 @@ class DataXceiver extends Receiver imple
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
/* send request to the proxy */
- Sender.opCopyBlock(proxyOut, block, blockToken);
+ new Sender(proxyOut).copyBlock(block, blockToken);
// receive the response from the proxy
proxyReply = new DataInputStream(new BufferedInputStream(
@@ -727,7 +717,7 @@ class DataXceiver extends Receiver imple
dataXceiverServer.balanceThrottler, null);
// notify name node
- datanode.notifyNamenodeReceivedBlock(block, sourceID);
+ datanode.notifyNamenodeReceivedBlock(block, delHint);
LOG.info("Moved block " + block +
" from " + s.getRemoteSocketAddress());
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Wed Jul 6 18:32:04 2011
@@ -28,12 +28,13 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
/**
@@ -128,22 +129,27 @@ class DataXceiverServer implements Runna
DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
}
- /**
- */
+ @Override
public void run() {
while (datanode.shouldRun) {
try {
Socket s = ss.accept();
s.setTcpNoDelay(true);
- new Daemon(datanode.threadGroup,
- new DataXceiver(s, datanode, this)).start();
+ final DataXceiver xceiver;
+ try {
+ xceiver = new DataXceiver(s, datanode, this);
+ } catch (IOException e) {
+ IOUtils.closeSocket(s);
+ throw e;
+ }
+ new Daemon(datanode.threadGroup, xceiver).start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
} catch (IOException ie) {
- LOG.warn(datanode.getMachineName() + ":DataXceiveServer: ", ie);
+ LOG.warn(datanode.getMachineName() + ":DataXceiverServer: ", ie);
} catch (Throwable te) {
LOG.error(datanode.getMachineName()
- + ":DataXceiveServer: Exiting due to: ", te);
+ + ":DataXceiverServer: Exiting due to: ", te);
datanode.shouldRun = false;
}
}
@@ -151,7 +157,7 @@ class DataXceiverServer implements Runna
ss.close();
} catch (IOException ie) {
LOG.warn(datanode.getMachineName()
- + ":DataXceiveServer: Close exception due to: ", ie);
+ + ":DataXceiverServer: Close exception due to: ", ie);
}
}
@@ -161,7 +167,7 @@ class DataXceiverServer implements Runna
try {
this.ss.close();
} catch (IOException ie) {
- LOG.warn(datanode.getMachineName() + ":DataXceiveServer.kill(): "
+ LOG.warn(datanode.getMachineName() + ":DataXceiverServer.kill(): "
+ StringUtils.stringifyException(ie));
}
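Because the DataXceiver constructor can now throw IOException (it wraps the socket's input stream eagerly), the accept loop has to close the just-accepted socket when construction fails, otherwise the descriptor leaks. The same idiom in isolation, as a generic sketch (SocketWorker is a made-up stand-in for DataXceiver):

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.net.Socket;

    class SocketWorker implements Runnable {
      private final DataInputStream in;
      private SocketWorker(Socket s) throws IOException {
        this.in = new DataInputStream(s.getInputStream()); // may throw
      }
      // Never let a constructor failure orphan the accepted socket.
      static SocketWorker create(Socket s) throws IOException {
        try {
          return new SocketWorker(s);
        } catch (IOException e) {
          s.close(); // the patch uses IOUtils.closeSocket(s), which logs and swallows
          throw e;
        }
      }
      public void run() { /* serve the connection */ }
    }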
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Jul 6 18:32:04 2011
@@ -1150,7 +1150,7 @@ public class FSDataset implements FSCons
conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
- String[] dataDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+ String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
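The switch to getTrimmedStrings() guards against whitespace in a comma-separated dfs.datanode.data.dir value: with plain getStrings(), a value wrapped across lines in hdfs-site.xml yields tokens with leading whitespace, and the datanode then treats " /data/2" as a distinct, nonexistent directory. A small illustration of the difference using plain string handling (the paths are made up):

    public class TrimDemo {
      public static void main(String[] args) {
        String raw = "/data/1,\n /data/2"; // as the value might appear in hdfs-site.xml
        for (String dir : raw.split(",")) {
          // getStrings() behaves like the raw token; getTrimmedStrings() like trim().
          System.out.println("[" + dir + "] vs [" + dir.trim() + "]");
        }
      }
    }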
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java Wed Jul 6 18:32:04 2011
@@ -17,8 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.net.InetSocketAddress;
+import java.net.URL;
+import java.net.URLConnection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -27,13 +31,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import javax.management.JMX;
-import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,24 +41,27 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.util.StringUtils;
+import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.znerd.xmlenc.XMLOutputter;
/**
* This class generates the data that is needed to be displayed on cluster web
- * console by connecting to each namenode through JMX.
+ * console.
*/
@InterfaceAudience.Private
class ClusterJspHelper {
private static final Log LOG = LogFactory.getLog(ClusterJspHelper.class);
public static final String OVERALL_STATUS = "overall-status";
public static final String DEAD = "Dead";
+ private static final String JMX_QRY =
+ "/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo";
/**
* JSP helper function that generates cluster health report. When
* encountering exception while getting Namenode status, the exception will
- * be listed in the page with corresponding stack trace.
+ * be listed on the page with corresponding stack trace.
*/
ClusterStatus generateClusterHealthReport() {
ClusterStatus cs = new ClusterStatus();
@@ -79,26 +80,24 @@ class ClusterJspHelper {
NamenodeMXBeanHelper nnHelper = null;
try {
nnHelper = new NamenodeMXBeanHelper(isa, conf);
- NamenodeStatus nn = nnHelper.getNamenodeStatus();
+ String mbeanProps = queryMbean(nnHelper.httpAddress, conf);
+ NamenodeStatus nn = nnHelper.getNamenodeStatus(mbeanProps);
if (cs.clusterid.isEmpty() || cs.clusterid.equals("")) { // Set clusterid only once
- cs.clusterid = nnHelper.getClusterId();
+ cs.clusterid = nnHelper.getClusterId(mbeanProps);
}
cs.addNamenodeStatus(nn);
} catch ( Exception e ) {
// track exceptions encountered when connecting to namenodes
cs.addException(isa.getHostName(), e);
continue;
- } finally {
- if (nnHelper != null) {
- nnHelper.cleanup();
- }
- }
+ }
}
return cs;
}
/**
- * Helper function that generates the decommissioning report.
+ * Helper function that generates the decommissioning report. Connect to each
+ * Namenode over http via JmxJsonServlet to collect the data nodes status.
*/
DecommissionStatus generateDecommissioningReport() {
String clusterid = "";
@@ -127,21 +126,18 @@ class ClusterJspHelper {
NamenodeMXBeanHelper nnHelper = null;
try {
nnHelper = new NamenodeMXBeanHelper(isa, conf);
+ String mbeanProps = queryMbean(nnHelper.httpAddress, conf);
if (clusterid.equals("")) {
- clusterid = nnHelper.getClusterId();
+ clusterid = nnHelper.getClusterId(mbeanProps);
}
- nnHelper.getDecomNodeInfoForReport(statusMap);
+ nnHelper.getDecomNodeInfoForReport(statusMap, mbeanProps);
} catch (Exception e) {
// catch exceptions encountered while connecting to namenodes
String nnHost = isa.getHostName();
decommissionExceptions.put(nnHost, e);
unreportedNamenode.add(nnHost);
continue;
- } finally {
- if (nnHelper != null) {
- nnHelper.cleanup();
- }
- }
+ }
}
updateUnknownStatus(statusMap, unreportedNamenode);
getDecommissionNodeClusterState(statusMap);
@@ -260,40 +256,20 @@ class ClusterJspHelper {
}
return Integer.parseInt(address.split(":")[1]);
}
-
+
/**
- * Class for connecting to Namenode over JMX and get attributes
- * exposed by the MXBean.
+ * Class for connecting to Namenode over http via JmxJsonServlet
+ * to get JMX attributes exposed by the MXBean.
*/
static class NamenodeMXBeanHelper {
private static final ObjectMapper mapper = new ObjectMapper();
- private final InetSocketAddress rpcAddress;
private final String host;
- private final Configuration conf;
- private final JMXConnector connector;
- private final NameNodeMXBean mxbeanProxy;
+ private final String httpAddress;
NamenodeMXBeanHelper(InetSocketAddress addr, Configuration conf)
throws IOException, MalformedObjectNameException {
- this.rpcAddress = addr;
this.host = addr.getHostName();
- this.conf = conf;
- int port = conf.getInt("dfs.namenode.jmxport", -1);
-
- JMXServiceURL jmxURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://"
- + host + ":" + port + "/jmxrmi");
- connector = JMXConnectorFactory.connect(jmxURL);
- mxbeanProxy = getNamenodeMxBean();
- }
-
- private NameNodeMXBean getNamenodeMxBean()
- throws IOException, MalformedObjectNameException {
- // Get an MBeanServerConnection on the remote VM.
- MBeanServerConnection remote = connector.getMBeanServerConnection();
- ObjectName mxbeanName = new ObjectName(
- "Hadoop:service=NameNode,name=NameNodeInfo");
-
- return JMX.newMXBeanProxy(remote, mxbeanName, NameNodeMXBean.class);
+ this.httpAddress = DFSUtil.getInfoServer(addr, conf, false);
}
/** Get the map corresponding to the JSON string */
@@ -305,10 +281,9 @@ class ClusterJspHelper {
}
/**
- * Process JSON string returned from JMX connection to get the number of
- * live datanodes.
+ * Get the number of live datanodes.
*
- * @param json JSON output from JMX call that contains live node status.
+ * @param json JSON string that contains live node status.
* @param nn namenode status to return information in
*/
private static void getLiveNodeCount(String json, NamenodeStatus nn)
@@ -333,11 +308,10 @@ class ClusterJspHelper {
}
/**
- * Count the number of dead datanode based on the JSON string returned from
- * JMX call.
+ * Count the number of dead datanode.
*
* @param nn namenode
- * @param json JSON string returned from JMX call
+ * @param json JSON string
*/
private static void getDeadNodeCount(String json, NamenodeStatus nn)
throws IOException {
@@ -358,51 +332,53 @@ class ClusterJspHelper {
}
}
- public String getClusterId() {
- return mxbeanProxy.getClusterId();
+ public String getClusterId(String props) throws IOException {
+ return getProperty(props, "ClusterId").getTextValue();
}
- public NamenodeStatus getNamenodeStatus()
- throws IOException, MalformedObjectNameException {
+ public NamenodeStatus getNamenodeStatus(String props) throws IOException,
+ MalformedObjectNameException, NumberFormatException {
NamenodeStatus nn = new NamenodeStatus();
nn.host = host;
- nn.filesAndDirectories = mxbeanProxy.getTotalFiles();
- nn.capacity = mxbeanProxy.getTotal();
- nn.free = mxbeanProxy.getFree();
- nn.bpUsed = mxbeanProxy.getBlockPoolUsedSpace();
- nn.nonDfsUsed = mxbeanProxy.getNonDfsUsedSpace();
- nn.blocksCount = mxbeanProxy.getTotalBlocks();
- nn.missingBlocksCount = mxbeanProxy.getNumberOfMissingBlocks();
- nn.free = mxbeanProxy.getFree();
- nn.httpAddress = DFSUtil.getInfoServer(rpcAddress, conf, false);
- getLiveNodeCount(mxbeanProxy.getLiveNodes(), nn);
- getDeadNodeCount(mxbeanProxy.getDeadNodes(), nn);
+ nn.filesAndDirectories = getProperty(props, "TotalFiles").getLongValue();
+ nn.capacity = getProperty(props, "Total").getLongValue();
+ nn.free = getProperty(props, "Free").getLongValue();
+ nn.bpUsed = getProperty(props, "BlockPoolUsedSpace").getLongValue();
+ nn.nonDfsUsed = getProperty(props, "NonDfsUsedSpace").getLongValue();
+ nn.blocksCount = getProperty(props, "TotalBlocks").getLongValue();
+ nn.missingBlocksCount = getProperty(props, "NumberOfMissingBlocks")
+ .getLongValue();
+ nn.httpAddress = httpAddress;
+ getLiveNodeCount(getProperty(props, "LiveNodes").getValueAsText(), nn);
+ getDeadNodeCount(getProperty(props, "DeadNodes").getValueAsText(), nn);
return nn;
}
/**
- * Connect to namenode to get decommission node information.
+ * Get the decommission node information.
* @param statusMap data node status map
- * @param connector JMXConnector
+ * @param props string
*/
private void getDecomNodeInfoForReport(
- Map<String, Map<String, String>> statusMap) throws IOException,
- MalformedObjectNameException {
- getLiveNodeStatus(statusMap, host, mxbeanProxy.getLiveNodes());
- getDeadNodeStatus(statusMap, host, mxbeanProxy.getDeadNodes());
- getDecommissionNodeStatus(statusMap, host, mxbeanProxy.getDecomNodes());
+ Map<String, Map<String, String>> statusMap, String props)
+ throws IOException, MalformedObjectNameException {
+ getLiveNodeStatus(statusMap, host, getProperty(props, "LiveNodes")
+ .getValueAsText());
+ getDeadNodeStatus(statusMap, host, getProperty(props, "DeadNodes")
+ .getValueAsText());
+ getDecommissionNodeStatus(statusMap, host,
+ getProperty(props, "DecomNodes").getValueAsText());
}
/**
- * Process the JSON string returned from JMX call to get live datanode
- * status. Store the information into datanode status map and
- * Decommissionnode.
+ * Store the live datanode status information into datanode status map and
+ * DecommissionNode.
*
* @param statusMap Map of datanode status. Key is datanode, value
* is an inner map whose key is namenode, value is datanode status
* reported by each namenode.
* @param namenodeHost host name of the namenode
- * @param decomnode update Decommissionnode with alive node status
+ * @param decomnode update DecommissionNode with alive node status
* @param json JSON string contains datanode status
* @throws IOException
*/
@@ -434,15 +410,14 @@ class ClusterJspHelper {
}
/**
- * Process the JSON string returned from JMX connection to get the dead
- * datanode information. Store the information into datanode status map and
- * Decommissionnode.
+ * Store the dead datanode information into datanode status map and
+ * DecommissionNode.
*
* @param statusMap map with key being datanode, value being an
* inner map (key:namenode, value:decommisionning state).
* @param host datanode hostname
- * @param decomnode
- * @param json
+ * @param decomnode DecommissionNode
+ * @param json String
* @throws IOException
*/
private static void getDeadNodeStatus(
@@ -478,14 +453,13 @@ class ClusterJspHelper {
}
/**
- * We process the JSON string returned from JMX connection to get the
- * decommisioning datanode information.
+ * Get the decommisioning datanode information.
*
* @param dataNodeStatusMap map with key being datanode, value being an
* inner map (key:namenode, value:decommisionning state).
* @param host datanode
- * @param decomnode Decommissionnode
- * @param json JSON string returned from JMX connection
+ * @param decomnode DecommissionNode
+ * @param json String
*/
private static void getDecommissionNodeStatus(
Map<String, Map<String, String>> dataNodeStatusMap, String host,
@@ -508,19 +482,6 @@ class ClusterJspHelper {
dataNodeStatusMap.put(dn, nnStatus);
}
}
-
-
- public void cleanup() {
- if (connector != null) {
- try {
- connector.close();
- } catch (Exception e) {
- // log failure of close jmx connection
- LOG.warn("Unable to close JMX connection. "
- + StringUtils.stringifyException(e));
- }
- }
- }
}
/**
@@ -893,4 +854,47 @@ class ClusterJspHelper {
doc.endTag(); // message
doc.endTag(); // cluster
}
+
+ /**
+ * Read in the content from a URL
+ * @param url the URL to read
+ * @return the text from the output
+ * @throws IOException if something went wrong
+ */
+ private static String readOutput(URL url) throws IOException {
+ StringBuilder out = new StringBuilder();
+ URLConnection connection = url.openConnection();
+ BufferedReader in = new BufferedReader(
+ new InputStreamReader(
+ connection.getInputStream()));
+ String inputLine;
+ while ((inputLine = in.readLine()) != null) {
+ out.append(inputLine);
+ }
+ in.close();
+ return out.toString();
+ }
+
+ private static String queryMbean(String httpAddress, Configuration conf)
+ throws IOException {
+ URL url = new URL("http://"+httpAddress+JMX_QRY);
+ return readOutput(url);
+ }
+ /**
+ * In order to query a namenode MXBean, an HTTP request of the form
+ * "http://hostname/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo"
+ * is sent to the namenode. JMX attributes are exposed via JmxJsonServlet on
+ * the namenode side.
+ */
+ private static JsonNode getProperty(String props, String propertyname)
+ throws IOException {
+ if (props == null || props.equals("") || propertyname == null
+ || propertyname.equals("")) {
+ return null;
+ }
+ ObjectMapper m = new ObjectMapper();
+ JsonNode rootNode = m.readValue(props, JsonNode.class);
+ JsonNode jn = rootNode.get("beans").get(0).get(propertyname);
+ return jn;
+ }
}
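The net effect in ClusterJspHelper: instead of opening a JMX/RMI connection to a configured dfs.namenode.jmxport, the helper now fetches JSON from the namenode's built-in /jmx servlet over HTTP and reads attributes out of beans[0]. A standalone sketch of the same round trip with Jackson 1.x (the host is a placeholder; 50070 is the customary namenode HTTP port):

    import java.io.IOException;
    import java.net.URL;
    import org.codehaus.jackson.JsonNode;
    import org.codehaus.jackson.map.ObjectMapper;

    public class JmxServletQuery {
      public static void main(String[] args) throws IOException {
        URL url = new URL("http://namenode.example.com:50070"
            + "/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo");
        // The servlet answers {"beans":[{ ...the MXBean's attributes... }]}
        JsonNode root = new ObjectMapper().readValue(url.openStream(), JsonNode.class);
        JsonNode bean = root.get("beans").get(0);
        System.out.println("ClusterId   = " + bean.get("ClusterId").getTextValue());
        System.out.println("TotalBlocks = " + bean.get("TotalBlocks").getLongValue());
      }
    }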
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java Wed Jul 6 18:32:04 2011
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.util.CyclicIteration;
/**
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Wed Jul 6 18:32:04 2011
@@ -154,19 +154,31 @@ class EditLogFileOutputStream extends Ed
// close should have been called after all pending transactions
// have been flushed & synced.
- int bufSize = bufCurrent.size();
- if (bufSize != 0) {
- throw new IOException("FSEditStream has " + bufSize
- + " bytes still to be flushed and cannot " + "be closed.");
+ // if already closed, just skip
+ if (bufCurrent != null) {
+ int bufSize = bufCurrent.size();
+ if (bufSize != 0) {
+ throw new IOException("FSEditStream has " + bufSize
+ + " bytes still to be flushed and cannot " + "be closed.");
+ }
+ bufCurrent.close();
+ bufCurrent = null;
}
- bufCurrent.close();
- bufReady.close();
- // remove the last INVALID marker from transaction log.
- fc.truncate(fc.position());
- fp.close();
+ if(bufReady != null) {
+ bufReady.close();
+ bufReady = null;
+ }
- bufCurrent = bufReady = null;
+ // remove the last INVALID marker from transaction log.
+ if (fc != null && fc.isOpen()) {
+ fc.truncate(fc.position());
+ fc.close();
+ }
+ if (fp != null) {
+ fp.close();
+ }
fp = null;
}
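The rewritten close() is defensive: each resource (bufCurrent, bufReady, fc, fp) is null-checked before use and nulled out after closing, so a second close() is a no-op rather than a NullPointerException. The shape of the idiom, reduced to a single field (a minimal sketch, not the actual class):

    import java.io.Closeable;
    import java.io.IOException;

    class IdempotentClose implements Closeable {
      private Closeable resource; // stands in for bufCurrent, bufReady, fc, fp
      @Override
      public void close() throws IOException {
        if (resource != null) {   // skip if already closed
          resource.close();
          resource = null;        // a later close() falls through harmlessly
        }
      }
    }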
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Jul 6 18:32:04 2011
@@ -17,40 +17,47 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.FSLimitException;
-import org.apache.hadoop.hdfs.protocol.FSLimitException.*;
+import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
+import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.util.ByteArray;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
/*************************************************
* FSDirectory stores the filesystem directory state.
@@ -61,7 +68,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
* and logged to disk.
*
*************************************************/
-class FSDirectory implements Closeable {
+public class FSDirectory implements Closeable {
INodeDirectoryWithQuota rootDir;
FSImage fsImage;
@@ -1337,7 +1344,7 @@ class FSDirectory implements Closeable {
* @throws QuotaExceededException if the new count violates any quota limit
* @throws FileNotFound if path does not exist.
*/
- void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
+ public void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
throws QuotaExceededException,
FileNotFoundException,
UnresolvedLinkException {
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Wed Jul 6 18:32:04 2011
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
@@ -33,12 +35,32 @@ import org.apache.hadoop.hdfs.protocol.F
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogHeader;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Reader;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ClearNSQuotaOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ConcatDeleteOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
public class FSEditLogLoader {
private final FSNamesystem fsNamesys;
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Wed Jul 6 18:32:04 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.Storage;
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Jul 6 18:32:04 2011
@@ -414,7 +414,12 @@ public class FSImage implements Closeabl
LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
}
isUpgradeFinalized = false;
- storage.reportErrorsOnDirectories(errorSDs);
+ if (!errorSDs.isEmpty()) {
+ storage.reportErrorsOnDirectories(errorSDs);
+ //during upgrade, it's a fatal error to fail any storage directory
+ throw new IOException("Upgrade failed in " + errorSDs.size()
+ + " storage directory(ies), previously logged.");
+ }
storage.initializeDistributedUpgrade();
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Wed Jul 6 18:32:04 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.io.MD5Hash;
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Wed Jul 6 18:32:04 2011
@@ -22,7 +22,6 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -33,6 +32,9 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Jul 6 18:32:04 2011
@@ -31,6 +31,7 @@ import java.io.PrintWriter;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.URI;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -96,15 +97,20 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
@@ -235,7 +241,7 @@ public class FSNamesystem implements FSC
// Stores the correct file name hierarchy
//
public FSDirectory dir;
- BlockManager blockManager;
+ public BlockManager blockManager;
// Block pool ID used by this namenode
String blockPoolId;
@@ -270,10 +276,10 @@ public class FSNamesystem implements FSC
* Stores a set of DatanodeDescriptor objects.
* This is a subset of {@link #datanodeMap}, containing nodes that are
* considered alive.
- * The {@link HeartbeatMonitor} periodically checks for outdated entries,
+ * The HeartbeatMonitor periodically checks for outdated entries,
* and removes them from the list.
*/
- ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
+ public ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
public LeaseManager leaseManager = new LeaseManager(this);
@@ -314,8 +320,8 @@ public class FSNamesystem implements FSC
private volatile SafeModeInfo safeMode; // safe mode information
private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
- // datanode networktoplogy
- NetworkTopology clusterMap = new NetworkTopology();
+ /** datanode network topology */
+ public NetworkTopology clusterMap = new NetworkTopology();
private DNSToSwitchMapping dnsToSwitchMapping;
private HostsFileReader hostsReader;
@@ -329,7 +335,7 @@ public class FSNamesystem implements FSC
private final GenerationStamp generationStamp = new GenerationStamp();
// Ask Datanode only up to this many blocks to delete.
- int blockInvalidateLimit = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
+ public int blockInvalidateLimit = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
// precision of access times.
private long accessTimePrecision = 0;
@@ -472,23 +478,23 @@ public class FSNamesystem implements FSC
}
// utility methods to acquire and release read lock and write lock
- void readLock() {
+ public void readLock() {
this.fsLock.readLock().lock();
}
- void readUnlock() {
+ public void readUnlock() {
this.fsLock.readLock().unlock();
}
- void writeLock() {
+ public void writeLock() {
this.fsLock.writeLock().lock();
}
- void writeUnlock() {
+ public void writeUnlock() {
this.fsLock.writeLock().unlock();
}
- boolean hasWriteLock() {
+ public boolean hasWriteLock() {
return this.fsLock.isWriteLockedByCurrentThread();
}
@@ -1014,7 +1020,7 @@ public class FSNamesystem implements FSC
}
/** Create a LocatedBlock. */
- LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
+ public LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
final long offset, final boolean corrupt) throws IOException {
return new LocatedBlock(getExtendedBlock(b), locations, offset, corrupt);
}
@@ -3013,7 +3019,7 @@ public class FSNamesystem implements FSC
* @return an array of datanode commands
* @throws IOException
*/
- DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
+ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes)
throws IOException {
@@ -3521,7 +3527,7 @@ public class FSNamesystem implements FSC
* If no such a node is available,
* then pick a node with least free space
*/
- void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess,
+ public void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess,
Block b, short replication,
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint,
@@ -3785,9 +3791,19 @@ public class FSNamesystem implements FSC
nodes.add(dn);
}
//Remove any form of this datanode from the include/exclude lists.
- mustList.remove(dn.getName());
- mustList.remove(dn.getHost());
- mustList.remove(dn.getHostName());
+ try {
+ InetAddress inet = InetAddress.getByName(dn.getHost());
+ // compare hostname(:port)
+ mustList.remove(inet.getHostName());
+ mustList.remove(inet.getHostName()+":"+dn.getPort());
+ // compare ipaddress(:port)
+ mustList.remove(inet.getHostAddress().toString());
+ mustList.remove(inet.getHostAddress().toString()+ ":" +dn.getPort());
+ } catch ( UnknownHostException e ) {
+ mustList.remove(dn.getName());
+ mustList.remove(dn.getHost());
+ LOG.warn(e);
+ }
}
}
@@ -3970,45 +3986,6 @@ public class FSNamesystem implements FSC
}
/**
- * A immutable object that stores the number of live replicas and
- * the number of decommissined Replicas.
- */
- static class NumberReplicas {
- private int liveReplicas;
- int decommissionedReplicas;
- private int corruptReplicas;
- private int excessReplicas;
-
- NumberReplicas() {
- initialize(0, 0, 0, 0);
- }
-
- NumberReplicas(int live, int decommissioned, int corrupt, int excess) {
- initialize(live, decommissioned, corrupt, excess);
- }
-
- void initialize(int live, int decommissioned, int corrupt, int excess) {
- liveReplicas = live;
- decommissionedReplicas = decommissioned;
- corruptReplicas = corrupt;
- excessReplicas = excess;
- }
-
- int liveReplicas() {
- return liveReplicas;
- }
- int decommissionedReplicas() {
- return decommissionedReplicas;
- }
- int corruptReplicas() {
- return corruptReplicas;
- }
- int excessReplicas() {
- return excessReplicas;
- }
- }
-
- /**
* Change, if appropriate, the admin state of a datanode to
* decommission completed. Return true if decommission is complete.
*/
@@ -4032,23 +4009,62 @@ public class FSNamesystem implements FSC
*/
private boolean inHostsList(DatanodeID node, String ipAddr) {
Set<String> hostsList = hostsReader.getHosts();
- return (hostsList.isEmpty() ||
- (ipAddr != null && hostsList.contains(ipAddr)) ||
- hostsList.contains(node.getHost()) ||
- hostsList.contains(node.getName()) ||
- ((node instanceof DatanodeInfo) &&
- hostsList.contains(((DatanodeInfo)node).getHostName())));
+ return checkInList(node, ipAddr, hostsList, false);
}
private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
Set<String> excludeList = hostsReader.getExcludedHosts();
- return ((ipAddr != null && excludeList.contains(ipAddr)) ||
- excludeList.contains(node.getHost()) ||
- excludeList.contains(node.getName()) ||
- ((node instanceof DatanodeInfo) &&
- excludeList.contains(((DatanodeInfo)node).getHostName())));
+ return checkInList(node, ipAddr, excludeList, true);
}
+
+ /**
+   * Check if the given node (by DatanodeID or ipAddress) is in the (include or
+   * exclude) list. If ipAddress is null, check only based upon the given
+   * DatanodeID. If ipAddress is not null, it should refer to the same host
+   * that the given DatanodeID refers to.
+   *
+   * @param node the host DatanodeID
+   * @param ipAddress if not null, should refer to the same host
+   *          that the DatanodeID refers to
+   * @param hostsList the list of hosts in the include/exclude file
+   * @param isExcludeList true if this is the exclude list
+   * @return true if the node is in the list
+ */
+ private boolean checkInList(DatanodeID node, String ipAddress,
+ Set<String> hostsList, boolean isExcludeList) {
+ InetAddress iaddr = null;
+ try {
+ if (ipAddress != null) {
+ iaddr = InetAddress.getByName(ipAddress);
+ } else {
+ iaddr = InetAddress.getByName(node.getHost());
+ }
+    } catch (UnknownHostException e) {
+      LOG.warn("Unknown host in host list: "
+          + (ipAddress != null ? ipAddress : node.getHost()));
+      // Can't resolve the host name: fail safe by treating it as present in
+      // the exclude list and absent from the include list.
+      return isExcludeList;
+    }
+
+ // if include list is empty, host is in include list
+ if ( (!isExcludeList) && (hostsList.isEmpty()) ){
+ return true;
+ }
+ return // compare ipaddress(:port)
+ (hostsList.contains(iaddr.getHostAddress().toString()))
+ || (hostsList.contains(iaddr.getHostAddress().toString() + ":"
+ + node.getPort()))
+ // compare hostname(:port)
+ || (hostsList.contains(iaddr.getHostName()))
+ || (hostsList.contains(iaddr.getHostName() + ":" + node.getPort()))
+ || ((node instanceof DatanodeInfo) && hostsList
+ .contains(((DatanodeInfo) node).getHostName()));
+ }
+
/**
* Rereads the config to get hosts and exclude list file names.
* Rereads the files to update the hosts and exclude lists. It
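checkInList() makes the include/exclude test spelling-insensitive: the node is resolved once through DNS, and then the four forms an operator might have written (IP, IP:port, hostname, hostname:port) are each tried against the file's entries. The rule in isolation (a sketch; the port value and the comment examples are illustrative):

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.Set;

    final class HostListMatch {
      // Resolve once, then try every spelling an admin might have used.
      static boolean matches(Set<String> list, String host, int port)
          throws UnknownHostException {
        InetAddress a = InetAddress.getByName(host);
        return list.contains(a.getHostAddress())              // 10.0.0.7
            || list.contains(a.getHostAddress() + ":" + port) // 10.0.0.7:50010
            || list.contains(a.getHostName())                 // dn1.example.com
            || list.contains(a.getHostName() + ":" + port);   // dn1.example.com:50010
      }
    }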
@@ -4626,7 +4642,7 @@ public class FSNamesystem implements FSC
* Check whether the name node is in safe mode.
* @return true if safe mode is ON, false otherwise
*/
- boolean isInSafeMode() {
+ public boolean isInSafeMode() {
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
@@ -4637,7 +4653,7 @@ public class FSNamesystem implements FSC
/**
* Check whether the name node is in startup mode.
*/
- boolean isInStartupSafeMode() {
+ public boolean isInStartupSafeMode() {
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
@@ -4648,7 +4664,7 @@ public class FSNamesystem implements FSC
/**
* Check whether replication queues are populated.
*/
- boolean isPopulatingReplQueues() {
+ public boolean isPopulatingReplQueues() {
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
@@ -4660,7 +4676,7 @@ public class FSNamesystem implements FSC
* Increment number of blocks that reached minimal replication.
* @param replication current replication
*/
- void incrementSafeBlockCount(int replication) {
+ public void incrementSafeBlockCount(int replication) {
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
@@ -4671,7 +4687,7 @@ public class FSNamesystem implements FSC
/**
* Decrement number of blocks that reached minimal replication.
*/
- void decrementSafeBlockCount(Block b) {
+ public void decrementSafeBlockCount(Block b) {
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null) // mostly true
@@ -4978,13 +4994,13 @@ public class FSNamesystem implements FSC
@Override // FSNamesystemMBean
@Metric
public long getPendingReplicationBlocks() {
- return blockManager.pendingReplicationBlocksCount;
+ return blockManager.getPendingReplicationBlocksCount();
}
@Override // FSNamesystemMBean
@Metric
public long getUnderReplicatedBlocks() {
- return blockManager.underReplicatedBlocksCount;
+ return blockManager.getUnderReplicatedBlocksCount();
}
/** Return number of under-replicated but not missing blocks */
@@ -4995,23 +5011,23 @@ public class FSNamesystem implements FSC
/** Returns number of blocks with corrupt replicas */
@Metric({"CorruptBlocks", "Number of blocks with corrupt replicas"})
public long getCorruptReplicaBlocks() {
- return blockManager.corruptReplicaBlocksCount;
+ return blockManager.getCorruptReplicaBlocksCount();
}
@Override // FSNamesystemMBean
@Metric
public long getScheduledReplicationBlocks() {
- return blockManager.scheduledReplicationBlocksCount;
+ return blockManager.getScheduledReplicationBlocksCount();
}
@Metric
public long getPendingDeletionBlocks() {
- return blockManager.pendingDeletionBlocksCount;
+ return blockManager.getPendingDeletionBlocksCount();
}
@Metric
public long getExcessBlocks() {
- return blockManager.excessBlocksCount;
+ return blockManager.getExcessBlocksCount();
}
@Metric
@@ -5380,7 +5396,7 @@ public class FSNamesystem implements FSC
}
/** Get a datanode descriptor given corresponding storageID */
- DatanodeDescriptor getDatanode(String nodeID) {
+ public DatanodeDescriptor getDatanode(String nodeID) {
assert hasReadOrWriteLock();
return datanodeMap.get(nodeID);
}
@@ -5444,7 +5460,7 @@ public class FSNamesystem implements FSC
if (startBlockAfter != null) {
startBlockId = Block.filename2id(startBlockAfter);
}
- BlockIterator blkIterator = blockManager.getCorruptReplicaBlockIterator();
+ UnderReplicatedBlocks.BlockIterator blkIterator = blockManager.getCorruptReplicaBlockIterator();
while (blkIterator.hasNext()) {
Block blk = blkIterator.next();
INode inode = blockManager.getINode(blk);
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Wed Jul 6 18:32:04 2011
@@ -117,7 +117,7 @@ public class FileDataServlet extends Dfs
.getParameter(JspHelper.DELEGATION_PARAMETER_NAME);
HdfsFileStatus info = nn.getFileInfo(path);
- if ((info != null) && !info.isDir()) {
+ if (info != null && !info.isDir()) {
try {
response.sendRedirect(createUri(path, info, ugi, nn, request,
delegationToken).toURL().toString());
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java Wed Jul 6 18:32:04 2011
@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+
class Host2NodesMap {
private HashMap<String, DatanodeDescriptor[]> map
= new HashMap<String, DatanodeDescriptor[]>();
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Jul 6 18:32:04 2011
@@ -20,11 +20,13 @@ package org.apache.hadoop.hdfs.server.na
import java.util.Arrays;
import java.util.List;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.util.StringUtils;
/**
@@ -32,7 +34,7 @@ import org.apache.hadoop.util.StringUtil
* This is a base INode class containing common fields for file and
* directory inodes.
*/
-abstract class INode implements Comparable<byte[]>, FSInodeInfo {
+public abstract class INode implements Comparable<byte[]>, FSInodeInfo {
/*
* The inode name is in java UTF8 encoding;
* The name in HdfsFileStatus should keep the same encoding as this.
@@ -324,7 +326,7 @@ abstract class INode implements Comparab
/**
* Is this inode being constructed?
*/
- boolean isUnderConstruction() {
+ public boolean isUnderConstruction() {
return false;
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Wed Jul 6 18:32:04 2011
@@ -409,20 +409,31 @@ class INodeDirectory extends INode {
/** {@inheritDoc} */
long[] computeContentSummary(long[] summary) {
+ // Walk through the children of this node, using a new summary array
+ // for the (sub)tree rooted at this node
+ assert 4 == summary.length;
+ long[] subtreeSummary = new long[]{0,0,0,0};
if (children != null) {
for (INode child : children) {
- child.computeContentSummary(summary);
+ child.computeContentSummary(subtreeSummary);
}
}
if (this instanceof INodeDirectoryWithQuota) {
// Warn if the cached and computed diskspace values differ
INodeDirectoryWithQuota node = (INodeDirectoryWithQuota)this;
long space = node.diskspaceConsumed();
- if (-1 != node.getDsQuota() && space != summary[3]) {
+ assert -1 == node.getDsQuota() || space == subtreeSummary[3];
+ if (-1 != node.getDsQuota() && space != subtreeSummary[3]) {
NameNode.LOG.warn("Inconsistent diskspace for directory "
- +getLocalName()+". Cached: "+space+" Computed: "+summary[3]);
+ +getLocalName()+". Cached: "+space+" Computed: "+subtreeSummary[3]);
}
}
+
+ // update the passed summary array with the values for this node's subtree
+ for (int i = 0; i < summary.length; i++) {
+ summary[i] += subtreeSummary[i];
+ }
+
summary[2]++;
return summary;
}
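The rewritten computeContentSummary makes the diskspace validation meaningful: with a
single shared array, summary[3] already holds totals from sibling subtrees visited
earlier, so comparing it against this directory's cached diskspaceConsumed() produced
spurious warnings. Accumulating into a fresh subtreeSummary isolates this subtree's
totals first, then folds them into the caller's array. A simplified sketch of the
scheme, assuming the four-slot layout {length, fileCount, directoryCount, diskspace}
used by ContentSummary (the Node classes are stand-ins):

    abstract class Node {
      abstract long[] computeContentSummary(long[] summary);
    }

    class Dir extends Node {
      java.util.List<Node> children = new java.util.ArrayList<Node>();

      long[] computeContentSummary(long[] summary) {
        // Fresh array, so the subtree totals can be validated against a
        // cached value before being folded into the caller's array.
        long[] subtree = new long[4];
        for (Node child : children) {
          child.computeContentSummary(subtree);
        }
        // ... compare subtree[3] with the cached diskspace here ...
        for (int i = 0; i < subtree.length; i++) {
          summary[i] += subtree[i];
        }
        summary[2]++; // count this directory itself
        return summary;
      }
    }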
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Jul 6 18:32:04 2011
@@ -24,8 +24,11 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
-class INodeFile extends INode {
+/** I-node for closed file. */
+public class INodeFile extends INode {
static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
//Number of bits for Block size
@@ -106,7 +109,7 @@ class INodeFile extends INode {
* Get file blocks
* @return file blocks
*/
- BlockInfo[] getBlocks() {
+ public BlockInfo[] getBlocks() {
return this.blocks;
}
@@ -149,7 +152,7 @@ class INodeFile extends INode {
/**
* Set file block
*/
- void setBlock(int idx, BlockInfo blk) {
+ public void setBlock(int idx, BlockInfo blk) {
this.blocks[idx] = blk;
}
@@ -237,7 +240,7 @@ class INodeFile extends INode {
* Get the last block of the file.
* Make sure it has the right type.
*/
- <T extends BlockInfo> T getLastBlock() throws IOException {
+ public <T extends BlockInfo> T getLastBlock() throws IOException {
if (blocks == null || blocks.length == 0)
return null;
T returnBlock = null;
@@ -252,7 +255,8 @@ class INodeFile extends INode {
return returnBlock;
}
- int numBlocks() {
+ /** @return the number of blocks */
+ public int numBlocks() {
return blocks == null ? 0 : blocks.length;
}
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Wed Jul 6 18:32:04 2011
@@ -21,10 +21,15 @@ import java.io.IOException;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
-
-class INodeFileUnderConstruction extends INodeFile {
+/**
+ * I-node for file being written.
+ */
+public class INodeFileUnderConstruction extends INodeFile {
private String clientName; // lease holder
private final String clientMachine;
private final DatanodeDescriptor clientNode; // if client is a cluster node too.
@@ -43,7 +48,7 @@ class INodeFileUnderConstruction extends
this.clientNode = clientNode;
}
- public INodeFileUnderConstruction(byte[] name,
+ INodeFileUnderConstruction(byte[] name,
short blockReplication,
long modificationTime,
long preferredBlockSize,
@@ -80,7 +85,7 @@ class INodeFileUnderConstruction extends
* Is this inode being constructed?
*/
@Override
- boolean isUnderConstruction() {
+ public boolean isUnderConstruction() {
return true;
}
@@ -122,7 +127,7 @@ class INodeFileUnderConstruction extends
* Convert the last block of the file to an under-construction block.
* Set its locations.
*/
- BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
+ public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
DatanodeDescriptor[] targets)
throws IOException {
if (blocks == null || blocks.length == 0) {
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Wed Jul 6 18:32:04 2011
@@ -163,7 +163,7 @@ public class LeaseManager {
/**
* Finds the pathname for the specified pendingFile
*/
- synchronized String findPath(INodeFileUnderConstruction pendingFile)
+ public synchronized String findPath(INodeFileUnderConstruction pendingFile)
throws IOException {
Lease lease = getLease(pendingFile.getClientName());
if (lease != null) {
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Wed Jul 6 18:32:04 2011
@@ -464,6 +464,7 @@ public class NNStorage extends Storage i
// Close any edits stream associated with this dir and remove directory
LOG.warn("writeTransactionIdToStorage failed on " + sd,
e);
+ reportErrorsOnDirectory(sd);
}
}
}
@@ -828,17 +829,17 @@ public class NNStorage extends Storage i
* @throws IOException
*/
void reportErrorsOnDirectory(StorageDirectory sd) {
- LOG.warn("Error reported on storage directory " + sd);
+ LOG.error("Error reported on storage directory " + sd);
String lsd = listStorageDirectories();
LOG.debug("current list of storage dirs:" + lsd);
- LOG.info("About to remove corresponding storage: "
+ LOG.warn("About to remove corresponding storage: "
+ sd.getRoot().getAbsolutePath());
try {
sd.unlock();
} catch (Exception e) {
- LOG.info("Unable to unlock bad storage directory: "
+ LOG.warn("Unable to unlock bad storage directory: "
+ sd.getRoot().getPath(), e);
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Jul 6 18:32:04 2011
@@ -248,7 +248,7 @@ public class NameNode implements Namenod
/** Return the {@link FSNamesystem} object.
* @return {@link FSNamesystem} object.
*/
- FSNamesystem getNamesystem() {
+ public FSNamesystem getNamesystem() {
return namesystem;
}
@@ -439,6 +439,13 @@ public class NameNode implements Namenod
// The rpc-server port can be ephemeral... ensure we have the correct info
this.rpcAddress = this.server.getListenerAddress();
setRpcServerAddress(conf);
+
+ try {
+ validateConfigurationSettings(conf);
+ } catch (IOException e) {
+ LOG.fatal(e.toString());
+ throw e;
+ }
activate(conf);
LOG.info(getRole() + " up at: " + rpcAddress);
@@ -446,6 +453,28 @@ public class NameNode implements Namenod
LOG.info(getRole() + " service server is up at: " + serviceRPCAddress);
}
}
+
+ /**
+ * Verify that the final configuration settings allow the NameNode to
+ * start up properly. Currently the only check is that the HTTP server
+ * port does not equal the RPC server port.
+ * @param conf the configuration to validate
+ * @throws IOException if the configuration is invalid
+ */
+ protected void validateConfigurationSettings(final Configuration conf)
+ throws IOException {
+ // check to make sure the web port and rpc port do not match
+ if(getHttpServerAddress(conf).getPort()
+ == getRpcServerAddress(conf).getPort()) {
+ String errMsg = "dfs.namenode.rpc-address " +
+ "("+ getRpcServerAddress(conf) + ") and " +
+ "dfs.namenode.http-address ("+ getHttpServerAddress(conf) + ") " +
+ "configuration keys are bound to the same port, unable to start " +
+ "NameNode. Port: " + getRpcServerAddress(conf).getPort();
+ throw new IOException(errMsg);
+ }
+ }
/**
* Activate name-node servers and threads.
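Without this check, binding dfs.namenode.rpc-address and dfs.namenode.http-address to
the same port only fails later, when the HTTP server starts, with a less obvious bind
error; failing fast at startup names both offending keys. A minimal sketch of the same
guard over plain socket addresses (the helper below is illustrative, not the actual
NameNode code):

    import java.io.IOException;
    import java.net.InetSocketAddress;

    class PortConflictCheck {
      static void validate(InetSocketAddress rpc, InetSocketAddress http)
          throws IOException {
        if (rpc.getPort() == http.getPort()) {
          throw new IOException("RPC address " + rpc + " and HTTP address "
              + http + " are bound to the same port " + rpc.getPort());
        }
      }

      public static void main(String[] args) throws IOException {
        validate(new InetSocketAddress("nn.example.com", 8020),
                 new InetSocketAddress("nn.example.com", 50070)); // ok
      }
    }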
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed Jul 6 18:32:04 2011
@@ -36,20 +36,20 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
/**
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Wed Jul 6 18:32:04 2011
@@ -36,24 +36,24 @@ import javax.servlet.http.HttpServletRes
import javax.servlet.jsp.JspWriter;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ServletUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
-
-import org.znerd.xmlenc.*;
+import org.znerd.xmlenc.XMLOutputter;
class NamenodeJspHelper {
static String getSafeModeText(FSNamesystem fsn) {
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Wed Jul 6 18:32:04 2011
@@ -253,7 +253,7 @@ public class SecondaryNameNode implement
infoServer.addSslListener(secInfoSocAddr, conf, false, true);
}
- infoServer.setAttribute("secondary.name.node", this);
+ infoServer.setAttribute("secondary.name.node", SecondaryNameNode.this);
infoServer.setAttribute("name.system.image", checkpointImage);
infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
infoServer.addInternalServlet("getimage", "/getimage",
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java Wed Jul 6 18:32:04 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.DFSInputSt
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.mortbay.jetty.InclusiveByteRange;
@@ -46,7 +47,7 @@ public class StreamFile extends DfsServl
public static final String CONTENT_LENGTH = "Content-Length";
- /** getting a client for connecting to dfs */
+ /* Return a DFS client to use to make the given HTTP request */
protected DFSClient getDFSClient(HttpServletRequest request)
throws IOException, InterruptedException {
final Configuration conf =
@@ -57,6 +58,7 @@ public class StreamFile extends DfsServl
return DatanodeJspHelper.getDFSClient(request, datanode, conf, ugi);
}
+ @SuppressWarnings("unchecked")
public void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
final String path = request.getPathInfo() != null ?
@@ -69,9 +71,10 @@ public class StreamFile extends DfsServl
return;
}
- Enumeration<?> reqRanges = request.getHeaders("Range");
- if (reqRanges != null && !reqRanges.hasMoreElements())
+ Enumeration<String> reqRanges = request.getHeaders("Range");
+ if (reqRanges != null && !reqRanges.hasMoreElements()) {
reqRanges = null;
+ }
DFSClient dfs;
try {
@@ -81,107 +84,82 @@ public class StreamFile extends DfsServl
return;
}
- final DFSInputStream in = dfs.open(filename);
- final long fileLen = in.getFileLength();
- OutputStream os = response.getOutputStream();
+ DFSInputStream in = null;
+ OutputStream out = null;
try {
+ in = dfs.open(filename);
+ out = response.getOutputStream();
+ final long fileLen = in.getFileLength();
if (reqRanges != null) {
- List<?> ranges = InclusiveByteRange.satisfiableRanges(reqRanges,
- fileLen);
- StreamFile.sendPartialData(in, os, response, fileLen, ranges);
+ List<InclusiveByteRange> ranges =
+ InclusiveByteRange.satisfiableRanges(reqRanges, fileLen);
+ StreamFile.sendPartialData(in, out, response, fileLen, ranges);
} else {
// No ranges, so send entire file
response.setHeader("Content-Disposition", "attachment; filename=\"" +
filename + "\"");
response.setContentType("application/octet-stream");
response.setHeader(CONTENT_LENGTH, "" + fileLen);
- StreamFile.writeTo(in, os, 0L, fileLen);
+ StreamFile.copyFromOffset(in, out, 0L, fileLen);
}
- } catch(IOException e) {
+ in.close();
+ in = null;
+ out.close();
+ out = null;
+ dfs.close();
+ dfs = null;
+ } catch (IOException ioe) {
if (LOG.isDebugEnabled()) {
- LOG.debug("response.isCommitted()=" + response.isCommitted(), e);
+ LOG.debug("response.isCommitted()=" + response.isCommitted(), ioe);
}
- throw e;
+ throw ioe;
} finally {
- try {
- in.close();
- os.close();
- } finally {
- dfs.close();
- }
- }
+ IOUtils.cleanup(LOG, in);
+ IOUtils.cleanup(LOG, out);
+ IOUtils.cleanup(LOG, dfs);
+ }
}
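The rewritten doGet above uses the close-then-null idiom: on the success path each
stream is closed explicitly (so close() failures still propagate to the caller) and
the reference is nulled, making the IOUtils.cleanup calls in the finally block
no-ops; on an exception path the references are still set, and cleanup closes them
while logging rather than masking the original exception. A minimal sketch of the
idiom, with a stand-in for org.apache.hadoop.io.IOUtils.cleanup:

    import java.io.Closeable;
    import java.io.IOException;

    class CloseThenNullExample {
      // Stand-in for IOUtils.cleanup(log, closeable).
      static void cleanup(Closeable c) {
        if (c != null) {
          try { c.close(); } catch (IOException e) { /* log, don't throw */ }
        }
      }

      static void copy(Closeable input, Closeable output) throws IOException {
        Closeable in = input;
        Closeable out = output;
        try {
          // ... transfer bytes ...
          in.close();  in = null;   // success path: close() failures propagate
          out.close(); out = null;
        } finally {
          cleanup(in);              // only acts if an exception skipped close()
          cleanup(out);
        }
      }
    }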
+ /**
+ * Send a partial content response with the given range. If there are
+ * no satisfiable ranges, or if multiple ranges are requested, which
+ * is unsupported, respond with range not satisfiable.
+ *
+ * @param in stream to read from
+ * @param out stream to write to
+ * @param response http response to use
+ * @param contentLength for the response header
+ * @param ranges the byte ranges to respond with
+ * @throws IOException on error sending the response
+ */
static void sendPartialData(FSInputStream in,
- OutputStream os,
+ OutputStream out,
HttpServletResponse response,
long contentLength,
- List<?> ranges)
- throws IOException {
-
+ List<InclusiveByteRange> ranges)
+ throws IOException {
if (ranges == null || ranges.size() != 1) {
- // if there are no satisfiable ranges, or if multiple ranges are
- // requested (we don't support multiple range requests), send 416 response
response.setContentLength(0);
- int status = HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE;
- response.setStatus(status);
- response.setHeader("Content-Range",
+ response.setStatus(HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE);
+ response.setHeader("Content-Range",
InclusiveByteRange.to416HeaderRangeString(contentLength));
} else {
- // if there is only a single valid range (must be satisfiable
- // since were here now), send that range with a 206 response
- InclusiveByteRange singleSatisfiableRange =
- (InclusiveByteRange)ranges.get(0);
+ InclusiveByteRange singleSatisfiableRange = ranges.get(0);
long singleLength = singleSatisfiableRange.getSize(contentLength);
response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
response.setHeader("Content-Range",
singleSatisfiableRange.toHeaderRangeString(contentLength));
- System.out.println("first: "+singleSatisfiableRange.getFirst(contentLength));
- System.out.println("singleLength: "+singleLength);
-
- StreamFile.writeTo(in,
- os,
- singleSatisfiableRange.getFirst(contentLength),
- singleLength);
+ copyFromOffset(in, out,
+ singleSatisfiableRange.getFirst(contentLength),
+ singleLength);
}
}
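For reference, the header values the two branches above produce follow RFC 2616
section 14.16: a satisfied single range gets a 206 with a Content-Range of the form
bytes first-last/length, while the unsatisfiable (or unsupported multi-range) case
gets a 416 with bytes */length. A hedged sketch with plain longs standing in for
Jetty's InclusiveByteRange:

    class ContentRangeExample {
      // 206 Partial Content header value, e.g. "bytes 0-99/1000".
      static String satisfied(long first, long last, long length) {
        return "bytes " + first + "-" + last + "/" + length;
      }

      // 416 Requested Range Not Satisfiable value, e.g. "bytes */1000".
      static String unsatisfiable(long length) {
        return "bytes */" + length;
      }
    }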
-
- static void writeTo(FSInputStream in,
- OutputStream os,
- long start,
- long count)
- throws IOException {
- byte buf[] = new byte[4096];
- long bytesRemaining = count;
- int bytesRead;
- int bytesToRead;
-
- in.seek(start);
-
- while (true) {
- // number of bytes to read this iteration
- bytesToRead = (int)(bytesRemaining<buf.length ?
- bytesRemaining:
- buf.length);
-
- // number of bytes actually read this iteration
- bytesRead = in.read(buf, 0, bytesToRead);
-
- // if we can't read anymore, break
- if (bytesRead == -1) {
- break;
- }
-
- os.write(buf, 0, bytesRead);
-
- bytesRemaining -= bytesRead;
-
- // if we don't need to read anymore, break
- if (bytesRemaining <= 0) {
- break;
- }
- }
+ /* Copy count bytes at the given offset from one stream to another */
+ static void copyFromOffset(FSInputStream in, OutputStream out, long offset,
+ long count) throws IOException {
+ in.seek(offset);
+ IOUtils.copyBytes(in, out, count, false);
}
}
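A hedged usage sketch of the same seek-then-copy pattern, streaming the second
kilobyte of a file to stdout; the path and FileSystem setup are illustrative, and
IOUtils.copyBytes with a long count and close=false is the overload copyFromOffset
uses above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    class CopyFromOffsetExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream in = fs.open(new Path("/tmp/example.dat"));
        try {
          in.seek(1024);                                   // position at offset
          IOUtils.copyBytes(in, System.out, 1024L, false); // copy count bytes
        } finally {
          in.close();
        }
      }
    }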