Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2010/09/14 02:25:37 UTC
svn commit: r996727 [1/2] - in /hadoop/hdfs/branches/HDFS-1052: ./
src/c++/libhdfs/ src/contrib/hdfsproxy/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/security/token/block/ src/ja...
Author: suresh
Date: Tue Sep 14 00:25:35 2010
New Revision: 996727
URL: http://svn.apache.org/viewvc?rev=996727&view=rev
Log:
Merging changes from trunk
Modified:
hadoop/hdfs/branches/HDFS-1052/ (props changed)
hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
hadoop/hdfs/branches/HDFS-1052/build.xml (contents, props changed)
hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs/ (props changed)
hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs/hdfsJniHelper.c
hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy/ (props changed)
hadoop/hdfs/branches/HDFS-1052/src/java/ (props changed)
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSck.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/ (props changed)
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNThroughputBenchmark.java
hadoop/hdfs/branches/HDFS-1052/src/webapps/datanode/ (props changed)
hadoop/hdfs/branches/HDFS-1052/src/webapps/hdfs/ (props changed)
hadoop/hdfs/branches/HDFS-1052/src/webapps/hdfs/corrupt_files.jsp
hadoop/hdfs/branches/HDFS-1052/src/webapps/secondary/ (props changed)
Propchange: hadoop/hdfs/branches/HDFS-1052/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 14 00:25:35 2010
@@ -1,4 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs:713112
/hadoop/hdfs/branches/HDFS-265:796829-820463
/hadoop/hdfs/branches/branch-0.21:820487
-/hadoop/hdfs/trunk:987665-992489
+/hadoop/hdfs/trunk:987665-996725
Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Tue Sep 14 00:25:35 2010
@@ -35,6 +35,8 @@ Trunk (unreleased changes)
HDFS-1359. Add BlockPoolID to Block. (suresh)
+ HDFS-1361. Add -fileStatus operation to NNThroughputBenchmark. (shv)
+
IMPROVEMENTS
HDFS-1096. fix for prev. commit. (boryas)
@@ -110,6 +112,9 @@ Trunk (unreleased changes)
HDFS-330. Datanode Web UIs should provide robots.txt.
(Allen Wittenauer via jghoman)
+ HDFS-881. Refactor DataNode Packet header into DataTransferProtocol.
+ (Todd Lipcon via jghoman)
+
HDFS-1036. docs for fetchdt
HDFS-1318. Add JMX interface for read access to namenode and datanode
@@ -118,6 +123,11 @@ Trunk (unreleased changes)
HDFS-1356. Provide information as to whether or not security is
enabled on web interface for NameNode (boryas)
+ HDFS-1205. FSDatasetAsyncDiskService should name its threads.
+ (Todd Lipcon via eli)
+
+ HDFS-1111. Introduce getCorruptFileBlocks() for fsck. (Sriram Rao via shv)
+
OPTIMIZATIONS
HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)
@@ -233,6 +243,22 @@ Trunk (unreleased changes)
HDFS-1284. TestBlockToken fails. (Kan Zhang via jghoman)
+ HDFS-1355. ant veryclean (clean-cache) doesn't clean enough.
+ (Luke Lu via jghoman)
+
+ HDFS-1353. Remove most of getBlockLocation optimization. (jghoman)
+
+ HDFS-1369. Invalid javadoc reference in FSDatasetMBean.java (Eli Collins)
+
+ HDFS-829. hdfsJniHelper.c: #include <error.h> is not portable.
+ (Allen Wittenauer via jghoman)
+
+ HDFS-1310. The ClientDatanodeProtocol proxy should be stopped in
+ DFSInputStream.readBlockLength(..). (sam rash via szetszwo)
+
+ HDFS-1357. HFTP traffic served by DataNode shouldn't use service port
+ on NameNode. (Kan Zhang via jghoman)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/hdfs/branches/HDFS-1052/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/build.xml?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-1052/build.xml Tue Sep 14 00:25:35 2010
@@ -1189,10 +1189,7 @@
</target>
<target name="clean-cache" depends="clean" description="Clean. Delete ivy cache">
- <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop/hadoop-hdfs"/>
- <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop/hadoop-hdfs-test"/>
- <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop/hadoop-hdfs-${herriot.suffix}"/>
- <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop/hadoop-hdfs-${herriot.suffix}-test"/>
+ <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop"/>
</target>
<!-- ================================================================== -->
Propchange: hadoop/hdfs/branches/HDFS-1052/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 14 00:25:35 2010
@@ -2,4 +2,4 @@
/hadoop/core/trunk/build.xml:779102
/hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
/hadoop/hdfs/branches/branch-0.21/build.xml:820487
-/hadoop/hdfs/trunk/build.xml:987665-992489
+/hadoop/hdfs/trunk/build.xml:987665-996725
Propchange: hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 14 00:25:35 2010
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
/hadoop/core/trunk/src/c++/libhdfs:776175-784663
-/hadoop/hdfs/trunk/src/c++/libhdfs:987665-992489
+/hadoop/hdfs/trunk/src/c++/libhdfs:987665-996725
Modified: hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs/hdfsJniHelper.c
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/c%2B%2B/libhdfs/hdfsJniHelper.c?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs/hdfsJniHelper.c (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs/hdfsJniHelper.c Tue Sep 14 00:25:35 2010
@@ -16,7 +16,6 @@
* limitations under the License.
*/
#include <string.h>
-#include <error.h>
#include "hdfsJniHelper.h"
static pthread_mutex_t hdfsHashMutex = PTHREAD_MUTEX_INITIALIZER;
Propchange: hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 14 00:25:35 2010
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
/hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy:820487
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:987665-992489
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:987665-996725
Propchange: hadoop/hdfs/branches/HDFS-1052/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 14 00:25:35 2010
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
/hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/java:820487
-/hadoop/hdfs/trunk/src/java:987665-992489
+/hadoop/hdfs/trunk/src/java:987665-996725
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java Tue Sep 14 00:25:35 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -211,35 +212,23 @@ public class BlockReader extends FSInput
// Read next packet if the previous packet has been read completely.
if (dataLeft <= 0) {
//Read packet headers.
- int packetLen = in.readInt();
- long offsetInBlock = in.readLong();
- long seqno = in.readLong();
- boolean lastPacketInBlock = in.readBoolean();
-
+ PacketHeader header = new PacketHeader();
+ header.readFields(in);
+
if (LOG.isDebugEnabled()) {
- LOG.debug("DFSClient readChunk got seqno " + seqno +
- " offsetInBlock " + offsetInBlock +
- " lastPacketInBlock " + lastPacketInBlock +
- " packetLen " + packetLen);
+ LOG.debug("DFSClient readChunk got header " + header);
}
-
- int dataLen = in.readInt();
-
+
// Sanity check the lengths
- if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
- ( dataLen != 0 && lastPacketInBlock) ||
- (seqno != (lastSeqNo + 1)) ) {
- throw new IOException("BlockReader: error in packet header" +
- "(chunkOffset : " + chunkOffset +
- ", dataLen : " + dataLen +
- ", seqno : " + seqno +
- " (last: " + lastSeqNo + "))");
+ if (!header.sanityCheck(lastSeqNo)) {
+ throw new IOException("BlockReader: error in packet header " +
+ header);
}
-
- lastSeqNo = seqno;
- dataLeft = dataLen;
- adjustChecksumBytes(dataLen);
- if (dataLen > 0) {
+
+ lastSeqNo = header.getSeqno();
+ dataLeft = header.getDataLen();
+ adjustChecksumBytes(header.getDataLen());
+ if (header.getDataLen() > 0) {
IOUtils.readFully(in, checksumBytes.array(), 0,
checksumBytes.limit());
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Sep 14 00:25:35 2010
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@@ -146,10 +147,14 @@ public class DFSInputStream extends FSIn
int replicaNotFoundCount = locatedblock.getLocations().length;
for(DatanodeInfo datanode : locatedblock.getLocations()) {
+ ClientDatanodeProtocol cdp = null;
+
try {
- final ClientDatanodeProtocol cdp = DFSClient.createClientDatanodeProtocolProxy(
- datanode, dfsClient.conf, dfsClient.socketTimeout, locatedblock);
+ cdp = DFSClient.createClientDatanodeProtocolProxy(
+ datanode, dfsClient.conf, dfsClient.socketTimeout, locatedblock);
+
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
+
if (n >= 0) {
return n;
}
@@ -166,6 +171,10 @@ public class DFSInputStream extends FSIn
DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode "
+ datanode + " for block " + locatedblock.getBlock(), ioe);
}
+ } finally {
+ if (cdp != null) {
+ RPC.stopProxy(cdp);
+ }
}
}
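
For illustration, a minimal standalone sketch of the pattern this fix applies in
readBlockLength(..): declare the proxy outside the try, create and use it inside,
stop it in the finally. The Proxy interface and ProxyCleanupSketch class are
hypothetical stand-ins for ClientDatanodeProtocol and RPC.stopProxy:

    import java.io.IOException;
    import java.util.function.Supplier;

    public class ProxyCleanupSketch {
      // Stand-in for ClientDatanodeProtocol; stop() plays the role of RPC.stopProxy.
      interface Proxy {
        long getReplicaVisibleLength() throws IOException;
        void stop();
      }

      // Mirrors the shape of DFSInputStream.readBlockLength(..): the proxy is
      // created inside the try so any failure still reaches the finally block.
      static long readBlockLength(Supplier<Proxy> factory) {
        Proxy cdp = null;
        try {
          cdp = factory.get();
          long n = cdp.getReplicaVisibleLength();
          if (n >= 0) {
            return n;
          }
        } catch (IOException ioe) {
          // fall through and try the next datanode
        } finally {
          if (cdp != null) {
            cdp.stop();            // release the RPC connection on every path
          }
        }
        return -1;
      }
    }
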
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Sep 14 00:25:35 2010
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@@ -127,17 +128,30 @@ class DFSOutputStream extends FSOutputSu
private short blockReplication; // replication factor of file
private class Packet {
- ByteBuffer buffer; // only one of buf and buffer is non-null
- byte[] buf;
long seqno; // sequencenumber of buffer in block
long offsetInBlock; // offset in block
boolean lastPacketInBlock; // is this the last packet in block?
int numChunks; // number of chunks currently in packet
int maxChunks; // max chunks in packet
- int dataStart;
- int dataPos;
- int checksumStart;
- int checksumPos;
+
+ /** buffer for accumulating packet checksum and data */
+ ByteBuffer buffer; // wraps buf, only one of these two may be non-null
+ byte[] buf;
+
+ /**
+ * buf is pointed into as follows:
+ * (C is checksum data, D is payload data)
+ *
+ * [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___]
+ *       ^    ^               ^               ^
+ *       |    checksumPos     dataStart       dataPos
+ *  checksumStart
+ */
+ int checksumStart;
+ int dataStart;
+ int dataPos;
+ int checksumPos;
+
private static final long HEART_BEAT_SEQNO = -1L;
/**
@@ -150,7 +164,7 @@ class DFSOutputStream extends FSOutputSu
this.seqno = HEART_BEAT_SEQNO;
buffer = null;
- int packetSize = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
+ int packetSize = PacketHeader.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER; // TODO(todd) strange
buf = new byte[packetSize];
checksumStart = dataStart = packetSize;
@@ -170,7 +184,7 @@ class DFSOutputStream extends FSOutputSu
buffer = null;
buf = new byte[pktSize];
- checksumStart = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
+ checksumStart = PacketHeader.PKT_HEADER_LEN;
checksumPos = checksumStart;
dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
dataPos = dataStart;
@@ -221,20 +235,15 @@ class DFSOutputStream extends FSOutputSu
int pktLen = DFSClient.SIZE_OF_INTEGER + dataLen + checksumLen;
//normally dataStart == checksumPos, i.e., offset is zero.
- buffer = ByteBuffer.wrap(buf, dataStart - checksumPos,
- DataNode.PKT_HEADER_LEN + pktLen);
+ buffer = ByteBuffer.wrap(
+ buf, dataStart - checksumPos,
+ PacketHeader.PKT_HEADER_LEN + pktLen - DFSClient.SIZE_OF_INTEGER);
buf = null;
buffer.mark();
-
- /* write the header and data length.
- * The format is described in comment before DataNode.BlockSender
- */
- buffer.putInt(pktLen); // pktSize
- buffer.putLong(offsetInBlock);
- buffer.putLong(seqno);
- buffer.put((byte) ((lastPacketInBlock) ? 1 : 0));
- //end of pkt header
- buffer.putInt(dataLen); // actual data length, excluding checksum.
+
+ PacketHeader header = new PacketHeader(
+ pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen);
+ header.putInBuffer(buffer);
buffer.reset();
return buffer;
@@ -1110,7 +1119,7 @@ class DFSOutputStream extends FSOutputSu
private void computePacketChunkSize(int psize, int csize) {
int chunkSize = csize + checksum.getChecksumSize();
- int n = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
+ int n = PacketHeader.PKT_HEADER_LEN;
chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
packetSize = n + chunkSize*chunksPerPacket;
if (DFSClient.LOG.isDebugEnabled()) {
@@ -1212,7 +1221,7 @@ class DFSOutputStream extends FSOutputSu
// indicate the end of block and reset bytesCurBlock.
//
if (bytesCurBlock == blockSize) {
- currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
+ currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
waitAndQueuePacket(currentPacket);
@@ -1405,7 +1414,7 @@ class DFSOutputStream extends FSOutputSu
if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block
- currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
+ currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
}
@@ -1456,7 +1465,7 @@ class DFSOutputStream extends FSOutputSu
synchronized void setChunksPerPacket(int value) {
chunksPerPacket = Math.min(chunksPerPacket, value);
- packetSize = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER +
+ packetSize = PacketHeader.PKT_HEADER_LEN +
(checksum.getBytesPerChecksum() +
checksum.getChecksumSize()) * chunksPerPacket;
}
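
With the header length now owned by PacketHeader, the sizing arithmetic in
computePacketChunkSize reads directly off PKT_HEADER_LEN. A worked sketch with
representative, assumed numbers (64K write packets, 512-byte chunks, 4-byte
CRC32 checksums):

    public class PacketSizingSketch {
      public static void main(String[] args) {
        int psize = 64 * 1024;                     // assumed dfs.write.packet.size
        int bytesPerChecksum = 512;
        int checksumSize = 4;                      // CRC32
        int PKT_HEADER_LEN = 4 + 8 + 8 + 1 + 4;    // = 25, as defined above

        int chunkSize = bytesPerChecksum + checksumSize;   // 516
        int n = PKT_HEADER_LEN;
        // Ceiling division: how many whole chunks fit in the requested size.
        int chunksPerPacket = Math.max((psize - n + chunkSize - 1) / chunkSize, 1);
        int packetSize = n + chunkSize * chunksPerPacket;

        System.out.println("chunksPerPacket = " + chunksPerPacket);  // 127
        System.out.println("packetSize = " + packetSize);            // 65557
      }
    }
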
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Sep 14 00:25:35 2010
@@ -26,7 +26,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -68,9 +67,9 @@ public interface ClientProtocol extends
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 63: Add block pool ID to Block
+ * 65: Add block pool ID to Block
*/
- public static final long versionID = 63L;
+ public static final long versionID = 65L;
///////////////////////////////////////
// File contents
@@ -665,14 +664,6 @@ public interface ClientProtocol extends
* @throws IOException
*/
public void metaSave(String filename) throws IOException;
-
- /**
- * @return Array of FileStatus objects referring to corrupted files.
- * The server could return all or a few of the files that are corrupt.
- * @throws AccessControlException
- * @throws IOException
- */
- FileStatus[] getCorruptFiles() throws AccessControlException, IOException;
/**
* Get the file info for a specific file or directory.
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Tue Sep 14 00:25:35 2010
@@ -23,6 +23,7 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -561,4 +562,141 @@ public interface DataTransferProtocol {
return ack.toString();
}
}
+
+ /**
+ * Header data for each packet that goes through the read/write pipelines.
+ */
+ public static class PacketHeader implements Writable {
+ /** Header size for a packet */
+ public static final int PKT_HEADER_LEN = ( 4 + /* Packet payload length */
+ 8 + /* offset in block */
+ 8 + /* seqno */
+ 1 + /* isLastPacketInBlock */
+ 4 /* data length */ );
+
+ private int packetLen;
+ private long offsetInBlock;
+ private long seqno;
+ private boolean lastPacketInBlock;
+ private int dataLen;
+
+ public PacketHeader() {
+ }
+
+ public PacketHeader(int packetLen, long offsetInBlock, long seqno,
+ boolean lastPacketInBlock, int dataLen) {
+ this.packetLen = packetLen;
+ this.offsetInBlock = offsetInBlock;
+ this.seqno = seqno;
+ this.lastPacketInBlock = lastPacketInBlock;
+ this.dataLen = dataLen;
+ }
+
+ public int getDataLen() {
+ return dataLen;
+ }
+
+ public boolean isLastPacketInBlock() {
+ return lastPacketInBlock;
+ }
+
+ public long getSeqno() {
+ return seqno;
+ }
+
+ public long getOffsetInBlock() {
+ return offsetInBlock;
+ }
+
+ public int getPacketLen() {
+ return packetLen;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("PacketHeader(")
+ .append("packetLen=").append(packetLen)
+ .append(" offsetInBlock=").append(offsetInBlock)
+ .append(" seqno=").append(seqno)
+ .append(" lastPacketInBlock=").append(lastPacketInBlock)
+ .append(" dataLen=").append(dataLen)
+ .append(")");
+ return sb.toString();
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // Note that it's important for packetLen to come first and not
+ // change format -
+ // this is used by BlockReceiver to read entire packets with
+ // a single read call.
+ packetLen = in.readInt();
+ offsetInBlock = in.readLong();
+ seqno = in.readLong();
+ lastPacketInBlock = in.readBoolean();
+ dataLen = in.readInt();
+ }
+
+ public void readFields(ByteBuffer buf) throws IOException {
+ packetLen = buf.getInt();
+ offsetInBlock = buf.getLong();
+ seqno = buf.getLong();
+ lastPacketInBlock = (buf.get() != 0);
+ dataLen = buf.getInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(packetLen);
+ out.writeLong(offsetInBlock);
+ out.writeLong(seqno);
+ out.writeBoolean(lastPacketInBlock);
+ out.writeInt(dataLen);
+ }
+
+ /**
+ * Write the header into the buffer.
+ * This requires that PKT_HEADER_LEN bytes are available.
+ */
+ public void putInBuffer(ByteBuffer buf) {
+ buf.putInt(packetLen)
+ .putLong(offsetInBlock)
+ .putLong(seqno)
+ .put((byte)(lastPacketInBlock ? 1 : 0))
+ .putInt(dataLen);
+ }
+
+ /**
+ * Perform a sanity check on the packet, returning true if it is sane.
+ * @param lastSeqNo the previous sequence number received - we expect the current
+ * sequence number to be larger by 1.
+ */
+ public boolean sanityCheck(long lastSeqNo) {
+ // We should only have a non-positive data length for the last packet
+ if (dataLen <= 0 && !lastPacketInBlock) return false;
+ // The last packet should not contain data
+ if (lastPacketInBlock && dataLen != 0) return false;
+ // Seqnos should always increase by 1 with each packet received
+ if (seqno != lastSeqNo + 1) return false;
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PacketHeader)) return false;
+ PacketHeader other = (PacketHeader)o;
+ return (other.packetLen == packetLen &&
+ other.offsetInBlock == offsetInBlock &&
+ other.seqno == seqno &&
+ other.lastPacketInBlock == lastPacketInBlock &&
+ other.dataLen == dataLen);
+ }
+
+ @Override
+ public int hashCode() {
+ return (int)seqno;
+ }
+ }
+
}
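
A self-contained sketch of the round-trip the new PacketHeader supports. The
class below is a trimmed, illustrative copy of its serialization core, with the
same field order as the write/readFields pair above:

    import java.io.*;

    public class PacketHeaderSketch {
      int packetLen; long offsetInBlock; long seqno;
      boolean lastPacketInBlock; int dataLen;

      // packetLen stays first so a receiver can read whole packets in one call.
      void write(DataOutput out) throws IOException {
        out.writeInt(packetLen);
        out.writeLong(offsetInBlock);
        out.writeLong(seqno);
        out.writeBoolean(lastPacketInBlock);
        out.writeInt(dataLen);
      }

      void readFields(DataInput in) throws IOException {
        packetLen = in.readInt();
        offsetInBlock = in.readLong();
        seqno = in.readLong();
        lastPacketInBlock = in.readBoolean();
        dataLen = in.readInt();
      }

      public static void main(String[] args) throws IOException {
        PacketHeaderSketch h = new PacketHeaderSketch();
        h.packetLen = 132; h.seqno = 1; h.dataLen = 128;

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        h.write(new DataOutputStream(bytes));
        System.out.println(bytes.size());  // 4+8+8+1+4 = 25, matching PKT_HEADER_LEN

        PacketHeaderSketch h2 = new PacketHeaderSketch();
        h2.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(h2.seqno + " " + h2.dataLen);  // 1 128
      }
    }
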
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Tue Sep 14 00:25:35 2010
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs.security.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Arrays;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -38,23 +37,20 @@ public class BlockTokenIdentifier extend
private long expiryDate;
private int keyId;
private String userId;
- private long [] blockIds;
+ private long blockId;
private EnumSet<AccessMode> modes;
private byte [] cache;
public BlockTokenIdentifier() {
- this(null, new long [] {}, EnumSet.noneOf(AccessMode.class));
+ this(null, 0, EnumSet.noneOf(AccessMode.class));
}
- public BlockTokenIdentifier(String userId, long [] blockIds,
+ public BlockTokenIdentifier(String userId, long blockId,
EnumSet<AccessMode> modes) {
- if(blockIds == null)
- throw new IllegalArgumentException("blockIds can't be null");
this.cache = null;
this.userId = userId;
- this.blockIds = Arrays.copyOf(blockIds, blockIds.length);
- Arrays.sort(this.blockIds);
+ this.blockId = blockId;
this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
}
@@ -66,7 +62,7 @@ public class BlockTokenIdentifier extend
@Override
public UserGroupInformation getUser() {
if (userId == null || "".equals(userId)) {
- return UserGroupInformation.createRemoteUser(Arrays.toString(blockIds));
+ return UserGroupInformation.createRemoteUser(Long.toString(blockId));
}
return UserGroupInformation.createRemoteUser(userId);
}
@@ -93,27 +89,10 @@ public class BlockTokenIdentifier extend
return userId;
}
- /**
- * Return sorted array of blockIds this {@link BlockTokenIdentifier} includes
- */
- public long [] getBlockIds() {
- return blockIds;
+ public long getBlockId() {
+ return blockId;
}
-
- /**
- * Is specified blockId included in this BlockTokenIdentifier?
- */
- public boolean isBlockIncluded(long blockId) {
- switch(blockIds.length) {
- case 1:
- return blockIds[0] == blockId;
- case 2:
- return (blockIds[0] == blockId) || (blockIds[1] == blockId);
- default:
- return Arrays.binarySearch(blockIds, blockId) >= 0;
- }
- }
-
+
public EnumSet<AccessMode> getAccessModes() {
return modes;
}
@@ -122,7 +101,7 @@ public class BlockTokenIdentifier extend
public String toString() {
return "block_token_identifier (expiryDate=" + this.getExpiryDate()
+ ", keyId=" + this.getKeyId() + ", userId=" + this.getUserId()
- + ", blockId=" + Arrays.toString(blockIds) + ", access modes="
+ + ", blockId=" + this.getBlockId() + ", access modes="
+ this.getAccessModes() + ")";
}
@@ -138,8 +117,7 @@ public class BlockTokenIdentifier extend
if (obj instanceof BlockTokenIdentifier) {
BlockTokenIdentifier that = (BlockTokenIdentifier) obj;
return this.expiryDate == that.expiryDate && this.keyId == that.keyId
- && isEqual(this.userId, that.userId)
- && Arrays.equals(this.blockIds, that.blockIds)
+ && isEqual(this.userId, that.userId) && this.blockId == that.blockId
&& isEqual(this.modes, that.modes);
}
return false;
@@ -147,18 +125,16 @@ public class BlockTokenIdentifier extend
/** {@inheritDoc} */
public int hashCode() {
- return (int) expiryDate ^ keyId ^ Arrays.hashCode(blockIds) ^ modes.hashCode()
+ return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
^ (userId == null ? 0 : userId.hashCode());
}
public void readFields(DataInput in) throws IOException {
- cache = null;
+ this.cache = null;
expiryDate = WritableUtils.readVLong(in);
keyId = WritableUtils.readVInt(in);
userId = WritableUtils.readString(in);
- blockIds = new long[WritableUtils.readVInt(in)];
- for(int i = 0; i < blockIds.length; i++)
- blockIds[i] = WritableUtils.readVLong(in);
+ blockId = WritableUtils.readVLong(in);
int length = WritableUtils.readVInt(in);
for (int i = 0; i < length; i++) {
modes.add(WritableUtils.readEnum(in, AccessMode.class));
@@ -169,9 +145,7 @@ public class BlockTokenIdentifier extend
WritableUtils.writeVLong(out, expiryDate);
WritableUtils.writeVInt(out, keyId);
WritableUtils.writeString(out, userId);
- WritableUtils.writeVInt(out, blockIds.length);
- for(int i = 0; i < blockIds.length; i++)
- WritableUtils.writeVLong(out, blockIds[i]);
+ WritableUtils.writeVLong(out, blockId);
WritableUtils.writeVInt(out, modes.size());
for (AccessMode aMode : modes) {
WritableUtils.writeEnum(out, aMode);
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Tue Sep 14 00:25:35 2010
@@ -177,29 +177,16 @@ public class BlockTokenSecretManager ext
/** Generate an block token for current user */
public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
EnumSet<AccessMode> modes) throws IOException {
- return generateToken(new long [] { block.getBlockId() }, modes);
- }
-
- /** Generate a block token for a specified user */
- public Token<BlockTokenIdentifier> generateToken(String userId, Block block,
- EnumSet<AccessMode> modes) throws IOException {
- return generateToken(userId, new long [] { block.getBlockId() }, modes);
- }
-
- /** Generate a block token for the current user based on a collection
- * of blockIds
- */
- public Token<BlockTokenIdentifier> generateToken(long[] blockIds,
- EnumSet<AccessMode> modes) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String userID = (ugi == null ? null : ugi.getShortUserName());
- return generateToken(userID, blockIds, modes);
+ return generateToken(userID, block.getLocalBlock(), modes);
}
-
- /** Generate a block token based on a collection of blockIds */
- public Token<BlockTokenIdentifier> generateToken(String userID,
- long[] blockIds, EnumSet<AccessMode> modes) {
- BlockTokenIdentifier id = new BlockTokenIdentifier(userID, blockIds, modes);
+
+ /** Generate a block token for a specified user */
+ public Token<BlockTokenIdentifier> generateToken(String userId,
+ Block block, EnumSet<AccessMode> modes) throws IOException {
+ BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
+ .getBlockId(), modes);
return new Token<BlockTokenIdentifier>(id, this);
}
@@ -226,7 +213,7 @@ public class BlockTokenSecretManager ext
throw new InvalidToken("Block token with " + id.toString()
+ " doesn't belong to user " + userId);
}
- if (!id.isBlockIncluded(block.getBlockId())) {
+ if (id.getBlockId() != block.getBlockId()) {
throw new InvalidToken("Block token with " + id.toString()
+ " doesn't apply to block " + block);
}
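
The access check is now a plain comparison instead of a binary search over a
sorted id array. A stripped-down, hypothetical sketch of the new shape (real
identifiers also carry an expiry date, key id and access modes):

    public class SingleBlockTokenSketch {
      static class Identifier {
        final String userId;
        final long blockId;            // was long[] blockIds before this change
        Identifier(String userId, long blockId) {
          this.userId = userId;
          this.blockId = blockId;
        }
      }

      static void checkAccess(Identifier id, String userId, long blockId) {
        if (!id.userId.equals(userId)) {
          throw new SecurityException("token doesn't belong to user " + userId);
        }
        if (id.blockId != blockId) {   // replaces id.isBlockIncluded(blockId)
          throw new SecurityException("token doesn't apply to block " + blockId);
        }
      }

      public static void main(String[] args) {
        Identifier id = new Identifier("alice", 42L);
        checkAccess(id, "alice", 42L);        // passes
        System.out.println("access granted");
      }
    }
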
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Sep 14 00:25:35 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.F
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
@@ -334,9 +335,9 @@ class BlockReceiver implements java.io.C
* calculation in DFSClient to make the guess accurate.
*/
int chunkSize = bytesPerChecksum + checksumSize;
- int chunksPerPacket = (datanode.writePacketSize - DataNode.PKT_HEADER_LEN -
- SIZE_OF_INTEGER + chunkSize - 1)/chunkSize;
- buf = ByteBuffer.allocate(DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
+ int chunksPerPacket = (datanode.writePacketSize - PacketHeader.PKT_HEADER_LEN
+ + chunkSize - 1)/chunkSize;
+ buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
Math.max(chunksPerPacket, 1) * chunkSize);
buf.limit(0);
}
@@ -365,7 +366,9 @@ class BlockReceiver implements java.io.C
payloadLen);
}
- int pktSize = payloadLen + DataNode.PKT_HEADER_LEN;
+ // Subtract SIZE_OF_INTEGER since that accounts for the payloadLen that
+ // we read above.
+ int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN - SIZE_OF_INTEGER;
if (buf.remaining() < pktSize) {
//we need to read more data
@@ -407,30 +410,31 @@ class BlockReceiver implements java.io.C
private int receivePacket() throws IOException {
// read the next packet
readNextPacket();
-
+
buf.mark();
- //read the header
- buf.getInt(); // packet length
- long offsetInBlock = buf.getLong(); // get offset of packet in block
-
- if (offsetInBlock > replicaInfo.getNumBytes()) {
+ PacketHeader header = new PacketHeader();
+ header.readFields(buf);
+ int endOfHeader = buf.position();
+ buf.reset();
+
+ // Sanity check the header
+ if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) {
throw new IOException("Received an out-of-sequence packet for " + block +
- "from " + inAddr + " at offset " + offsetInBlock +
+ "from " + inAddr + " at offset " + header.getOffsetInBlock() +
". Expecting packet starting at " + replicaInfo.getNumBytes());
}
- long seqno = buf.getLong(); // get seqno
- boolean lastPacketInBlock = (buf.get() != 0);
-
- int len = buf.getInt();
- if (len < 0) {
+ if (header.getDataLen() < 0) {
throw new IOException("Got wrong length during writeBlock(" + block +
") from " + inAddr + " at offset " +
- offsetInBlock + ": " + len);
- }
- int endOfHeader = buf.position();
- buf.reset();
-
- return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
+ header.getOffsetInBlock() + ": " +
+ header.getDataLen());
+ }
+
+ return receivePacket(
+ header.getOffsetInBlock(),
+ header.getSeqno(),
+ header.isLastPacketInBlock(),
+ header.getDataLen(), endOfHeader);
}
/**
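
The receiver now parses the header without consuming it: mark the buffer, read
the fields, record where the header ends, reset. The same idiom in isolation,
with assumed example values (132-byte packet, seqno 1, 128 data bytes):

    import java.nio.ByteBuffer;

    public class HeaderPeekSketch {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putInt(132).putLong(0L).putLong(1L).put((byte) 0).putInt(128); // header
        buf.put(new byte[4]);                                // start of payload
        buf.flip();

        buf.mark();                        // remember the packet start
        int packetLen = buf.getInt();
        long offsetInBlock = buf.getLong();
        long seqno = buf.getLong();
        boolean last = buf.get() != 0;
        int dataLen = buf.getInt();
        int endOfHeader = buf.position();  // first byte past the header
        buf.reset();                       // buffer still holds the whole packet

        System.out.println(packetLen + " " + offsetInBlock + " " + seqno
            + " " + last + " " + dataLen);                // 132 0 1 false 128
        System.out.println(endOfHeader + " " + buf.position());  // 25 0
      }
    }
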
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Sep 14 00:25:35 2010
@@ -33,6 +33,7 @@ import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
@@ -304,15 +305,12 @@ class BlockSender implements java.io.Clo
int packetLen = len + numChunks*checksumSize + 4;
boolean lastDataPacket = offset + len == endOffset && len > 0;
pkt.clear();
-
- // write packet header
- pkt.putInt(packetLen);
- pkt.putLong(offset);
- pkt.putLong(seqno);
- pkt.put((byte)((len == 0) ? 1 : 0));
- //why no ByteBuf.putBoolean()?
- pkt.putInt(len);
-
+
+
+ PacketHeader header = new PacketHeader(
+ packetLen, offset, seqno, (len == 0), len);
+ header.putInBuffer(pkt);
+
int checksumOff = pkt.position();
int checksumLen = numChunks * checksumSize;
byte[] buf = pkt.array();
@@ -444,7 +442,7 @@ class BlockSender implements java.io.Clo
}
int maxChunksPerPacket;
- int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+ int pktSize = PacketHeader.PKT_HEADER_LEN;
if (transferToAllowed && !verifyChecksum &&
baseStream instanceof SocketOutputStream &&
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Sep 14 00:25:35 2010
@@ -218,6 +218,7 @@ public class DataNode extends Configured
private HttpServer infoServer = null;
DataNodeMetrics myMetrics;
private InetSocketAddress nameNodeAddr;
+ private InetSocketAddress nameNodeAddrForClient;
private InetSocketAddress selfAddr;
private static DataNode datanodeObject = null;
private Thread dataNodeThread = null;
@@ -315,6 +316,7 @@ public class DataNode extends Configured
conf.get("dfs.datanode.dns.nameserver","default"));
}
this.nameNodeAddr = NameNode.getServiceAddress(conf, true);
+ this.nameNodeAddrForClient = NameNode.getAddress(conf);
this.socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsConstants.READ_TIMEOUT);
@@ -586,6 +588,10 @@ public class DataNode extends Configured
return nameNodeAddr;
}
+ public InetSocketAddress getNameNodeAddrForClient() {
+ return nameNodeAddrForClient;
+ }
+
public InetSocketAddress getSelfAddr() {
return selfAddr;
}
@@ -928,6 +934,12 @@ public class DataNode extends Configured
return;
}
LOG.warn(StringUtils.stringifyException(re));
+ try {
+ long sleepTime = Math.min(1000, heartBeatInterval);
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
} catch (IOException e) {
LOG.warn(StringUtils.stringifyException(e));
}
@@ -1285,14 +1297,6 @@ public class DataNode extends Configured
Not all the fields might be used while reading.
************************************************************************ */
-
- /** Header size for a packet */
- public static final int PKT_HEADER_LEN = ( 4 + /* Packet payload length */
- 8 + /* offset in block */
- 8 + /* seqno */
- 1 /* isLastPacketInBlock */);
-
-
/**
* Used for transferring a block of data. This class
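
The retry path now sleeps briefly instead of spinning on a RemoteException, and
it restores the interrupt flag when the sleep is interrupted so the DataNode's
service loop can still observe a shutdown request. The idiom in isolation
(class and method names illustrative):

    public class InterruptSafeSleepSketch {
      static void backoff(long heartBeatInterval) {
        try {
          long sleepTime = Math.min(1000, heartBeatInterval);
          Thread.sleep(sleepTime);
        } catch (InterruptedException ie) {
          // sleep() cleared the flag; re-assert it so callers up the stack
          // can still see that an interrupt happened.
          Thread.currentThread().interrupt();
        }
      }

      public static void main(String[] args) {
        Thread.currentThread().interrupt();    // simulate a pending shutdown
        backoff(3000);                         // returns immediately
        System.out.println(Thread.currentThread().isInterrupted());  // true
      }
    }
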
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Tue Sep 14 00:25:35 2010
@@ -91,7 +91,7 @@ public class DatanodeJspHelper {
if (namenodeInfoPortStr != null)
namenodeInfoPort = Integer.parseInt(namenodeInfoPortStr);
- DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddr(), conf);
+ DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
String target = dir;
final HdfsFileStatus targetStatus = dfs.getFileInfo(target);
if (targetStatus == null) { // not exists
@@ -193,7 +193,7 @@ public class DatanodeJspHelper {
JspHelper.addTableFooter(out);
}
}
- String namenodeHost = datanode.getNameNodeAddr().getHostName();
+ String namenodeHost = datanode.getNameNodeAddrForClient().getHostName();
out.print("<br><a href=\"http://"
+ InetAddress.getByName(namenodeHost).getCanonicalHostName() + ":"
+ namenodeInfoPort + "/dfshealth.jsp\">Go back to DFS home</a>");
@@ -254,7 +254,7 @@ public class DatanodeJspHelper {
}
blockSize = Long.parseLong(blockSizeStr);
- final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddr(), conf);
+ final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
Long.MAX_VALUE).getLocatedBlocks();
// Add the various links for looking at the file contents
@@ -311,7 +311,7 @@ public class DatanodeJspHelper {
// generate a table and dump the info
out.println("\n<table>");
- String namenodeHost = datanode.getNameNodeAddr().getHostName();
+ String namenodeHost = datanode.getNameNodeAddrForClient().getHostName();
String namenodeHostName = InetAddress.getByName(namenodeHost).getCanonicalHostName();
for (LocatedBlock cur : blocks) {
@@ -379,7 +379,7 @@ public class DatanodeJspHelper {
return;
}
- final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddr(), conf);
+ final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
Token<BlockTokenIdentifier> blockToken = BlockTokenSecretManager.DUMMY_TOKEN;
if (conf.getBoolean(
@@ -618,7 +618,7 @@ public class DatanodeJspHelper {
+ "\">");
// fetch the block from the datanode that has the last block for this file
- final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddr(), conf);
+ final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
Long.MAX_VALUE).getLocatedBlocks();
if (blocks == null || blocks.size() == 0) {
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Tue Sep 14 00:25:35 2010
@@ -58,8 +58,6 @@ class FSDatasetAsyncDiskService {
private final ThreadGroup threadGroup = new ThreadGroup("async disk service");
- private ThreadFactory threadFactory;
-
private HashMap<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
@@ -73,15 +71,26 @@ class FSDatasetAsyncDiskService {
* @param volumes The roots of the data volumes.
*/
FSDatasetAsyncDiskService(File[] volumes) {
-
- threadFactory = new ThreadFactory() {
- public Thread newThread(Runnable r) {
- return new Thread(threadGroup, r);
- }
- };
-
+
// Create one ThreadPool per volume
for (int v = 0 ; v < volumes.length; v++) {
+ final File vol = volumes[v];
+ ThreadFactory threadFactory = new ThreadFactory() {
+ int counter = 0;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ int thisIndex;
+ synchronized (this) {
+ thisIndex = counter++;
+ }
+ Thread t = new Thread(threadGroup, r);
+ t.setName("Async disk worker #" + thisIndex +
+ " for volume " + vol);
+ return t;
+ }
+ };
+
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
@@ -89,7 +98,7 @@ class FSDatasetAsyncDiskService {
// This can reduce the number of running threads
executor.allowCoreThreadTimeOut(true);
- executors.put(volumes[v], executor);
+ executors.put(vol, executor);
}
}
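
A self-contained sketch of the per-volume thread naming, minus the Hadoop
types; an AtomicInteger stands in for the synchronized counter above, and the
volume path is a placeholder:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;

    public class NamedThreadFactorySketch {
      public static void main(String[] args) {
        final String vol = "/data/dfs/dn1";          // placeholder volume root
        final AtomicInteger counter = new AtomicInteger();
        ThreadFactory threadFactory = new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            // Named threads make thread dumps attributable to a volume.
            t.setName("Async disk worker #" + counter.getAndIncrement() +
                      " for volume " + vol);
            return t;
          }
        };
        ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory);
        executor.submit(() -> System.out.println(Thread.currentThread().getName()));
        executor.shutdown();
      }
    }
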
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java Tue Sep 14 00:25:35 2010
@@ -32,8 +32,8 @@ import org.apache.hadoop.classification.
* be published as an interface.
*
* <p>
- * Data Node runtime statistic info is reported in another MBean
- * @see org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics
+ * Data Node runtime statistic info is reported in another MBean
+ * @see org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeActivityMBean
*
*/
@InterfaceAudience.Private
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Tue Sep 14 00:25:35 2010
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -384,7 +383,7 @@ public class BlockManager {
}
List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
- long length, int nrBlocksToReturn) throws IOException {
+ long length, int nrBlocksToReturn, boolean needBlockToken) throws IOException {
int curBlk = 0;
long curPos = 0, blkSize = 0;
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
@@ -403,7 +402,7 @@ public class BlockManager {
long endOff = offset + length;
List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
do {
- results.add(getBlockLocation(blocks[curBlk], curPos));
+ results.add(getBlockLocation(blocks[curBlk], curPos, needBlockToken));
curPos += blocks[curBlk].getNumBytes();
curBlk++;
} while (curPos < endOff
@@ -412,13 +411,14 @@ public class BlockManager {
return results;
}
- /** @return a LocatedBlock for the given block */
- LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
+ /** @param needBlockToken
+ * @return a LocatedBlock for the given block */
+ LocatedBlock getBlockLocation(final BlockInfo blk, final long pos, boolean needBlockToken
) throws IOException {
if (!blk.isComplete()) {
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
final DatanodeDescriptor[] locations = uc.getExpectedLocations();
- return namesystem.createLocatedBlock(uc, locations, pos, false);
+ return namesystem.createLocatedBlock(uc, locations, pos, false, needBlockToken);
}
// get block locations
@@ -444,7 +444,7 @@ public class BlockManager {
machines[j++] = d;
}
}
- return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);
+ return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt, needBlockToken);
}
/**
@@ -1723,27 +1723,13 @@ public class BlockManager {
Long startingBlockId) {
return corruptReplicas.getCorruptReplicaBlockIds(numExpectedBlocks,
startingBlockId);
- }
-
+ }
+
/**
- * @return inodes of files with corrupt blocks, with a maximum of
- * MAX_CORRUPT_FILES_RETURNED inodes listed in total
+ * Return an iterator over the set of blocks for which there are no replicas.
*/
- INode[] getCorruptInodes() {
- LinkedHashSet<INode> set = new LinkedHashSet<INode>();
-
- for (Block blk :
- neededReplications.getQueue(
- UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS)){
- INode inode = blocksMap.getINode(blk);
- if (inode != null && countNodes(blk).liveReplicas() == 0) {
- set.add(inode);
- if (set.size() >= this.maxCorruptFilesReturned) {
- break;
- }
- }
- }
- return set.toArray(new INode[set.size()]);
+ BlockIterator getCorruptReplicaBlockIterator() {
+ return neededReplications
+ .iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}
-
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Sep 14 00:25:35 2010
@@ -24,7 +24,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.*;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
@@ -53,6 +52,7 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
+import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -68,7 +68,6 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -89,8 +88,8 @@ import java.io.File;
import java.io.FileWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.PrintWriter;
import java.io.DataOutputStream;
+import java.io.PrintWriter;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.URI;
@@ -163,6 +162,7 @@ public class FSNamesystem implements FSC
public static final Log auditLog = LogFactory.getLog(
FSNamesystem.class.getName() + ".audit");
+ static final int DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED = 100;
static int BLOCK_DELETION_INCREMENT = 1000;
private boolean isPermissionEnabled;
private UserGroupInformation fsOwner;
@@ -801,38 +801,33 @@ public class FSNamesystem implements FSC
} else {
final long n = inode.computeFileSize(false);
final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
- blocks, offset, length, Integer.MAX_VALUE);
+ blocks, offset, length, Integer.MAX_VALUE, needBlockToken);
final BlockInfo last = inode.getLastBlock();
if (LOG.isDebugEnabled()) {
LOG.debug("last = " + last);
}
-
- if(isBlockTokenEnabled && needBlockToken) {
- setBlockTokens(locatedblocks);
- }
-
if (last.isComplete()) {
return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
- blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
+ blockManager.getBlockLocation(last, n-last.getNumBytes(), needBlockToken), true);
} else {
return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
- blockManager.getBlockLocation(last, n), false);
+ blockManager.getBlockLocation(last, n, needBlockToken), false);
}
}
}
- /** Generate a combined block token for all the blocks to be returned. */
- private void setBlockTokens(List<LocatedBlock> locatedBlocks) throws IOException {
- long [] blockIds = new long[locatedBlocks.size()];
- for(int i = 0; i < blockIds.length; i++) {
- blockIds[i] = locatedBlocks.get(i).getBlock().getBlockId();
+ /** Create a LocatedBlock.
+ * @param needBlockToken */
+ LocatedBlock createLocatedBlock(final Block b,
+ final DatanodeInfo[] locations, final long offset, final boolean corrupt,
+ boolean needBlockToken) throws IOException {
+ final ExtendedBlock blk = getExtendedBlock(b);
+ final LocatedBlock lb = new LocatedBlock(blk, locations, offset, corrupt);
+ if (isBlockTokenEnabled && needBlockToken) {
+ lb.setBlockToken(blockTokenSecretManager.generateToken(blk,
+ EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
}
-
- Token<BlockTokenIdentifier> token =
- blockTokenSecretManager.generateToken(blockIds,
- EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
-
- for(LocatedBlock l : locatedBlocks) l.setBlockToken(token);
+ return lb;
}
/** Create a LocatedBlock. */
@@ -4510,37 +4505,57 @@ public class FSNamesystem implements FSC
return blockManager.getCorruptReplicaBlockIds(numExpectedBlocks,
startingBlockId);
}
-
+
+ static class CorruptFileBlockInfo {
+ String path;
+ Block block;
+
+ public CorruptFileBlockInfo(String p, Block b) {
+ path = p;
+ block = b;
+ }
+
+ public String toString() {
+ return block.getBlockName() + "\t" + path;
+ }
+ }
/**
- * @return Array of FileStatus objects representing files with
- * corrupted blocks.
+ * @param path Restrict corrupt files to this portion of namespace.
+ * @param startBlockAfter Support for continuation; the set of files we return
+ * back is ordered by blockid; startBlockAfter tells where to start from
+ * @return a list in which each entry describes a corrupt file/block
* @throws AccessControlException
* @throws IOException
*/
- synchronized FileStatus[] getCorruptFiles()
- throws AccessControlException, IOException {
-
+ synchronized Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
+ String startBlockAfter) throws AccessControlException, IOException {
+
checkSuperuserPrivilege();
-
- INode[] inodes = blockManager.getCorruptInodes();
- FileStatus[] ret = new FileStatus[inodes.length];
-
- int i = 0;
- for (INode inode: inodes) {
- String src = inode.getFullPathName();
- ret[i++] = new FileStatus(inode.computeContentSummary().getLength(),
- inode.isDirectory(),
- ((INodeFile)inode).getReplication(),
- ((INodeFile)inode).getPreferredBlockSize(),
- inode.getModificationTime(),
- inode.getAccessTime(),
- inode.getFsPermission(),
- inode.getUserName(),
- inode.getGroupName(),
- new Path(src));
+ long startBlockId = 0;
+ // return a limited # of corrupt file blocks per call
+ int count = 0;
+ ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();
+
+ if (startBlockAfter != null) {
+ startBlockId = Block.filename2id(startBlockAfter);
+ }
+ BlockIterator blkIterator = blockManager.getCorruptReplicaBlockIterator();
+ while (blkIterator.hasNext()) {
+ Block blk = blkIterator.next();
+ INode inode = blockManager.getINode(blk);
+ if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
+ String src = FSDirectory.getFullPathName(inode);
+ if (((startBlockAfter == null) || (blk.getBlockId() > startBlockId))
+ && (src.startsWith(path))) {
+ corruptFiles.add(new CorruptFileBlockInfo(src, blk));
+ count++;
+ if (count >= DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED)
+ break;
+ }
+ }
}
-
- return ret;
+ LOG.info("list corrupt file blocks returned: " + count);
+ return corruptFiles;
}
public synchronized ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
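
The first hunk above replaces the combined token scheme, in which the removed
setBlockTokens() stamped a single Token<BlockTokenIdentifier> covering every
block id in the response onto all LocatedBlocks, with one READ-scoped token
generated per block inside createLocatedBlock(). A minimal consumer-side
sketch of the new shape, assuming a hypothetical List<LocatedBlock> named
"locatedBlocks" from getBlockLocations(); getBlockToken() is taken to be the
accessor paired with the setBlockToken() call in the hunk:

    // sketch only: one token per block, so handing a datanode the token for
    // one block no longer grants READ access to every block in the response
    for (LocatedBlock lb : locatedBlocks) {
      Token<BlockTokenIdentifier> token = lb.getBlockToken();
      // send "token" alongside lb.getBlock() in that block's READ_BLOCK request
    }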
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Sep 14 00:25:35 2010
@@ -33,7 +33,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options;
@@ -1119,12 +1118,20 @@ public class NameNode implements Namenod
namesystem.metaSave(filename);
}
- /** {@inheritDoc} */
- public FileStatus[] getCorruptFiles()
- throws AccessControlException, IOException {
-
- return namesystem.getCorruptFiles();
-
+ /**
+ *
+ * @param path
+ * Sub-tree used in querying corrupt files
+ * @param startBlockAfter
+ * Paging support---pass in the last block returned from the previous
+ * call, and the next batch of corrupt blocks after that point is returned
+ * @return a list in which each entry describes a corrupt file/block
+ * @throws AccessControlException
+ * @throws IOException
+ */
+ public Collection<FSNamesystem.CorruptFileBlockInfo> listCorruptFileBlocks(String path,
+ String startBlockAfter) throws AccessControlException, IOException {
+ return namesystem.listCorruptFileBlocks(path, startBlockAfter);
}
/** {@inheritDoc} */
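
Taken with the FSNamesystem change above, callers page through the corrupt set
by feeding the block name from the last entry of one batch into the next call.
A sketch under those assumptions ("nn" is a hypothetical NameNode handle;
CorruptFileBlockInfo.toString() emits "<blockName>\t<path>", and the server
resumes after Block.filename2id(startBlockAfter)):

    static void printAllCorruptFileBlocks(NameNode nn) throws IOException {
      String cookie = null;                          // null on the first call
      Collection<FSNamesystem.CorruptFileBlockInfo> batch;
      do {
        batch = nn.listCorruptFileBlocks("/", cookie);
        for (FSNamesystem.CorruptFileBlockInfo info : batch) {
          System.out.println(info);                  // blockName <TAB> path
          cookie = info.toString().split("\t")[0];   // e.g. "blk_7180..."
        }
      } while (!batch.isEmpty());                    // an empty batch means done
    }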
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Tue Sep 14 00:25:35 2010
@@ -24,6 +24,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@@ -35,8 +36,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -105,9 +104,14 @@ public class NamenodeFsck {
private boolean showBlocks = false;
private boolean showLocations = false;
private boolean showRacks = false;
- private boolean showCorruptFiles = false;
+ private boolean showCorruptFileBlocks = false;
private int fixing = FIXING_NONE;
private String path = "/";
+
+ // We return at most N corrupt files; the list is ordered by block id. To
+ // allow continuation, pass in the last block # from the previous call.
+ private String startBlockAfter = null;
private final Configuration conf;
private final PrintWriter out;
@@ -145,7 +149,12 @@ public class NamenodeFsck {
else if (key.equals("locations")) { this.showLocations = true; }
else if (key.equals("racks")) { this.showRacks = true; }
else if (key.equals("openforwrite")) {this.showOpenFiles = true; }
- else if (key.equals("corruptfiles")) {this.showCorruptFiles = true; }
+ else if (key.equals("listcorruptfileblocks")) {
+ this.showCorruptFileBlocks = true;
+ }
+ else if (key.equals("startblockafter")) {
+ this.startBlockAfter = pmap.get("startblockafter")[0];
+ }
}
}
@@ -164,8 +173,8 @@ public class NamenodeFsck {
final HdfsFileStatus file = namenode.getFileInfo(path);
if (file != null) {
- if (showCorruptFiles) {
- listCorruptFiles();
+ if (showCorruptFileBlocks) {
+ listCorruptFileBlocks();
return;
}
@@ -205,53 +214,25 @@ public class NamenodeFsck {
}
}
- static String buildSummaryResultForListCorruptFiles(int corruptFilesCount,
- String pathName) {
-
- String summary = "";
-
- if (corruptFilesCount == 0) {
- summary = "Unable to locate any corrupt files under '" + pathName
- + "'.\n\nPlease run a complete fsck to confirm if '" + pathName
- + "' " + HEALTHY_STATUS;
- } else if (corruptFilesCount == 1) {
- summary = "There is at least 1 corrupt file under '" + pathName
- + "', which " + CORRUPT_STATUS;
- } else if (corruptFilesCount > 1) {
- summary = "There are at least " + corruptFilesCount
- + " corrupt files under '" + pathName + "', which " + CORRUPT_STATUS;
+ private void listCorruptFileBlocks() throws AccessControlException,
+ IOException {
+ Collection<FSNamesystem.CorruptFileBlockInfo> corruptFiles = namenode
+ .listCorruptFileBlocks(path, startBlockAfter);
+ int numCorruptFiles = corruptFiles.size();
+ String filler;
+ if (numCorruptFiles > 0) {
+ filler = Integer.toString(numCorruptFiles);
+ } else if (startBlockAfter == null) {
+ filler = "no";
} else {
- throw new IllegalArgumentException("corruptFilesCount must be positive");
+ filler = "no more";
}
-
- return summary;
- }
-
- private void listCorruptFiles() throws AccessControlException, IOException {
- int matchedCorruptFilesCount = 0;
- // directory representation of path
- String pathdir = path.endsWith(Path.SEPARATOR) ? path : path + Path.SEPARATOR;
- FileStatus[] corruptFileStatuses = namenode.getCorruptFiles();
-
- for (FileStatus fileStatus : corruptFileStatuses) {
- String currentPath = fileStatus.getPath().toString();
- if (currentPath.startsWith(pathdir) || currentPath.equals(path)) {
- matchedCorruptFilesCount++;
-
- // print the header before listing first item
- if (matchedCorruptFilesCount == 1 ) {
- out.println("Here are a few files that may be corrupted:");
- out.println("===========================================");
- }
-
- out.println(currentPath);
- }
+ for (FSNamesystem.CorruptFileBlockInfo c : corruptFiles) {
+ out.println(c.toString());
}
-
+ out.println("\n\nThe filesystem under path '" + path + "' has " + filler
+ + " CORRUPT files");
out.println();
- out.println(buildSummaryResultForListCorruptFiles(matchedCorruptFilesCount,
- path));
-
}
private void check(String parent, HdfsFileStatus file, Result res) throws IOException {
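
With listcorruptfileblocks=1 the fsck servlet now emits one blockName<TAB>path
line per corrupt block, two blank lines, then the summary; an illustrative
transcript (block names and paths made up):

    blk_7128932936872942146	/user/foo/part-00000
    blk_-4681133896413326233	/user/foo/part-00003


    The filesystem under path '/user/foo' has 2 CORRUPT files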
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java Tue Sep 14 00:25:35 2010
@@ -50,7 +50,7 @@ public class StreamFile extends DfsServl
static DataNode datanode = null;
static {
if ((datanode = DataNode.getDataNode()) != null) {
- nameNodeAddr = datanode.getNameNodeAddr();
+ nameNodeAddr = datanode.getNameNodeAddrForClient();
}
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java Tue Sep 14 00:25:35 2010
@@ -195,52 +195,66 @@ class UnderReplicatedBlocks implements I
}
}
- /* returns an interator of all blocks in a given priority queue */
- public synchronized Iterable<Block> getQueue(int priority) {
- if (priority < 0 || priority >= LEVEL) {
- return null;
- }
- return priorityQueues.get(priority);
+ /* returns an iterator of all blocks in a given priority queue */
+ synchronized BlockIterator iterator(int level) {
+ return new BlockIterator(level);
}
-
+
/* return an iterator over all the under-replicated blocks */
public synchronized BlockIterator iterator() {
return new BlockIterator();
}
- class BlockIterator implements Iterator<Block> {
- private int level;
- private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
- BlockIterator()
- {
- level=0;
- for(int i=0; i<LEVEL; i++) {
- iterators.add(priorityQueues.get(i).iterator());
- }
- }
-
- private void update() {
- while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
- level++;
- }
- }
-
- public Block next() {
- update();
- return iterators.get(level).next();
+ class BlockIterator implements Iterator<Block> {
+ private int level;
+ private boolean isIteratorForLevel = false;
+ private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
+
+ BlockIterator()
+ {
+ level=0;
+ for(int i=0; i<LEVEL; i++) {
+ iterators.add(priorityQueues.get(i).iterator());
}
-
- public boolean hasNext() {
- update();
- return iterators.get(level).hasNext();
+ }
+
+ BlockIterator(int l) {
+ level = l;
+ isIteratorForLevel = true;
+ iterators.add(priorityQueues.get(level).iterator());
+ }
+
+ private void update() {
+ if (isIteratorForLevel)
+ return;
+ while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
+ level++;
}
-
- public void remove() {
+ }
+
+ public Block next() {
+ if (isIteratorForLevel)
+ return iterators.get(0).next();
+ update();
+ return iterators.get(level).next();
+ }
+
+ public boolean hasNext() {
+ if (isIteratorForLevel)
+ return iterators.get(0).hasNext();
+ update();
+ return iterators.get(level).hasNext();
+ }
+
+ public void remove() {
+ if (isIteratorForLevel)
+ iterators.get(0).remove();
+ else
iterators.get(level).remove();
- }
-
- public int getPriority() {
- return level;
+ }
+
+ public int getPriority() {
+ return level;
};
- }
+ }
}
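
The new single-level constructor lets BlockManager expose an iterator over a
single priority queue (getCorruptReplicaBlockIterator() in the FSNamesystem
hunk above), while the no-arg form still falls through levels as each queue
drains. A minimal sketch, assuming an UnderReplicatedBlocks instance named
"neededReplications"; the level value 2 is illustrative:

    // cross-level: starts at level 0 and update() advances to the next
    // non-empty queue as each one is exhausted
    UnderReplicatedBlocks.BlockIterator all = neededReplications.iterator();
    while (all.hasNext()) {
      Block next = all.next();
      int level = all.getPriority();  // queue the current block came from
    }

    // single-level: update() is bypassed, so iteration never leaves level 2
    UnderReplicatedBlocks.BlockIterator one = neededReplications.iterator(2);
    while (one.hasNext()) {
      Block fromOneQueue = one.next();
    }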
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Tue Sep 14 00:25:35 2010
@@ -45,9 +45,9 @@ import org.apache.avro.reflect.Nullable;
@InterfaceAudience.Private
public interface DatanodeProtocol extends VersionedProtocol {
/**
- * 26: Add block pool ID to Block
+ * 27: Add block pool ID to Block
*/
- public static final long versionID = 26L;
+ public static final long versionID = 27L;
// error code
final static int NOTIFY = 0;
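
The versionID bump matters because both ends of the RPC compare this constant
during the VersionedProtocol handshake, so incompatible peers fail fast
instead of misparsing each other's messages. A hedged sketch of the standard
check (getProtocolVersion() is the VersionedProtocol method; "namenode" is a
hypothetical DatanodeProtocol proxy):

    long serverVersion = namenode.getProtocolVersion(
        DatanodeProtocol.class.getName(), DatanodeProtocol.versionID);
    if (serverVersion != DatanodeProtocol.versionID) {
      throw new IOException("DatanodeProtocol version mismatch: datanode speaks "
          + DatanodeProtocol.versionID + ", namenode speaks " + serverVersion);
    }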
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSck.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSck.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSck.java Tue Sep 14 00:25:35 2010
@@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.Krb5AndCertsSslSocketConnector;
import org.apache.hadoop.security.SecurityUtil;
@@ -85,14 +84,16 @@ public class DFSck extends Configured im
* Print fsck usage information
*/
static void printUsage() {
- System.err.println("Usage: DFSck <path> [-list-corruptfiles | [-move | -delete | -openforwrite ] [-files [-blocks [-locations | -racks]]]] ");
+ System.err.println("Usage: DFSck <path> [-list-corruptfileblocks | " +
+ "[-move | -delete | -openforwrite] " +
+ "[-files [-blocks [-locations | -racks]]]]");
System.err.println("\t<path>\tstart checking from this path");
System.err.println("\t-move\tmove corrupted files to /lost+found");
System.err.println("\t-delete\tdelete corrupted files");
System.err.println("\t-files\tprint out files being checked");
System.err.println("\t-openforwrite\tprint out files opened for write");
- System.err.println("\t-list-corruptfiles\tprint out corrupt files up to a "+
- "maximum defined by property dfs.corruptfilesreturned.max");
+ System.err.println("\t-list-corruptfileblocks\tprint out list of missing "
+ + "blocks and files they belong to");
System.err.println("\t-blocks\tprint out block report");
System.err.println("\t-locations\tprint out locations for every block");
System.err.println("\t-racks\tprint out network topology for data-node locations");
@@ -123,6 +124,67 @@ public class DFSck extends Configured im
throw new IOException(e);
}
}
+
+ /*
+ * To get the full list, we call the namenode iteratively until it reports
+ * that no more corrupt file blocks are left.
+ */
+ private Integer listCorruptFileBlocks(String dir, String baseUrl)
+ throws IOException {
+ int errCode = -1;
+ int numCorrupt = 0;
+ String lastBlock = null;
+ final String noCorruptLine = "has no CORRUPT files";
+ final String noMoreCorruptLine = "has no more CORRUPT files";
+ boolean allDone = false;
+ while (!allDone) {
+ final StringBuffer url = new StringBuffer(baseUrl);
+ if (lastBlock != null) {
+ url.append("&startblockafter=").append(lastBlock);
+ }
+ URL path = new URL(url.toString());
+ SecurityUtil.fetchServiceTicket(path);
+ URLConnection connection = path.openConnection();
+ InputStream stream = connection.getInputStream();
+ BufferedReader input = new BufferedReader(new InputStreamReader(
+ stream, "UTF-8"));
+ try {
+ String line = null;
+ while ((line = input.readLine()) != null) {
+ if ((line.endsWith(noCorruptLine)) ||
+ (line.endsWith(noMoreCorruptLine)) ||
+ (line.endsWith(NamenodeFsck.NONEXISTENT_STATUS))) {
+ allDone = true;
+ break;
+ }
+ if ((line.isEmpty())
+ || (line.startsWith("FSCK started by"))
+ || (line.startsWith("The filesystem under path")))
+ continue;
+ numCorrupt++;
+ if (numCorrupt == 1) {
+ System.out.println("The list of corrupt files under path '"
+ + dir + "' are:");
+ }
+ System.out.println(line);
+ try {
+ // Get the block # that we need to send in next call
+ lastBlock = line.split("\t")[0];
+ } catch (Exception e) {
+ allDone = true;
+ break;
+ }
+ }
+ } finally {
+ input.close();
+ }
+ }
+ System.out.println("The filesystem under path '" + dir + "' has "
+ + numCorrupt + " CORRUPT files");
+ if (numCorrupt == 0)
+ errCode = 0;
+ return errCode;
+ }
private int doWork(final String[] args) throws IOException {
String proto = "http://";
@@ -141,15 +203,22 @@ public class DFSck extends Configured im
if (!args[idx].startsWith("-")) { dir = args[idx]; break; }
}
url.append(URLEncoder.encode(dir, "UTF-8"));
+ boolean doListCorruptFileBlocks = false;
for (int idx = 0; idx < args.length; idx++) {
if (args[idx].equals("-move")) { url.append("&move=1"); }
else if (args[idx].equals("-delete")) { url.append("&delete=1"); }
else if (args[idx].equals("-files")) { url.append("&files=1"); }
else if (args[idx].equals("-openforwrite")) { url.append("&openforwrite=1"); }
- else if (args[idx].equals("-list-corruptfiles")) { url.append("&corruptfiles=1"); }
else if (args[idx].equals("-blocks")) { url.append("&blocks=1"); }
else if (args[idx].equals("-locations")) { url.append("&locations=1"); }
else if (args[idx].equals("-racks")) { url.append("&racks=1"); }
+ else if (args[idx].equals("-list-corruptfileblocks")) {
+ url.append("&listcorruptfileblocks=1");
+ doListCorruptFileBlocks = true;
+ }
+ }
+ if (doListCorruptFileBlocks) {
+ return listCorruptFileBlocks(dir, url.toString());
}
URL path = new URL(url.toString());
SecurityUtil.fetchServiceTicket(path);
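
End to end, the new fsck option drives the servlet loop shown earlier,
appending &startblockafter=<last block name> to each request until the summary
line signals there is nothing more to fetch. An illustrative session (block
names and paths made up):

    $ hadoop fsck /user/foo -list-corruptfileblocks
    The list of corrupt files under path '/user/foo' is:
    blk_7128932936872942146	/user/foo/part-00000
    blk_-4681133896413326233	/user/foo/part-00003
    The filesystem under path '/user/foo' has 2 CORRUPT files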
Propchange: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 14 00:25:35 2010
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/test/hdfs:776175-785643
/hadoop/hdfs/branches/HDFS-265/src/test/hdfs:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/test/hdfs:820487
-/hadoop/hdfs/trunk/src/test/hdfs:987665-992489
+/hadoop/hdfs/trunk/src/test/hdfs:987665-996725
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=996727&r1=996726&r2=996727&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Sep 14 00:25:35 2010
@@ -19,11 +19,13 @@ package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.READ_BLOCK;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.WRITE_BLOCK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -32,6 +34,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.nio.ByteBuffer;
import java.util.Random;
import junit.framework.TestCase;
@@ -148,14 +151,16 @@ public class TestDataTransferProtocol ex
throws IOException {
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512); // checksum size
- sendOut.writeInt(8); // size of packet
- sendOut.writeLong(block.getNumBytes()); // OffsetInBlock
- sendOut.writeLong(100); // sequencenumber
- sendOut.writeBoolean(true); // lastPacketInBlock
- sendOut.writeInt(0); // chunk length
+ PacketHeader hdr = new PacketHeader(
+ 8, // size of packet
+ block.getNumBytes(), // OffsetInBlock
+ 100, // sequencenumber
+ true, // lastPacketInBlock
+ 0); // chunk length
+ hdr.write(sendOut);
sendOut.writeInt(0); // zero checksum
-
+
//ok finally write a block with 0 len
SUCCESS.write(recvOut);
Text.writeString(recvOut, "");
@@ -373,13 +378,15 @@ public class TestDataTransferProtocol ex
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512);
- sendOut.writeInt(4); // size of packet
- sendOut.writeLong(0); // OffsetInBlock
- sendOut.writeLong(100); // sequencenumber
- sendOut.writeBoolean(false); // lastPacketInBlock
-
- // bad data chunk length
- sendOut.writeInt(-1-random.nextInt(oneMil));
+
+ PacketHeader hdr = new PacketHeader(
+ 4, // size of packet
+ 0, // offset in block,
+ 100, // seqno
+ false, // last packet
+ -1 - random.nextInt(oneMil)); // bad datalen
+ hdr.write(sendOut);
+
SUCCESS.write(recvOut);
Text.writeString(recvOut, "");
new PipelineAck(100, new Status[]{ERROR}).write(recvOut);
@@ -395,12 +402,14 @@ public class TestDataTransferProtocol ex
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512); // checksum size
- sendOut.writeInt(8); // size of packet
- sendOut.writeLong(0); // OffsetInBlock
- sendOut.writeLong(100); // sequencenumber
- sendOut.writeBoolean(true); // lastPacketInBlock
- sendOut.writeInt(0); // chunk length
+ hdr = new PacketHeader(
+ 8, // size of packet
+ 0, // OffsetInBlock
+ 100, // sequencenumber
+ true, // lastPacketInBlock
+ 0); // chunk length
+ hdr.write(sendOut);
sendOut.writeInt(0); // zero checksum
sendOut.flush();
//ok finally write a block with 0 len
@@ -497,4 +506,39 @@ public class TestDataTransferProtocol ex
cluster.shutdown();
}
}
+
+ @Test
+ public void testPacketHeader() throws IOException {
+ PacketHeader hdr = new PacketHeader(
+ 4, // size of packet
+ 1024, // OffsetInBlock
+ 100, // sequencenumber
+ false, // lastPacketInBlock
+ 4096); // chunk length
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ hdr.write(new DataOutputStream(baos));
+
+ // Read back using DataInput
+ PacketHeader readBack = new PacketHeader();
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ readBack.readFields(new DataInputStream(bais));
+ assertEquals(hdr, readBack);
+
+ // Read back using ByteBuffer
+ readBack = new PacketHeader();
+ readBack.readFields(ByteBuffer.wrap(baos.toByteArray()));
+ assertEquals(hdr, readBack);
+
+ // Test sanity check: a well-formed trailing header (zero-length last
+ // packet) should pass when its seqno follows the previous one
+ PacketHeader goodHeader = new PacketHeader(
+ 4, // size of packet
+ 0, // OffsetInBlock
+ 100, // sequencenumber
+ true, // lastPacketInBlock
+ 0); // chunk length
+ assertTrue(goodHeader.sanityCheck(99));
+
+ // hdr carries seqno 100, so it is sane only after seqno 99
+ assertTrue(hdr.sanityCheck(99));
+ assertFalse(hdr.sanityCheck(100));
+ }
}
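
For reference, the five fields PacketHeader encapsulates are exactly the ones
the old hand-rolled tests wrote, in the same order; a sketch of that wire
layout reconstructed from the removed writes above (field names assumed,
"out" is a DataOutputStream):

    out.writeInt(packetLen);              // size of packet
    out.writeLong(offsetInBlock);         // where this packet starts in the block
    out.writeLong(seqno);                 // packet sequence number
    out.writeBoolean(lastPacketInBlock);  // true only for the trailing packet
    out.writeInt(dataLen);                // length of the chunk payload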