Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2011/10/06 03:17:05 UTC
svn commit: r1179484 [1/2] - in
/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdfs/server/datano...
Author: atm
Date: Thu Oct 6 01:16:48 2011
New Revision: 1179484
URL: http://svn.apache.org/viewvc?rev=1179484&view=rev
Log:
Merging trunk to HDFS-1623 branch.
Added:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
- copied unchanged from r1179483, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/deb/init.d/hadoop-secondarynamenode
- copied unchanged from r1179483, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/deb/init.d/hadoop-secondarynamenode
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/rpm/init.d/hadoop-secondarynamenode
- copied unchanged from r1179483, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/rpm/init.d/hadoop-secondarynamenode
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/native/ (props changed)
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/deb/hadoop.control/preinst
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/rpm/spec/hadoop-hdfs.spec
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/ (props changed)
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/ (props changed)
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/ (props changed)
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/ (props changed)
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java
Propchange: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 6 01:16:48 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:1152502-1177128
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:1152502-1179483
/hadoop/core/branches/branch-0.19/hdfs:713112
/hadoop/hdfs/branches/HDFS-1052:987665-1095512
/hadoop/hdfs/branches/HDFS-265:796829-820463
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Oct 6 01:16:48 2011
@@ -19,9 +19,13 @@ Trunk (unreleased changes)
HDFS-2340. Support getFileBlockLocations and getDelegationToken in webhdfs.
(szetszwo)
+ HDFS-2348. Support getContentSummary and getFileChecksum in webhdfs.
+ (szetszwo)
+
IMPROVEMENTS
- HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
+ HADOOP-7524 Change RPC to allow multiple protocols including multuple
+ versions of the same protocol (sanjay Radia)
HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
HdfsConstants. (Harsh J Chouraria via atm)
@@ -50,6 +54,13 @@ Trunk (unreleased changes)
HDFS-2355. Federation: enable using the same configuration file across
all the nodes in the cluster. (suresh)
+ HDFS-2371. Refactor BlockSender.java for better readability. (suresh)
+
+ HDFS-2158. Add JournalSet to manage the set of journals. (jitendra)
+
+ HDFS-2395. Add a root element in the JSON responses of webhdfs.
+ (szetszwo)
+
BUG FIXES
HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)
@@ -83,6 +94,9 @@ Trunk (unreleased changes)
HDFS-2361. hftp is broken, fixed username checks in JspHelper. (jitendra)
+ HDFS-2298. Fix TestDfsOverAvroRpc by changing ClientProtocol to
+ not include multiple methods of the same name. (cutting)
+
Release 0.23.0 - Unreleased
INCOMPATIBLE CHANGES
@@ -1145,6 +1159,39 @@ Release 0.23.0 - Unreleased
(todd)
HDFS-2027. Image inspector should return finalized logs before unfinalized
logs. (todd)
+ HDFS-2074. Determine edit log validity by truly reading and validating
+ transactions. (todd)
+ HDFS-2085. Finalize in-progress edit logs at startup. (todd)
+ HDFS-2026. SecondaryNameNode should properly handle the case where the
+ NameNode is reformatted. (todd)
+ HDFS-2077. Address checkpoint upload when one of the storage dirs is failed
+ (todd)
+ HDFS-2078. NameNode should not clear directory when restoring removed
+ storage. (todd)
+ HDFS-2088. Move edits log archiving logic into FSEditLog/JournalManager
+ (todd)
+ HDFS-2093. Handle case where an entirely empty log is left during NN crash
+ (todd)
+ HDFS-2102. Zero-pad edits filename to make them lexically sortable. (Ivan
+ Kelly via todd)
+ HDFS-2010. Fix NameNode to exit if all edit streams become inaccessible.
+ (atm via todd)
+ HDFS-2123. Checkpoint interval should be based on txn count, not size.
+ (todd)
+ HDFS-1979. Fix backupnode for new edits/image layout. (todd)
+ HDFS-2101. Fix remaining unit tests for new storage filenames. (todd)
+ HDFS-2133. Address remaining TODOs and pre-merge cleanup on HDFS-1073
+ branch. (todd)
+ HDFS-1780. Reduce need to rewrite FSImage on startup. (todd)
+ HDFS-2104. Add a flag to the 2NN to format its checkpoint dirs on startup.
+ (todd)
+ HDFS-2135. Fix regression of HDFS-1955 in HDFS-1073 branch. (todd)
+ HDFS-2160. Fix CreateEditsLog test tool in HDFS-1073 branch. (todd)
+ HDFS-2168. Reenable TestEditLog.testFailedOpen and fix exposed bug. (todd)
+ HDFS-2169. Clean up TestCheckpoint and remove TODOs (todd)
+ HDFS-2170. Address remaining TODOs in HDFS-1073 branch. (todd)
+ HDFS-2172. Address findbugs and javadoc warnings in HDFS-1073 branch.
+ (todd)
Release 0.22.0 - Unreleased
Propchange: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 6 01:16:48 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:1159757-1177128
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:1159757-1179483
/hadoop/core/branches/branch-0.19/hdfs/src/java:713112
/hadoop/core/branches/branch-0.19/hdfs/src/main/java:713112
/hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Thu Oct 6 01:16:48 2011
@@ -874,13 +874,13 @@ public class DFSClient implements java.i
}
/**
* Rename file or directory.
- * @see ClientProtocol#rename(String, String, Options.Rename...)
+ * @see ClientProtocol#rename2(String, String, Options.Rename...)
*/
public void rename(String src, String dst, Options.Rename... options)
throws IOException {
checkOpen();
try {
- namenode.rename(src, dst, options);
+ namenode.rename2(src, dst, options);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
DSQuotaExceededException.class,
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Thu Oct 6 01:16:48 2011
@@ -67,9 +67,9 @@ public interface ClientProtocol extends
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 68: Add Balancer Bandwidth Command protocol
+ * 69: Eliminate overloaded method names.
*/
- public static final long versionID = 68L;
+ public static final long versionID = 69L;
///////////////////////////////////////
// File contents
@@ -419,7 +419,7 @@ public interface ClientProtocol extends
* <code>dst</code> contains a symlink
* @throws IOException If an I/O error occurred
*/
- public void rename(String src, String dst, Options.Rename... options)
+ public void rename2(String src, String dst, Options.Rename... options)
throws AccessControlException, DSQuotaExceededException,
FileAlreadyExistsException, FileNotFoundException,
NSQuotaExceededException, ParentNotDirectoryException, SafeModeException,
@@ -428,21 +428,6 @@ public interface ClientProtocol extends
/**
* Delete the given file or directory from the file system.
* <p>
- * Any blocks belonging to the deleted files will be garbage-collected.
- *
- * @param src existing name.
- * @return true only if the existing file or directory was actually removed
- * from the file system.
- * @throws UnresolvedLinkException if <code>src</code> contains a symlink.
- * @deprecated use {@link #delete(String, boolean)} istead.
- */
- @Deprecated
- public boolean delete(String src)
- throws IOException, UnresolvedLinkException;
-
- /**
- * Delete the given file or directory from the file system.
- * <p>
* same as delete but provides a way to avoid accidentally
* deleting non empty directories programmatically.
* @param src existing name
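The hunks above rename the ClientProtocol RPC from rename(String, String, Options.Rename...) to rename2 and drop the deprecated single-argument delete overload, so the protocol no longer carries two methods with the same name (see HDFS-2298 in CHANGES.txt). DFSClient keeps its public rename(...) signature and simply delegates to the new RPC name, so callers are unaffected. A minimal illustrative sketch of such a caller, assuming a reachable NameNode at a placeholder address and hypothetical paths:

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.hdfs.DFSClient;

public class RenameExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Assumes a NameNode is reachable at this (placeholder) address.
    DFSClient client = new DFSClient(new InetSocketAddress("localhost", 8020), conf);
    try {
      // Same public API as before this change; internally this now issues the
      // ClientProtocol#rename2 RPC instead of an overloaded rename.
      client.rename("/user/alice/old", "/user/alice/new", Options.Rename.OVERWRITE);
    } finally {
      client.close();
    }
  }
}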
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Thu Oct 6 01:16:48 2011
@@ -404,7 +404,7 @@ class BlockPoolSliceScanner {
adjustThrottler();
blockSender = new BlockSender(block, 0, -1, false, false, true,
- datanode);
+ datanode, null);
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Oct 6 01:16:48 2011
@@ -41,191 +41,230 @@ import org.apache.hadoop.util.DataChecks
/**
* Reads a block from the disk and sends it to a recipient.
+ *
+ * Data sent from the BlockSender is in the following format:
+ * <br><b>Data format:</b> <pre>
+ * +--------------------------------------------------+
+ * | ChecksumHeader | Sequence of data PACKETS... |
+ * +--------------------------------------------------+
+ * </pre>
+ * <b>ChecksumHeader format:</b> <pre>
+ * +--------------------------------------------------+
+ * | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
+ * +--------------------------------------------------+
+ * </pre>
+ * An empty packet is sent to mark the end of block and read completion.
+ *
+ * PACKET Contains a packet header, checksum and data. Amount of data
+ * carried is set by BUFFER_SIZE.
+ * <pre>
+ * +-----------------------------------------------------+
+ * | 4 byte packet length (excluding packet header) |
+ * +-----------------------------------------------------+
+ * | 8 byte offset in the block | 8 byte sequence number |
+ * +-----------------------------------------------------+
+ * | 1 byte isLastPacketInBlock |
+ * +-----------------------------------------------------+
+ * | 4 byte Length of actual data |
+ * +-----------------------------------------------------+
+ * | x byte checksum data. x is defined below |
+ * +-----------------------------------------------------+
+ * | actual data ...... |
+ * +-----------------------------------------------------+
+ *
+ * Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
+ * A checksum is calculated for each chunk.
+ *
+ * x = (length of data + BYTES_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+ * CHECKSUM_SIZE
+ *
+ * CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
+ * </pre>
+ *
+ * The client reads data until it receives a packet with
+ * "LastPacketInBlock" set to true or with a zero length. If there is
+ * no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
+ * <pre>
+ * +------------------------------+
+ * | 2 byte OP_STATUS_CHECKSUM_OK |
+ * +------------------------------+
+ * </pre>
*/
class BlockSender implements java.io.Closeable {
- public static final Log LOG = DataNode.LOG;
+ static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
-
- private ExtendedBlock block; // the block to read from
-
- /** the replica to read from */
- private final Replica replica;
- /** The visible length of a replica. */
- private final long replicaVisibleLength;
-
- private static final boolean is32Bit = System.getProperty("sun.arch.data.model").equals("32");
-
- private InputStream blockIn; // data stream
- private long blockInPosition = -1; // updated while using transferTo().
- private DataInputStream checksumIn; // checksum datastream
- private DataChecksum checksum; // checksum stream
- private long offset; // starting position to read
- private long endOffset; // ending position
- private int bytesPerChecksum; // chunk size
- private int checksumSize; // checksum size
- private boolean corruptChecksumOk; // if need to verify checksum
- private boolean chunkOffsetOK; // if need to send chunk offset
- private long seqno; // sequence number of packet
-
- private boolean transferToAllowed = true;
- // set once entire requested byte range has been sent to the client
- private boolean sentEntireByteRange;
- private boolean verifyChecksum; //if true, check is verified while reading
- private DataTransferThrottler throttler;
- private final String clientTraceFmt; // format of client trace log message
-
+ private static final boolean is32Bit =
+ System.getProperty("sun.arch.data.model").equals("32");
/**
* Minimum buffer used while sending data to clients. Used only if
* transferTo() is enabled. 64KB is not that large. It could be larger, but
* not sure if there will be much more improvement.
*/
private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+ private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
+ HdfsConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
+
+ /** the block to read from */
+ private final ExtendedBlock block;
+ /** the replica to read from */
+ private final Replica replica;
+ /** The visible length of a replica. */
+ private final long replicaVisibleLength;
+ /** Stream to read block data from */
+ private InputStream blockIn;
+ /** updated while using transferTo() */
+ private long blockInPosition = -1;
+ /** Stream to read checksum */
+ private DataInputStream checksumIn;
+ /** Checksum utility */
+ private final DataChecksum checksum;
+ /** Starting position to read */
+ private long offset;
+ /** Position of last byte to read from block file */
+ private final long endOffset;
+ /** Number of bytes in chunk used for computing checksum */
+ private final int chunkSize;
+ /** Number bytes of checksum computed for a chunk */
+ private final int checksumSize;
+ /** If true, failure to read checksum is ignored */
+ private final boolean corruptChecksumOk;
+ /** true if chunk offset is needed to be sent in Checksum header */
+ private final boolean chunkOffsetOK;
+ /** Sequence number of packet being sent */
+ private long seqno;
+ /** Set to true if transferTo is allowed for sending data to the client */
+ private final boolean transferToAllowed;
+ /** Set to true once entire requested byte range has been sent to the client */
+ private boolean sentEntireByteRange;
+ /** When true, verify checksum while reading from checksum file */
+ private final boolean verifyChecksum;
+ /** Format used to print client trace log messages */
+ private final String clientTraceFmt;
private volatile ChunkChecksum lastChunkChecksum = null;
-
- BlockSender(ExtendedBlock block, long startOffset, long length,
- boolean corruptChecksumOk, boolean chunkOffsetOK,
- boolean verifyChecksum, DataNode datanode) throws IOException {
- this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
- verifyChecksum, datanode, null);
- }
-
+ /**
+ * Constructor
+ *
+ * @param block Block that is being read
+ * @param startOffset starting offset to read from
+ * @param length length of data to read
+ * @param corruptChecksumOk
+ * @param chunkOffsetOK need to send chunk offset in checksum header
+ * @param verifyChecksum verify checksum while reading the data
+ * @param datanode datanode from which the block is being read
+ * @param clientTraceFmt format string used to print client trace logs
+ * @throws IOException
+ */
BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean chunkOffsetOK,
boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
throws IOException {
try {
this.block = block;
+ this.chunkOffsetOK = chunkOffsetOK;
+ this.corruptChecksumOk = corruptChecksumOk;
+ this.verifyChecksum = verifyChecksum;
+ this.clientTraceFmt = clientTraceFmt;
+
synchronized(datanode.data) {
- this.replica = datanode.data.getReplica(block.getBlockPoolId(),
- block.getBlockId());
- if (replica == null) {
- throw new ReplicaNotFoundException(block);
- }
+ this.replica = getReplica(block, datanode);
this.replicaVisibleLength = replica.getVisibleLength();
}
- long minEndOffset = startOffset + length;
- // if this is a write in progress
+ // if there is a write in progress
ChunkChecksum chunkChecksum = null;
if (replica instanceof ReplicaBeingWritten) {
- for (int i = 0; i < 30 && replica.getBytesOnDisk() < minEndOffset; i++) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException ie) {
- throw new IOException(ie);
- }
- }
-
- long currentBytesOnDisk = replica.getBytesOnDisk();
-
- if (currentBytesOnDisk < minEndOffset) {
- throw new IOException(String.format(
- "need %d bytes, but only %d bytes available",
- minEndOffset,
- currentBytesOnDisk
- ));
- }
-
+ long minEndOffset = startOffset + length;
+ waitForMinLength((ReplicaBeingWritten)replica, minEndOffset);
ReplicaInPipeline rip = (ReplicaInPipeline) replica;
chunkChecksum = rip.getLastChecksumAndDataLen();
}
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
- throw new IOException(
- "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+ throw new IOException("Replica gen stamp < block genstamp, block="
+ block + ", replica=" + replica);
}
if (replicaVisibleLength < 0) {
- throw new IOException("The replica is not readable, block="
+ throw new IOException("Replica is not readable, block="
+ block + ", replica=" + replica);
}
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("block=" + block + ", replica=" + replica);
}
-
- this.chunkOffsetOK = chunkOffsetOK;
- this.corruptChecksumOk = corruptChecksumOk;
- this.verifyChecksum = verifyChecksum;
// transferToFully() fails on 32 bit platforms for block sizes >= 2GB,
// use normal transfer in those cases
this.transferToAllowed = datanode.transferToAllowed &&
- (!is32Bit || length < (long) Integer.MAX_VALUE);
- this.clientTraceFmt = clientTraceFmt;
+ (!is32Bit || length <= Integer.MAX_VALUE);
- if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
+ DataChecksum csum;
+ if (!corruptChecksumOk || datanode.data.metaFileExists(block)) {
checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
.getMetaDataInputStream(block), HdfsConstants.IO_FILE_BUFFER_SIZE));
// read and handle the common header here. For now just a version
- BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
- short version = header.getVersion();
-
+ BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+ short version = header.getVersion();
if (version != FSDataset.METADATA_VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for "
+ block + " ignoring ...");
}
- checksum = header.getChecksum();
+ csum = header.getChecksum();
} else {
LOG.warn("Could not find metadata file for " + block);
// This only decides the buffer size. Use BUFFER_SIZE?
- checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
+ csum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
16 * 1024);
}
- /* If bytesPerChecksum is very large, then the metadata file
- * is mostly corrupted. For now just truncate bytesPerchecksum to
- * blockLength.
- */
- bytesPerChecksum = checksum.getBytesPerChecksum();
- if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) {
- checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
+ /*
+ * If chunkSize is very large, then the metadata file is mostly
+ * corrupted. For now just truncate bytesPerchecksum to blockLength.
+ */
+ int size = csum.getBytesPerChecksum();
+ if (size > 10*1024*1024 && size > replicaVisibleLength) {
+ csum = DataChecksum.newDataChecksum(csum.getChecksumType(),
Math.max((int)replicaVisibleLength, 10*1024*1024));
- bytesPerChecksum = checksum.getBytesPerChecksum();
+ size = csum.getBytesPerChecksum();
}
+ chunkSize = size;
+ checksum = csum;
checksumSize = checksum.getChecksumSize();
-
- if (length < 0) {
- length = replicaVisibleLength;
- }
+ length = length < 0 ? replicaVisibleLength : length;
// end is either last byte on disk or the length for which we have a
// checksum
- if (chunkChecksum != null) {
- endOffset = chunkChecksum.getDataLength();
- } else {
- endOffset = replica.getBytesOnDisk();
- }
-
- if (startOffset < 0 || startOffset > endOffset
- || (length + startOffset) > endOffset) {
+ long end = chunkChecksum != null ? chunkChecksum.getDataLength()
+ : replica.getBytesOnDisk();
+ if (startOffset < 0 || startOffset > end
+ || (length + startOffset) > end) {
String msg = " Offset " + startOffset + " and length " + length
- + " don't match block " + block + " ( blockLen " + endOffset + " )";
+ + " don't match block " + block + " ( blockLen " + end + " )";
LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) +
":sendBlock() : " + msg);
throw new IOException(msg);
}
- offset = (startOffset - (startOffset % bytesPerChecksum));
+ // Ensure read offset is positioned at the beginning of a chunk
+ offset = startOffset - (startOffset % chunkSize);
if (length >= 0) {
- // Make sure endOffset points to end of a checksumed chunk.
+ // Ensure endOffset points to end of chunk.
long tmpLen = startOffset + length;
- if (tmpLen % bytesPerChecksum != 0) {
- tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+ if (tmpLen % chunkSize != 0) {
+ tmpLen += (chunkSize - tmpLen % chunkSize);
}
- if (tmpLen < endOffset) {
+ if (tmpLen < end) {
// will use on-disk checksum here since the end is a stable chunk
- endOffset = tmpLen;
+ end = tmpLen;
} else if (chunkChecksum != null) {
- //in last chunk which is changing. flag that we need to use in-memory
- // checksum
+ // last chunk is changing. flag that we need to use in-memory checksum
this.lastChunkChecksum = chunkChecksum;
}
}
+ endOffset = end;
// seek to the right offsets
if (offset > 0) {
- long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+ long checksumSkip = (offset / chunkSize) * checksumSize;
// note blockInStream is seeked when created below
if (checksumSkip > 0) {
// Should we use seek() for checksum file as well?
@@ -237,7 +276,6 @@ class BlockSender implements java.io.Clo
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("replica=" + replica);
}
-
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
} catch (IOException ioe) {
IOUtils.closeStream(this);
@@ -251,19 +289,17 @@ class BlockSender implements java.io.Clo
*/
public void close() throws IOException {
IOException ioe = null;
- // close checksum file
if(checksumIn!=null) {
try {
- checksumIn.close();
+ checksumIn.close(); // close checksum file
} catch (IOException e) {
ioe = e;
}
checksumIn = null;
- }
- // close data file
+ }
if(blockIn!=null) {
try {
- blockIn.close();
+ blockIn.close(); // close data file
} catch (IOException e) {
ioe = e;
}
@@ -274,7 +310,41 @@ class BlockSender implements java.io.Clo
throw ioe;
}
}
-
+
+ private static Replica getReplica(ExtendedBlock block, DataNode datanode)
+ throws ReplicaNotFoundException {
+ Replica replica = datanode.data.getReplica(block.getBlockPoolId(),
+ block.getBlockId());
+ if (replica == null) {
+ throw new ReplicaNotFoundException(block);
+ }
+ return replica;
+ }
+
+ /**
+ * Wait for rbw replica to reach the length
+ * @param rbw replica that is being written to
+ * @param len minimum length to reach
+ * @throws IOException on failing to reach the len in given wait time
+ */
+ private static void waitForMinLength(ReplicaBeingWritten rbw, long len)
+ throws IOException {
+ // Wait for 3 seconds for rbw replica to reach the minimum length
+ for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
+ }
+ long bytesOnDisk = rbw.getBytesOnDisk();
+ if (bytesOnDisk < len) {
+ throw new IOException(
+ String.format("Need %d bytes, but only %d bytes available", len,
+ bytesOnDisk));
+ }
+ }
+
/**
* Converts an IOExcpetion (not subclasses) to SocketException.
* This is typically done to indicate to upper layers that the error
@@ -296,54 +366,43 @@ class BlockSender implements java.io.Clo
}
/**
- * Sends upto maxChunks chunks of data.
+ * @param datalen Length of data
+ * @return number of chunks for data of given size
+ */
+ private int numberOfChunks(long datalen) {
+ return (int) ((datalen + chunkSize - 1)/chunkSize);
+ }
+
+ /**
+ * Sends a packet with up to maxChunks chunks of data.
*
- * When blockInPosition is >= 0, assumes 'out' is a
- * {@link SocketOutputStream} and tries
- * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
- * send data (and updates blockInPosition).
- */
- private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
- throws IOException {
- // Sends multiple chunks in one packet with a single write().
-
- int len = (int) Math.min(endOffset - offset,
- (((long) bytesPerChecksum) * ((long) maxChunks)));
- int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
- int packetLen = len + numChunks*checksumSize + 4;
- boolean lastDataPacket = offset + len == endOffset && len > 0;
- pkt.clear();
-
+ * @param pkt buffer used for writing packet data
+ * @param maxChunks maximum number of chunks to send
+ * @param out stream to send data to
+ * @param transferTo use transferTo to send data
+ * @param throttler used for throttling data transfer bandwidth
+ */
+ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
+ boolean transferTo, DataTransferThrottler throttler) throws IOException {
+ int dataLen = (int) Math.min(endOffset - offset,
+ (chunkSize * (long) maxChunks));
+
+ int numChunks = numberOfChunks(dataLen); // Number of chunks to be sent in the packet
+ int checksumDataLen = numChunks * checksumSize;
+ int packetLen = dataLen + checksumDataLen + 4;
+ boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
- PacketHeader header = new PacketHeader(
- packetLen, offset, seqno, (len == 0), len);
- header.putInBuffer(pkt);
+ writePacketHeader(pkt, dataLen, packetLen);
int checksumOff = pkt.position();
- int checksumLen = numChunks * checksumSize;
byte[] buf = pkt.array();
if (checksumSize > 0 && checksumIn != null) {
- try {
- checksumIn.readFully(buf, checksumOff, checksumLen);
- } catch (IOException e) {
- LOG.warn(" Could not read or failed to veirfy checksum for data"
- + " at offset " + offset + " for block " + block, e);
- IOUtils.closeStream(checksumIn);
- checksumIn = null;
- if (corruptChecksumOk) {
- if (checksumOff < checksumLen) {
- // Just fill the array with zeros.
- Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
- }
- } else {
- throw e;
- }
- }
+ readChecksum(buf, checksumOff, checksumDataLen);
// write in progress that we need to use to get last checksum
if (lastDataPacket && lastChunkChecksum != null) {
- int start = checksumOff + checksumLen - checksumSize;
+ int start = checksumOff + checksumDataLen - checksumSize;
byte[] updatedChecksum = lastChunkChecksum.getChecksum();
if (updatedChecksum != null) {
@@ -352,52 +411,28 @@ class BlockSender implements java.io.Clo
}
}
- int dataOff = checksumOff + checksumLen;
-
- if (blockInPosition < 0) {
- //normal transfer
- IOUtils.readFully(blockIn, buf, dataOff, len);
+ int dataOff = checksumOff + checksumDataLen;
+ if (!transferTo) { // normal transfer
+ IOUtils.readFully(blockIn, buf, dataOff, dataLen);
if (verifyChecksum) {
- int dOff = dataOff;
- int cOff = checksumOff;
- int dLeft = len;
-
- for (int i=0; i<numChunks; i++) {
- checksum.reset();
- int dLen = Math.min(dLeft, bytesPerChecksum);
- checksum.update(buf, dOff, dLen);
- if (!checksum.compare(buf, cOff)) {
- long failedPos = offset + len -dLeft;
- throw new ChecksumException("Checksum failed at " +
- failedPos, failedPos);
- }
- dLeft -= dLen;
- dOff += dLen;
- cOff += checksumSize;
- }
+ verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
}
- //writing is done below (mainly to handle IOException)
}
try {
- if (blockInPosition >= 0) {
- //use transferTo(). Checks on out and blockIn are already done.
-
+ if (transferTo) {
SocketOutputStream sockOut = (SocketOutputStream)out;
- //first write the packet
- sockOut.write(buf, 0, dataOff);
+ sockOut.write(buf, 0, dataOff); // First write checksum
+
// no need to flush. since we know out is not a buffered stream.
-
sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
- blockInPosition, len);
-
- blockInPosition += len;
- } else {
+ blockInPosition, dataLen);
+ blockInPosition += dataLen;
+ } else {
// normal transfer
- out.write(buf, 0, dataOff + len);
+ out.write(buf, 0, dataOff + dataLen);
}
-
} catch (IOException e) {
/* Exception while writing to the client. Connection closure from
* the other end is mostly the case and we do not care much about
@@ -419,9 +454,72 @@ class BlockSender implements java.io.Clo
throttler.throttle(packetLen);
}
- return len;
+ return dataLen;
}
-
+
+ /**
+ * Read checksum into given buffer
+ * @param buf buffer to read the checksum into
+ * @param checksumOffset offset at which to write the checksum into buf
+ * @param checksumLen length of checksum to write
+ * @throws IOException on error
+ */
+ private void readChecksum(byte[] buf, final int checksumOffset,
+ final int checksumLen) throws IOException {
+ if (checksumSize <= 0 && checksumIn == null) {
+ return;
+ }
+ try {
+ checksumIn.readFully(buf, checksumOffset, checksumLen);
+ } catch (IOException e) {
+ LOG.warn(" Could not read or failed to veirfy checksum for data"
+ + " at offset " + offset + " for block " + block, e);
+ IOUtils.closeStream(checksumIn);
+ checksumIn = null;
+ if (corruptChecksumOk) {
+ if (checksumOffset < checksumLen) {
+ // Just fill the array with zeros.
+ Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Compute checksum for chunks and verify the checksum that is read from
+ * the metadata file is correct.
+ *
+ * @param buf buffer that has checksum and data
+ * @param dataOffset position where data is written in the buf
+ * @param datalen length of data
+ * @param numChunks number of chunks corresponding to data
+ * @param checksumOffset offset where checksum is written in the buf
+ * @throws ChecksumException on failed checksum verification
+ */
+ public void verifyChecksum(final byte[] buf, final int dataOffset,
+ final int datalen, final int numChunks, final int checksumOffset)
+ throws ChecksumException {
+ int dOff = dataOffset;
+ int cOff = checksumOffset;
+ int dLeft = datalen;
+
+ for (int i = 0; i < numChunks; i++) {
+ checksum.reset();
+ int dLen = Math.min(dLeft, chunkSize);
+ checksum.update(buf, dOff, dLen);
+ if (!checksum.compare(buf, cOff)) {
+ long failedPos = offset + datalen - dLeft;
+ throw new ChecksumException("Checksum failed at " + failedPos,
+ failedPos);
+ }
+ dLeft -= dLen;
+ dOff += dLen;
+ cOff += checksumSize;
+ }
+ }
+
/**
* sendBlock() is used to read block and its metadata and stream the data to
* either a client or to another datanode.
@@ -433,70 +531,54 @@ class BlockSender implements java.io.Clo
* {@link SocketOutputStream#transferToFully(FileChannel,
* long, int)}.
* @param throttler for sending data.
- * @return total bytes reads, including crc.
+ * @return total bytes read, including checksum data.
*/
long sendBlock(DataOutputStream out, OutputStream baseStream,
DataTransferThrottler throttler) throws IOException {
- if( out == null ) {
+ if (out == null) {
throw new IOException( "out stream is null" );
}
- this.throttler = throttler;
-
- long initialOffset = offset;
+ final long initialOffset = offset;
long totalRead = 0;
OutputStream streamForSendChunks = out;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
try {
- try {
- checksum.writeHeader(out);
- if ( chunkOffsetOK ) {
- out.writeLong( offset );
- }
- out.flush();
- } catch (IOException e) { //socket error
- throw ioeToSocketException(e);
- }
+ writeChecksumHeader(out);
int maxChunksPerPacket;
int pktSize = PacketHeader.PKT_HEADER_LEN;
-
- if (transferToAllowed && !verifyChecksum &&
- baseStream instanceof SocketOutputStream &&
- blockIn instanceof FileInputStream) {
-
+ boolean transferTo = transferToAllowed && !verifyChecksum
+ && baseStream instanceof SocketOutputStream
+ && blockIn instanceof FileInputStream;
+ if (transferTo) {
FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
-
- // blockInPosition also indicates sendChunks() uses transferTo.
blockInPosition = fileChannel.position();
streamForSendChunks = baseStream;
+ maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
- // assure a mininum buffer size.
- maxChunksPerPacket = (Math.max(HdfsConstants.IO_FILE_BUFFER_SIZE,
- MIN_BUFFER_WITH_TRANSFERTO)
- + bytesPerChecksum - 1)/bytesPerChecksum;
-
- // allocate smaller buffer while using transferTo().
+ // Smaller packet size to only hold checksum when doing transferTo
pktSize += checksumSize * maxChunksPerPacket;
} else {
- maxChunksPerPacket = Math.max(1, (HdfsConstants.IO_FILE_BUFFER_SIZE
- + bytesPerChecksum - 1) / bytesPerChecksum);
- pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+ maxChunksPerPacket = Math.max(1,
+ numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
+ // Packet size includes both checksum and data
+ pktSize += (chunkSize + checksumSize) * maxChunksPerPacket;
}
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
while (endOffset > offset) {
- long len = sendChunks(pktBuf, maxChunksPerPacket,
- streamForSendChunks);
+ long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
+ transferTo, throttler);
offset += len;
- totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
- checksumSize);
+ totalRead += len + (numberOfChunks(len) * checksumSize);
seqno++;
}
try {
// send an empty packet to mark the end of the block
- sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);
+ sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
+ throttler);
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
@@ -506,14 +588,39 @@ class BlockSender implements java.io.Clo
} finally {
if (clientTraceFmt != null) {
final long endTime = System.nanoTime();
- ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime));
+ ClientTraceLog.info(String.format(clientTraceFmt, totalRead,
+ initialOffset, endTime - startTime));
}
close();
}
-
return totalRead;
}
+ /**
+ * Write checksum header to the output stream
+ */
+ private void writeChecksumHeader(DataOutputStream out) throws IOException {
+ try {
+ checksum.writeHeader(out);
+ if (chunkOffsetOK) {
+ out.writeLong(offset);
+ }
+ out.flush();
+ } catch (IOException e) { //socket error
+ throw ioeToSocketException(e);
+ }
+ }
+
+ /**
+ * Write packet header into {@code pkt}
+ */
+ private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
+ pkt.clear();
+ PacketHeader header = new PacketHeader(packetLen, offset, seqno,
+ (dataLen == 0), dataLen);
+ header.putInBuffer(pkt);
+ }
+
boolean didSendEntireByteRange() {
return sentEntireByteRange;
}
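The new class javadoc above documents the wire format BlockSender produces: a checksum header followed by packets, with the checksum bytes per packet given by x = (length of data + BYTES_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM * CHECKSUM_SIZE. A small standalone sketch of that arithmetic, using illustrative values (512-byte chunks, 4-byte CRC32 checksums, 64 KB of data) rather than anything read from a real DataNode; sendPacket() computes the same quantities when sizing the packet buffer:

// Minimal arithmetic sketch of the packet sizing described in the BlockSender javadoc.
// All constants below are illustrative assumptions, not values from a live cluster.
public class PacketSizeExample {

  // Mirrors BlockSender#numberOfChunks: ceil(datalen / chunkSize).
  static int numberOfChunks(long datalen, int chunkSize) {
    return (int) ((datalen + chunkSize - 1) / chunkSize);
  }

  public static void main(String[] args) {
    final int bytesPerChecksum = 512;   // BYTES_PER_CHECKSUM from the checksum header
    final int checksumSize = 4;         // CRC32 -> 4 bytes of checksum per chunk
    final int dataLen = 64 * 1024;      // data carried by this packet

    int numChunks = numberOfChunks(dataLen, bytesPerChecksum);
    int checksumDataLen = numChunks * checksumSize;   // "x" in the javadoc formula
    int packetLen = dataLen + checksumDataLen + 4;    // as computed in sendPacket()

    System.out.println("chunks         = " + numChunks);        // 128
    System.out.println("checksum bytes = " + checksumDataLen);  // 512
    System.out.println("packet length  = " + packetLen);        // 66052
  }
}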
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Oct 6 01:16:48 2011
@@ -2058,7 +2058,7 @@ public class DataNode extends Configured
out = new DataOutputStream(new BufferedOutputStream(baseStream,
HdfsConstants.SMALL_BUFFER_SIZE));
blockSender = new BlockSender(b, 0, b.getNumBytes(),
- false, false, false, DataNode.this);
+ false, false, false, DataNode.this, null);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
//
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Oct 6 01:16:48 2011
@@ -597,7 +597,7 @@ class DataXceiver extends Receiver imple
try {
// check if the block exists or not
blockSender = new BlockSender(block, 0, -1, false, false, false,
- datanode);
+ datanode, null);
// set up response stream
OutputStream baseStream = NetUtils.getOutputStream(
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Thu Oct 6 01:16:48 2011
@@ -46,10 +46,12 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.hdfs.web.ParamFilter;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
@@ -219,13 +221,13 @@ public class DatanodeWebHdfsMethods {
final String fullpath = path.getAbsolutePath();
final DataNode datanode = (DataNode)context.getAttribute("datanode");
+ final Configuration conf = new Configuration(datanode.getConf());
+ final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
+ final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
switch(op.getValue()) {
case OPEN:
{
- final Configuration conf = new Configuration(datanode.getConf());
- final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
- final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
final int b = bufferSize.getValue(conf);
final DFSDataInputStream in = new DFSClient.DFSDataInputStream(
dfsclient.open(fullpath, b, true));
@@ -244,6 +246,12 @@ public class DatanodeWebHdfsMethods {
};
return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
+ case GETFILECHECKSUM:
+ {
+ final MD5MD5CRC32FileChecksum checksum = dfsclient.getFileChecksum(fullpath);
+ final String js = JsonUtil.toJsonString(checksum);
+ return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+ }
default:
throw new UnsupportedOperationException(op + " is not supported");
}
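With the GETFILECHECKSUM case above, the datanode-side webhdfs handler computes an MD5MD5CRC32FileChecksum through DFSClient and returns it as JSON (HDFS-2348, HDFS-2395 in CHANGES.txt). From an application this is reached through the standard FileSystem API; a hedged sketch, assuming a cluster with webhdfs enabled and using a placeholder authority and path:

// Illustrative client-side use of the new webhdfs checksum support. The webhdfs
// authority and file path are placeholders, not values taken from this commit.
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WebHdfsChecksumExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // webhdfs://<namenode-http-host>:<port> -- placeholder values.
    FileSystem fs = FileSystem.get(URI.create("webhdfs://localhost:50070"), conf);
    // The request is ultimately served by the DataNode-side GETFILECHECKSUM branch
    // added above, which replies with the checksum encoded as JSON.
    FileChecksum checksum = fs.getFileChecksum(new Path("/user/alice/data.bin"));
    System.out.println(checksum.getAlgorithmName() + ": " + checksum);
  }
}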
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Thu Oct 6 01:16:48 2011
@@ -54,7 +54,6 @@ class EditLogBackupOutputStream extends
this.nnRegistration = nnReg;
InetSocketAddress bnAddress =
NetUtils.createSocketAddr(bnRegistration.getAddress());
- Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
try {
this.backupNode =
RPC.getProxy(JournalProtocol.class,
@@ -67,16 +66,6 @@ class EditLogBackupOutputStream extends
this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
}
- @Override // JournalStream
- public String getName() {
- return bnRegistration.getAddress();
- }
-
- @Override // JournalStream
- public JournalType getType() {
- return JournalType.BACKUP;
- }
-
@Override // EditLogOutputStream
void write(FSEditLogOp op) throws IOException {
doubleBuf.writeOp(op);
@@ -142,16 +131,6 @@ class EditLogBackupOutputStream extends
}
/**
- * There is no persistent storage. Therefore length is 0.<p>
- * Length is used to check when it is large enough to start a checkpoint.
- * This criteria should not be used for backup streams.
- */
- @Override // EditLogOutputStream
- long length() throws IOException {
- return 0;
- }
-
- /**
* Get backup node registration.
*/
NamenodeRegistration getRegistration() {
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Thu Oct 6 01:16:48 2011
@@ -37,9 +37,7 @@ import com.google.common.annotations.Vis
* stores edits in a local file.
*/
class EditLogFileOutputStream extends EditLogOutputStream {
- private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);;
-
- private static int EDITS_FILE_HEADER_SIZE_BYTES = Integer.SIZE / Byte.SIZE;
+ private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
private File file;
private FileOutputStream fp; // file stream for storing edit logs
@@ -73,16 +71,6 @@ class EditLogFileOutputStream extends Ed
fc.position(fc.size());
}
- @Override // JournalStream
- public String getName() {
- return file.getPath();
- }
-
- @Override // JournalStream
- public JournalType getType() {
- return JournalType.FILE;
- }
-
/** {@inheritDoc} */
@Override
void write(FSEditLogOp op) throws IOException {
@@ -176,7 +164,10 @@ class EditLogFileOutputStream extends Ed
if (fp == null) {
throw new IOException("Trying to use aborted output stream");
}
-
+ if (doubleBuf.isFlushed()) {
+ LOG.info("Nothing to flush");
+ return;
+ }
preallocate(); // preallocate file if necessary
doubleBuf.flushTo(fp);
fc.force(false); // metadata updates not needed because of preallocation
@@ -190,16 +181,6 @@ class EditLogFileOutputStream extends Ed
public boolean shouldForceSync() {
return doubleBuf.shouldForceSync();
}
-
- /**
- * Return the size of the current edit log including buffered data.
- */
- @Override
- long length() throws IOException {
- // file size - header size + size of both buffers
- return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES +
- doubleBuf.countBufferedBytes();
- }
// allocate a big chunk of data
private void preallocate() throws IOException {
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Thu Oct 6 01:16:48 2011
@@ -18,23 +18,20 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
-import java.util.zip.Checksum;
import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Writable;
/**
* A generic abstract class to support journaling of edits logs into
* a persistent storage.
*/
-abstract class EditLogOutputStream implements JournalStream {
+abstract class EditLogOutputStream {
// these are statistics counters
private long numSync; // number of sync(s) to disk
private long totalTimeSync; // total time to sync
- EditLogOutputStream() throws IOException {
+ EditLogOutputStream() {
numSync = totalTimeSync = 0;
}
@@ -106,12 +103,6 @@ abstract class EditLogOutputStream imple
}
/**
- * Return the size of the current edits log.
- * Length is used to check when it is large enough to start a checkpoint.
- */
- abstract long length() throws IOException;
-
- /**
* Implement the policy when to automatically sync the buffered edits log
* The buffered edits can be flushed when the buffer becomes full or
* a certain period of time is elapsed.
@@ -132,12 +123,7 @@ abstract class EditLogOutputStream imple
/**
* Return number of calls to {@link #flushAndSync()}
*/
- long getNumSync() {
+ protected long getNumSync() {
return numSync;
}
-
- @Override // Object
- public String toString() {
- return getName();
- }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Oct 6 01:16:48 2011
@@ -17,12 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,25 +34,17 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
-
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
/**
* FSEditLog maintains a log of the namespace modifications.
@@ -62,9 +54,6 @@ import org.apache.hadoop.hdfs.server.nam
@InterfaceStability.Evolving
public class FSEditLog {
- static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
- " File system changes are not persistent. No journal streams.";
-
static final Log LOG = LogFactory.getLog(FSEditLog.class);
/**
@@ -82,10 +71,11 @@ public class FSEditLog {
CLOSED;
}
private State state = State.UNINITIALIZED;
+
+ //initialize
+ final private JournalSet journalSet;
+ private EditLogOutputStream editLogStream = null;
-
- private List<JournalAndStream> journals = Lists.newArrayList();
-
// a monotonically increasing counter that represents transactionIds.
private long txid = 0;
@@ -137,15 +127,15 @@ public class FSEditLog {
this.storage = storage;
metrics = NameNode.getNameNodeMetrics();
lastPrintTime = now();
-
+
+ this.journalSet = new JournalSet();
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
- journals.add(new JournalAndStream(new FileJournalManager(sd)));
+ journalSet.add(new FileJournalManager(sd));
}
- if (journals.isEmpty()) {
+ if (journalSet.isEmpty()) {
LOG.error("No edits directories configured!");
- }
-
+ }
state = State.BETWEEN_LOG_SEGMENTS;
}
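Since every journal now hangs off the single JournalSet built in this constructor, registering a further journal type uses the same call as the FileJournalManagers above; the BackupJournalManager hunk later in this patch does exactly that. A small hypothetical sketch (CustomJournalManager is a placeholder, not part of this patch):

    // Hypothetical: any JournalManager implementation joins the same set.
    JournalManager extra = new CustomJournalManager();   // placeholder implementation
    journalSet.add(extra);
    // JournalSet then fans startLogSegment/finalizeLogSegment/purgeLogsOlderThan
    // out to this journal alongside the file-based ones registered above.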
@@ -172,9 +162,8 @@ public class FSEditLog {
LOG.warn("Closing log when already closed", new Exception());
return;
}
-
if (state == State.IN_SEGMENT) {
- assert !journals.isEmpty();
+ assert editLogStream != null;
waitForSyncToFinish();
endCurrentLogSegment(true);
}
@@ -193,20 +182,14 @@ public class FSEditLog {
// wait if an automatic sync is scheduled
waitIfAutoSyncScheduled();
- if (journals.isEmpty()) {
- throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
- }
-
long start = beginTransaction();
op.setTransactionId(txid);
- mapJournalsAndReportErrors(new JournalClosure() {
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- if (!jas.isActive()) return;
- jas.stream.write(op);
- }
- }, "logging edit");
+ try {
+ editLogStream.write(op);
+ } catch (IOException ex) {
+ // All journals failed, it is handled in logSync.
+ }
endTransaction(start);
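The division of labour set up here recurs throughout the patch: a write against the aggregated stream swallows its IOException, and logSync() is where an all-journals failure becomes fatal. Roughly, from the caller's side (illustrative only, using the methods shown in this file):

    // logEdit buffers the op; if every journal has already failed, the
    // exception is noted and deferred rather than thrown to the caller.
    editLog.logEdit(op);
    // logSync flushes the buffered edits; if nothing could be persisted the
    // NameNode terminates via runtime.exit(1) (see the logSync hunks below).
    editLog.logSync();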
@@ -251,14 +234,7 @@ public class FSEditLog {
* @return true if any of the edit stream says that it should sync
*/
private boolean shouldForceSync() {
- for (JournalAndStream jas : journals) {
- if (!jas.isActive()) continue;
-
- if (jas.getCurrentStream().shouldForceSync()) {
- return true;
- }
- }
- return false;
+ return editLogStream.shouldForceSync();
}
private long beginTransaction() {
@@ -322,7 +298,7 @@ public class FSEditLog {
* NOTE: this should be done while holding the FSNamesystem lock, or
* else more operations can start writing while this is in progress.
*/
- void logSyncAll() throws IOException {
+ void logSyncAll() {
// Record the most recent transaction ID as our own id
synchronized (this) {
TransactionId id = myTransactionId.get();
@@ -366,74 +342,73 @@ public class FSEditLog {
// Fetch the transactionId of this thread.
long mytxid = myTransactionId.get().txid;
- List<JournalAndStream> candidateJournals =
- Lists.newArrayListWithCapacity(journals.size());
- List<JournalAndStream> badJournals = Lists.newArrayList();
-
boolean sync = false;
try {
+ EditLogOutputStream logStream = null;
synchronized (this) {
try {
- printStatistics(false);
-
- // if somebody is already syncing, then wait
- while (mytxid > synctxid && isSyncRunning) {
- try {
- wait(1000);
- } catch (InterruptedException ie) {
+ printStatistics(false);
+
+ // if somebody is already syncing, then wait
+ while (mytxid > synctxid && isSyncRunning) {
+ try {
+ wait(1000);
+ } catch (InterruptedException ie) {
+ }
}
- }
- //
- // If this transaction was already flushed, then nothing to do
- //
- if (mytxid <= synctxid) {
- numTransactionsBatchedInSync++;
- if (metrics != null) // Metrics is non-null only when used inside name node
- metrics.incrTransactionsBatchedInSync();
- return;
- }
+ //
+ // If this transaction was already flushed, then nothing to do
+ //
+ if (mytxid <= synctxid) {
+ numTransactionsBatchedInSync++;
+ if (metrics != null) {
+ // Metrics is non-null only when used inside name node
+ metrics.incrTransactionsBatchedInSync();
+ }
+ return;
+ }
- // now, this thread will do the sync
- syncStart = txid;
- isSyncRunning = true;
- sync = true;
-
- // swap buffers
- assert !journals.isEmpty() : "no editlog streams";
-
- for (JournalAndStream jas : journals) {
- if (!jas.isActive()) continue;
+ // now, this thread will do the sync
+ syncStart = txid;
+ isSyncRunning = true;
+ sync = true;
+
+ // swap buffers
try {
- jas.getCurrentStream().setReadyToFlush();
- candidateJournals.add(jas);
- } catch (IOException ie) {
- LOG.error("Unable to get ready to flush.", ie);
- badJournals.add(jas);
+ if (journalSet.isEmpty()) {
+ throw new IOException("No journals available to flush");
+ }
+ editLogStream.setReadyToFlush();
+ } catch (IOException e) {
+ LOG.fatal("Could not sync any journal to persistent storage. "
+ + "Unsynced transactions: " + (txid - synctxid),
+ new Exception());
+ runtime.exit(1);
}
- }
} finally {
// Prevent RuntimeException from blocking other log edit write
doneWithAutoSyncScheduling();
}
+ //editLogStream may become null,
+ //so store a local variable for flush.
+ logStream = editLogStream;
}
-
+
// do the sync
long start = now();
- for (JournalAndStream jas : candidateJournals) {
- if (!jas.isActive()) continue;
- try {
- jas.getCurrentStream().flush();
- } catch (IOException ie) {
- LOG.error("Unable to sync edit log.", ie);
- //
- // remember the streams that encountered an error.
- //
- badJournals.add(jas);
+ try {
+ if (logStream != null) {
+ logStream.flush();
+ }
+ } catch (IOException ex) {
+ synchronized (this) {
+ LOG.fatal("Could not sync any journal to persistent storage. "
+ + "Unsynced transactions: " + (txid - synctxid), new Exception());
+ runtime.exit(1);
}
}
long elapsed = now() - start;
- disableAndReportErrorOnJournals(badJournals);
if (metrics != null) { // Metrics non-null only when used inside name node
metrics.addSync(elapsed);
@@ -443,13 +418,6 @@ public class FSEditLog {
// Prevent RuntimeException from blocking other log edit sync
synchronized (this) {
if (sync) {
- if (badJournals.size() >= journals.size()) {
- LOG.fatal("Could not sync any journal to persistent storage. " +
- "Unsynced transactions: " + (txid - synctxid),
- new Exception());
- runtime.exit(1);
- }
-
synctxid = syncStart;
isSyncRunning = false;
}
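Condensed, the rewritten logSync() path now looks like the sketch below; the two fatal branches correspond to the setReadyToFlush and flush failures in the hunks above (variable names are the ones used in this file, exception handling and metrics elided):

    synchronized (this) {
      while (mytxid > synctxid && isSyncRunning) { wait(1000); } // another thread syncs
      if (mytxid <= synctxid) { return; }                        // already durable
      syncStart = txid; isSyncRunning = true; sync = true;
      editLogStream.setReadyToFlush();   // swap buffers; failure here is fatal
      logStream = editLogStream;         // pin the stream, editLogStream may become null
    }
    logStream.flush();                   // real I/O outside the lock; failure is fatal
    synchronized (this) { synctxid = syncStart; isSyncRunning = false; }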
@@ -466,9 +434,6 @@ public class FSEditLog {
if (lastPrintTime + 60000 > now && !force) {
return;
}
- if (journals.isEmpty()) {
- return;
- }
lastPrintTime = now;
StringBuilder buf = new StringBuilder();
buf.append("Number of transactions: ");
@@ -478,20 +443,9 @@ public class FSEditLog {
buf.append("Number of transactions batched in Syncs: ");
buf.append(numTransactionsBatchedInSync);
buf.append(" Number of syncs: ");
- for (JournalAndStream jas : journals) {
- if (!jas.isActive()) continue;
- buf.append(jas.getCurrentStream().getNumSync());
- break;
- }
-
+ buf.append(editLogStream.getNumSync());
buf.append(" SyncTimes(ms): ");
-
- for (JournalAndStream jas : journals) {
- if (!jas.isActive()) continue;
- EditLogOutputStream eStream = jas.getCurrentStream();
- buf.append(eStream.getTotalSyncTime());
- buf.append(" ");
- }
+ buf.append(journalSet.getSyncTimes());
LOG.info(buf);
}
@@ -664,7 +618,6 @@ public class FSEditLog {
* log delegation token to edit log
* @param id DelegationTokenIdentifier
* @param expiryTime of the token
- * @return
*/
void logGetDelegationToken(DelegationTokenIdentifier id,
long expiryTime) {
@@ -703,24 +656,11 @@ public class FSEditLog {
}
/**
- * @return the number of active (non-failed) journals
- */
- private int countActiveJournals() {
- int count = 0;
- for (JournalAndStream jas : journals) {
- if (jas.isActive()) {
- count++;
- }
- }
- return count;
- }
-
- /**
* Used only by unit tests.
*/
@VisibleForTesting
List<JournalAndStream> getJournals() {
- return journals;
+ return journalSet.getAllJournalStreams();
}
/**
@@ -734,62 +674,9 @@ public class FSEditLog {
/**
* Return a manifest of what finalized edit logs are available
*/
- public synchronized RemoteEditLogManifest getEditLogManifest(
- long fromTxId) throws IOException {
- // Collect RemoteEditLogs available from each FileJournalManager
- List<RemoteEditLog> allLogs = Lists.newArrayList();
- for (JournalAndStream j : journals) {
- if (j.getManager() instanceof FileJournalManager) {
- FileJournalManager fjm = (FileJournalManager)j.getManager();
- try {
- allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
- } catch (Throwable t) {
- LOG.warn("Cannot list edit logs in " + fjm, t);
- }
- }
- }
-
- // Group logs by their starting txid
- ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
- Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
- long curStartTxId = fromTxId;
-
- List<RemoteEditLog> logs = Lists.newArrayList();
- while (true) {
- ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
- if (logGroup.isEmpty()) {
- // we have a gap in logs - for example because we recovered some old
- // storage directory with ancient logs. Clear out any logs we've
- // accumulated so far, and then skip to the next segment of logs
- // after the gap.
- SortedSet<Long> startTxIds = Sets.newTreeSet(logsByStartTxId.keySet());
- startTxIds = startTxIds.tailSet(curStartTxId);
- if (startTxIds.isEmpty()) {
- break;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found gap in logs at " + curStartTxId + ": " +
- "not returning previous logs in manifest.");
- }
- logs.clear();
- curStartTxId = startTxIds.first();
- continue;
- }
- }
-
- // Find the one that extends the farthest forward
- RemoteEditLog bestLog = Collections.max(logGroup);
- logs.add(bestLog);
- // And then start looking from after that point
- curStartTxId = bestLog.getEndTxId() + 1;
- }
- RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Generated manifest for logs since " + fromTxId + ":"
- + ret);
- }
- return ret;
+ public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId)
+ throws IOException {
+ return journalSet.getEditLogManifest(fromTxId);
}
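Nothing in the block removed above is lost conceptually; getEditLogManifest now expects JournalSet to perform the same selection. Condensed, the deleted algorithm was (variables prepared as in the removed lines):

    long cur = fromTxId;
    List<RemoteEditLog> logs = Lists.newArrayList();
    while (true) {
      ImmutableList<RemoteEditLog> group = logsByStartTxId.get(cur);
      if (group.isEmpty()) {             // gap: forget what we had, resume after it
        SortedSet<Long> later = Sets.newTreeSet(logsByStartTxId.keySet()).tailSet(cur);
        if (later.isEmpty()) break;
        logs.clear();
        cur = later.first();
        continue;
      }
      RemoteEditLog best = Collections.max(group);   // the log reaching farthest forward
      logs.add(best);
      cur = best.getEndTxId() + 1;                   // continue after it
    }
    return new RemoteEditLogManifest(logs);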
/**
@@ -832,14 +719,9 @@ public class FSEditLog {
// See HDFS-2174.
storage.attemptRestoreRemovedStorage();
- mapJournalsAndReportErrors(new JournalClosure() {
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- jas.startLogSegment(segmentTxId);
- }
- }, "starting log segment " + segmentTxId);
-
- if (countActiveJournals() == 0) {
+ try {
+ editLogStream = journalSet.startLogSegment(segmentTxId);
+ } catch (IOException ex) {
throw new IOException("Unable to start log segment " +
segmentTxId + ": no journals successfully started.");
}
@@ -873,14 +755,12 @@ public class FSEditLog {
final long lastTxId = getLastWrittenTxId();
- mapJournalsAndReportErrors(new JournalClosure() {
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- if (jas.isActive()) {
- jas.close(lastTxId);
- }
- }
- }, "ending log segment");
+ try {
+ journalSet.finalizeLogSegment(curSegmentTxId, lastTxId);
+ editLogStream = null;
+ } catch (IOException e) {
+ //All journals have failed, it will be handled in logSync.
+ }
state = State.BETWEEN_LOG_SEGMENTS;
}
@@ -889,14 +769,15 @@ public class FSEditLog {
* Abort all current logs. Called from the backup node.
*/
synchronized void abortCurrentLogSegment() {
- mapJournalsAndReportErrors(new JournalClosure() {
-
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- jas.abort();
+ try {
+ //Check for null, as abort can be called any time.
+ if (editLogStream != null) {
+ editLogStream.abort();
+ editLogStream = null;
}
- }, "aborting all streams");
- state = State.BETWEEN_LOG_SEGMENTS;
+ } catch (IOException e) {
+ LOG.warn("All journals failed to abort", e);
+ }
}
/**
@@ -912,13 +793,12 @@ public class FSEditLog {
"cannot purge logs older than txid " + minTxIdToKeep +
" when current segment starts at " + curSegmentTxId;
}
-
- mapJournalsAndReportErrors(new JournalClosure() {
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- jas.manager.purgeLogsOlderThan(minTxIdToKeep);
- }
- }, "purging logs older than " + minTxIdToKeep);
+
+ try {
+ journalSet.purgeLogsOlderThan(minTxIdToKeep);
+ } catch (IOException ex) {
+ //All journals have failed, it will be handled in logSync.
+ }
}
@@ -946,9 +826,7 @@ public class FSEditLog {
// sets the initial capacity of the flush buffer.
public void setOutputBufferCapacity(int size) {
- for (JournalAndStream jas : journals) {
- jas.manager.setOutputBufferCapacity(size);
- }
+ journalSet.setOutputBufferCapacity(size);
}
/**
@@ -969,7 +847,7 @@ public class FSEditLog {
if(bnReg.isRole(NamenodeRole.CHECKPOINT))
return; // checkpoint node does not stream edits
- JournalAndStream jas = findBackupJournalAndStream(bnReg);
+ JournalManager jas = findBackupJournal(bnReg);
if (jas != null) {
// already registered
LOG.info("Backup node " + bnReg + " re-registers");
@@ -978,35 +856,29 @@ public class FSEditLog {
LOG.info("Registering new backup node: " + bnReg);
BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
- journals.add(new JournalAndStream(bjm));
+ journalSet.add(bjm);
}
- synchronized void releaseBackupStream(NamenodeRegistration registration) {
- for (Iterator<JournalAndStream> iter = journals.iterator();
- iter.hasNext();) {
- JournalAndStream jas = iter.next();
- if (jas.manager instanceof BackupJournalManager &&
- ((BackupJournalManager)jas.manager).matchesRegistration(
- registration)) {
- jas.abort();
- LOG.info("Removing backup journal " + jas);
- iter.remove();
- }
+ synchronized void releaseBackupStream(NamenodeRegistration registration)
+ throws IOException {
+ BackupJournalManager bjm = this.findBackupJournal(registration);
+ if (bjm != null) {
+ LOG.info("Removing backup journal " + bjm);
+ journalSet.remove(bjm);
}
}
/**
* Find the JournalAndStream associated with this BackupNode.
+ *
* @return null if it cannot be found
*/
- private synchronized JournalAndStream findBackupJournalAndStream(
+ private synchronized BackupJournalManager findBackupJournal(
NamenodeRegistration bnReg) {
- for (JournalAndStream jas : journals) {
- if (jas.manager instanceof BackupJournalManager) {
- BackupJournalManager bjm = (BackupJournalManager)jas.manager;
- if (bjm.matchesRegistration(bnReg)) {
- return jas;
- }
+ for (JournalManager bjm : journalSet.getJournalManagers()) {
+ if ((bjm instanceof BackupJournalManager)
+ && ((BackupJournalManager) bjm).matchesRegistration(bnReg)) {
+ return (BackupJournalManager) bjm;
}
}
return null;
@@ -1018,124 +890,24 @@ public class FSEditLog {
*/
synchronized void logEdit(final int length, final byte[] data) {
long start = beginTransaction();
-
- mapJournalsAndReportErrors(new JournalClosure() {
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- if (jas.isActive()) {
- jas.getCurrentStream().writeRaw(data, 0, length); // TODO writeRaw
- }
- }
- }, "Logging edit");
-
- endTransaction(start);
- }
- //// Iteration across journals
- private interface JournalClosure {
- public void apply(JournalAndStream jas) throws IOException;
- }
-
- /**
- * Apply the given function across all of the journal managers, disabling
- * any for which the closure throws an IOException.
- * @param status message used for logging errors (e.g. "opening journal")
- */
- private void mapJournalsAndReportErrors(
- JournalClosure closure, String status) {
- List<JournalAndStream> badJAS = Lists.newLinkedList();
- for (JournalAndStream jas : journals) {
- try {
- closure.apply(jas);
- } catch (Throwable t) {
- LOG.error("Error " + status + " (journal " + jas + ")", t);
- badJAS.add(jas);
- }
- }
-
- disableAndReportErrorOnJournals(badJAS);
- }
-
- /**
- * Called when some journals experience an error in some operation.
- * This propagates errors to the storage level.
- */
- private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
- if (badJournals == null || badJournals.isEmpty()) {
- return; // nothing to do
- }
-
- for (JournalAndStream j : badJournals) {
- LOG.error("Disabling journal " + j);
- j.abort();
- }
- }
-
- /**
- * Find the best editlog input stream to read from txid. In this case
- * best means the editlog which has the largest continuous range of
- * transactions starting from the transaction id, fromTxId.
- *
- * If a journal throws an CorruptionException while reading from a txn id,
- * it means that it has more transactions, but can't find any from fromTxId.
- * If this is the case and no other journal has transactions, we should throw
- * an exception as it means more transactions exist, we just can't load them.
- *
- * @param fromTxId Transaction id to start from.
- * @return a edit log input stream with tranactions fromTxId
- * or null if no more exist
- */
- private EditLogInputStream selectStream(long fromTxId)
- throws IOException {
- JournalManager bestjm = null;
- long bestjmNumTxns = 0;
- CorruptionException corruption = null;
-
- for (JournalAndStream jas : journals) {
- JournalManager candidate = jas.getManager();
- long candidateNumTxns = 0;
- try {
- candidateNumTxns = candidate.getNumberOfTransactions(fromTxId);
- } catch (CorruptionException ce) {
- corruption = ce;
- } catch (IOException ioe) {
- LOG.warn("Error reading number of transactions from " + candidate);
- continue; // error reading disk, just skip
- }
-
- if (candidateNumTxns > bestjmNumTxns) {
- bestjm = candidate;
- bestjmNumTxns = candidateNumTxns;
- }
- }
-
-
- if (bestjm == null) {
- /**
- * If all candidates either threw a CorruptionException or
- * found 0 transactions, then a gap exists.
- */
- if (corruption != null) {
- throw new IOException("Gap exists in logs from "
- + fromTxId, corruption);
- } else {
- return null;
- }
+ try {
+ editLogStream.writeRaw(data, 0, length);
+ } catch (IOException ex) {
+ // All journals have failed, it will be handled in logSync.
}
-
- return bestjm.getInputStream(fromTxId);
+ endTransaction(start);
}
/**
* Run recovery on all journals to recover any unclosed segments
*/
void recoverUnclosedStreams() {
- mapJournalsAndReportErrors(new JournalClosure() {
- @Override
- public void apply(JournalAndStream jas) throws IOException {
- jas.manager.recoverUnfinalizedSegments();
- }
- }, "recovering unclosed streams");
+ try {
+ journalSet.recoverUnfinalizedSegments();
+ } catch (IOException ex) {
+ // All journals have failed, it is handled in logSync.
+ }
}
/**
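The deleted JournalClosure/mapJournalsAndReportErrors machinery is the clearest illustration of what JournalSet absorbs: every operation used to be expressed as a closure applied per journal, with failing journals disabled afterwards, and each call site now collapses to a single delegation. Side by side, using recoverUnclosedStreams() as the example:

    // Before (removed above): one anonymous closure per operation.
    mapJournalsAndReportErrors(new JournalClosure() {
      @Override
      public void apply(JournalAndStream jas) throws IOException {
        jas.manager.recoverUnfinalizedSegments();
      }
    }, "recovering unclosed streams");

    // After: the fan-out and error bookkeeping live inside JournalSet.
    journalSet.recoverUnfinalizedSegments();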
@@ -1143,23 +915,16 @@ public class FSEditLog {
* @param fromTxId first transaction in the selected streams
* @param toAtLeast the selected streams must contain this transaction
*/
- Collection<EditLogInputStream> selectInputStreams(long fromTxId, long toAtLeastTxId)
- throws IOException {
- List<EditLogInputStream> streams = Lists.newArrayList();
-
- boolean gapFound = false;
- EditLogInputStream stream = selectStream(fromTxId);
+ Collection<EditLogInputStream> selectInputStreams(long fromTxId,
+ long toAtLeastTxId) throws IOException {
+ List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
+ EditLogInputStream stream = journalSet.getInputStream(fromTxId);
while (stream != null) {
fromTxId = stream.getLastTxId() + 1;
streams.add(stream);
- try {
- stream = selectStream(fromTxId);
- } catch (IOException ioe) {
- gapFound = true;
- break;
- }
+ stream = journalSet.getInputStream(fromTxId);
}
- if (fromTxId <= toAtLeastTxId || gapFound) {
+ if (fromTxId <= toAtLeastTxId) {
closeAllStreams(streams);
throw new IOException("No non-corrupt logs for txid "
+ fromTxId);
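A usage sketch of the simplified stream selection: the caller keeps receiving streams until the JournalSet reports none, and a shortfall before toAtLeastTxId surfaces as the IOException above rather than through the old gapFound flag. The caller and txid values below are hypothetical:

    // Hypothetical caller inside the namenode package; txids are illustrative.
    Collection<EditLogInputStream> streams = editLog.selectInputStreams(100, 250);
    try {
      for (EditLogInputStream in : streams) {
        // replay the transactions from this stream, then move on to the next
      }
    } finally {
      closeAllStreams(streams);   // streams are the caller's to close (see below)
    }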
@@ -1176,75 +941,4 @@ public class FSEditLog {
IOUtils.closeStream(s);
}
}
-
- /**
- * Container for a JournalManager paired with its currently
- * active stream.
- *
- * If a Journal gets disabled due to an error writing to its
- * stream, then the stream will be aborted and set to null.
- */
- static class JournalAndStream {
- private final JournalManager manager;
- private EditLogOutputStream stream;
- private long segmentStartsAtTxId = HdfsConstants.INVALID_TXID;
-
- private JournalAndStream(JournalManager manager) {
- this.manager = manager;
- }
-
- private void startLogSegment(long txId) throws IOException {
- Preconditions.checkState(stream == null);
- stream = manager.startLogSegment(txId);
- segmentStartsAtTxId = txId;
- }
-
- private void close(long lastTxId) throws IOException {
- Preconditions.checkArgument(lastTxId >= segmentStartsAtTxId,
- "invalid segment: lastTxId %s >= " +
- "segment starting txid %s", lastTxId, segmentStartsAtTxId);
-
- if (stream == null) return;
- stream.close();
- manager.finalizeLogSegment(segmentStartsAtTxId, lastTxId);
- stream = null;
- }
-
- @VisibleForTesting
- void abort() {
- if (stream == null) return;
- try {
- stream.abort();
- } catch (IOException ioe) {
- LOG.error("Unable to abort stream " + stream, ioe);
- }
- stream = null;
- segmentStartsAtTxId = HdfsConstants.INVALID_TXID;
- }
-
- private boolean isActive() {
- return stream != null;
- }
-
- @VisibleForTesting
- EditLogOutputStream getCurrentStream() {
- return stream;
- }
-
- @Override
- public String toString() {
- return "JournalAndStream(mgr=" + manager +
- ", " + "stream=" + stream + ")";
- }
-
- @VisibleForTesting
- void setCurrentStreamForTests(EditLogOutputStream stream) {
- this.stream = stream;
- }
-
- @VisibleForTesting
- JournalManager getManager() {
- return manager;
- }
- }
}
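The JournalAndStream container removed here is not gone: the new import of JournalSet.JournalAndStream near the top of this file, and getJournals() returning journalSet.getAllJournalStreams(), show it now lives as a nested class of JournalSet. A rough, hypothetical outline of the relationship (the real definition is in JournalSet.java; field and method bodies are guesses for illustration):

    // Hypothetical outline only; see JournalSet.java for the actual class.
    class JournalSet {
      static class JournalAndStream {
        // pairs a JournalManager with its currently open EditLogOutputStream,
        // much like the class deleted above
      }
      private final List<JournalAndStream> journals = new ArrayList<JournalAndStream>();
      List<JournalAndStream> getAllJournalStreams() { return journals; }
    }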
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Thu Oct 6 01:16:48 2011
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFac
import java.io.File;
import java.io.IOException;
import java.util.List;
-import java.util.HashMap;
import java.util.Comparator;
import java.util.Collections;
import java.util.regex.Matcher;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Thu Oct 6 01:16:48 2011
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.na
import java.io.IOException;
-import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
/**
* A JournalManager is responsible for managing a single place of storing