Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/05/14 08:59:47 UTC
svn commit: r656122 [1/2] - in /hadoop/core/trunk: ./
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/
src/java/org/apache/hadoop/io/ src/test/org/apache/hadoop/dfs/
src/webapps/datanode/
Author: dhruba
Date: Tue May 13 23:59:46 2008
New Revision: 656122
URL: http://svn.apache.org/viewvc?rev=656122&view=rev
Log:
HADOOP-2656. The Block object has a generation stamp inside it.
Existing blocks get a generation stamp of 0. This is needed to support
appends. (dhruba)
Added:
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetadataHeader.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStampUpgrade.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UpgradeManager.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectCollection.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
hadoop/core/trunk/src/java/org/apache/hadoop/io/ObjectWritable.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestPendingReplication.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestSimulatedFSDataset.java
hadoop/core/trunk/src/webapps/datanode/browseBlock.jsp
hadoop/core/trunk/src/webapps/datanode/browseDirectory.jsp
hadoop/core/trunk/src/webapps/datanode/tail.jsp
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 13 23:59:46 2008
@@ -47,6 +47,10 @@
HADOOP-3329. DatanodeDescriptor objects should not be stored in the
fsimage. (dhruba)
+ HADOOP-2656. The Block object has a generation stamp inside it.
+ Existing blocks get a generation stamp of 0. This is needed to support
+ appends. (dhruba)
+
NEW FEATURES
HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java Tue May 13 23:59:46 2008
@@ -344,6 +344,7 @@
out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
out.writeByte(FSConstants.OP_COPY_BLOCK);
out.writeLong(block.getBlock().getBlockId());
+ out.writeLong(block.getBlock().getGenerationStamp());
Text.writeString(out, source.getStorageID());
target.write(out);
out.flush();
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java Tue May 13 23:59:46 2008
@@ -35,9 +35,13 @@
});
}
+ // generation stamp of blocks that pre-date the introduction of
+ // a generation stamp.
+ static final long GRANDFATHER_GENERATION_STAMP = 0;
+
/**
*/
- public static boolean isBlockFilename(File f) {
+ static boolean isBlockFilename(File f) {
String name = f.getName();
if ( name.startsWith( "blk_" ) &&
name.indexOf( '.' ) < 0 ) {
@@ -49,19 +53,22 @@
long blkid;
long len;
+ long generationStamp;
/**
*/
public Block() {
this.blkid = 0;
this.len = 0;
+ this.generationStamp = 0;
}
/**
*/
- public Block(final long blkid, final long len) {
+ public Block(final long blkid, final long len, final long generationStamp) {
this.blkid = blkid;
this.len = len;
+ this.generationStamp = generationStamp;
}
/**
@@ -69,21 +76,24 @@
public Block(Block blk) {
this.blkid = blk.blkid;
this.len = blk.len;
+ this.generationStamp = blk.generationStamp;
}
/**
* Find the blockid from the given filename
*/
- public Block(File f, long len) {
+ public Block(File f, long len, long genstamp) {
String name = f.getName();
name = name.substring("blk_".length());
this.blkid = Long.parseLong(name);
this.len = len;
+ this.generationStamp = genstamp;
}
- public void set(long blkid, long len) {
+ public void set(long blkid, long len, long genStamp) {
this.blkid = blkid;
this.len = len;
+ this.generationStamp = genStamp;
}
/**
*/
@@ -106,10 +116,14 @@
this.len = len;
}
+ long getGenerationStamp() {
+ return generationStamp;
+ }
+
/**
*/
public String toString() {
- return getBlockName();
+ return getBlockName() + "_" + getGenerationStamp();
}
/////////////////////////////////////
@@ -118,11 +132,13 @@
public void write(DataOutput out) throws IOException {
out.writeLong(blkid);
out.writeLong(len);
+ out.writeLong(generationStamp);
}
public void readFields(DataInput in) throws IOException {
this.blkid = in.readLong();
this.len = in.readLong();
+ this.generationStamp = in.readLong();
if (len < 0) {
throw new IOException("Unexpected block size: " + len);
}
@@ -136,7 +152,13 @@
if (blkid < b.blkid) {
return -1;
} else if (blkid == b.blkid) {
- return 0;
+ if (generationStamp < b.generationStamp) {
+ return -1;
+ } else if (generationStamp == b.generationStamp) {
+ return 0;
+ } else {
+ return 1;
+ }
} else {
return 1;
}
@@ -145,7 +167,8 @@
if (!(o instanceof Block)) {
return false;
}
- return blkid == ((Block)o).blkid;
+ return blkid == ((Block)o).blkid &&
+ generationStamp == ((Block)o).generationStamp;
}
public int hashCode() {
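
Taken together, the Block changes above mean that two blocks sharing an id
but differing in generation stamp are now distinct, ordered objects. A quick
illustrative check (not part of this commit; it assumes hashCode() is left
unchanged and keyed on blkid only, as the hunk suggests):

    Block a = new Block(4567L, 1024L, 0L); // pre-upgrade replica, grandfather stamp
    Block b = new Block(4567L, 1024L, 1L); // same id, newer generation
    assert !a.equals(b);                 // equals() now compares the stamp too
    assert a.compareTo(b) < 0;           // ties on blkid break on generationStamp
    assert a.hashCode() == b.hashCode(); // legal: unequal objects may share a hash
    System.out.println(b);               // "blk_4567_1" under the new toString()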
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java Tue May 13 23:59:46 2008
@@ -26,10 +26,10 @@
*/
class BlockListAsLongs {
/**
- * A block as 2 longs
- * block-id and block length
+ * A block as 3 longs
+ * block-id and block length and generation stamp
*/
- private static final int LONGS_PER_BLOCK = 2;
+ private static final int LONGS_PER_BLOCK = 3;
private static int index2BlockId(int index) {
return index*LONGS_PER_BLOCK;
@@ -37,6 +37,9 @@
private static int index2BlockLen(int index) {
return (index*LONGS_PER_BLOCK) + 1;
}
+ private static int index2BlockGenStamp(int index) {
+ return (index*LONGS_PER_BLOCK) + 2;
+ }
private long[] blockList;
@@ -101,6 +104,15 @@
long getBlockLen(final int index) {
return blockList[index2BlockLen(index)];
}
+
+ /**
+ * The generation stamp of the indexTh block
+ * @param index - the block whose generation stamp is desired
+ * @return - the generation stamp
+ */
+ long getBlockGenStamp(final int index) {
+ return blockList[index2BlockGenStamp(index)];
+ }
/**
* Set the indexTh block
@@ -110,5 +122,6 @@
void setBlock(final int index, final Block b) {
blockList[index2BlockId(index)] = b.getBlockId();
blockList[index2BlockLen(index)] = b.getNumBytes();
+ blockList[index2BlockGenStamp(index)] = b.getGenerationStamp();
}
}
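
With three longs per block, the flattened block report this class manages is
laid out as follows (derived from the index helpers above; a report of N
blocks is now a long[3*N] instead of a long[2*N]):

    blockList[3*i]     block id           (index2BlockId)
    blockList[3*i + 1] block length       (index2BlockLen)
    blockList[3*i + 2] generation stamp   (index2BlockGenStamp, new)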
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java Tue May 13 23:59:46 2008
@@ -44,8 +44,6 @@
return info;
}
- //TODO: remove generationStamp if it is defined in Block
- private long generationStamp;
private long lastScanTime;
public BlockMetaDataInfo() {}
@@ -57,14 +55,12 @@
/** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
super.write(out);
- out.writeLong(generationStamp);
out.writeLong(lastScanTime);
}
/** {@inheritDoc} */
public void readFields(DataInput in) throws IOException {
super.readFields(in);
- generationStamp = in.readLong();
lastScanTime = in.readLong();
}
-}
\ No newline at end of file
+}
Added: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetadataHeader.java?rev=656122&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetadataHeader.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetadataHeader.java Tue May 13 23:59:46 2008
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.IOUtils;
+
+
+/**
+ * BlockMetadataHeader manages metadata for data blocks on Datanodes.
+ * This is not related to the Block related functionality in Namenode.
+ * The biggest part of data block metadata is CRC for the block.
+ */
+class BlockMetadataHeader {
+
+ static final short METADATA_VERSION = FSDataset.METADATA_VERSION;
+
+ /**
+ * Header includes everything except the checksum(s) themselves.
+ * Version is two bytes. Following it is the DataChecksum
+ * that occupies 5 bytes.
+ */
+ private short version;
+ private DataChecksum checksum = null;
+
+ BlockMetadataHeader(short version, DataChecksum checksum) {
+ this.checksum = checksum;
+ this.version = version;
+ }
+
+ short getVersion() {
+ return version;
+ }
+
+ DataChecksum getChecksum() {
+ return checksum;
+ }
+
+
+ /**
+ * This reads all the fields till the beginning of checksum.
+ * @param in
+ * @return Metadata Header
+ * @throws IOException
+ */
+ static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+ return readHeader(in.readShort(), in);
+ }
+
+ /**
+ * Reads header at the top of metadata file and returns the header.
+ *
+ * @param file the metadata file to read
+ * @return the header found at the top of the file
+ * @throws IOException
+ */
+ static BlockMetadataHeader readHeader(File file) throws IOException {
+ DataInputStream in = null;
+ try {
+ in = new DataInputStream(new BufferedInputStream(
+ new FileInputStream(file)));
+ return readHeader(in);
+ } finally {
+ IOUtils.closeStream(in);
+ }
+ }
+
+ // Version is already read.
+ private static BlockMetadataHeader readHeader(short version, DataInputStream in)
+ throws IOException {
+ DataChecksum checksum = DataChecksum.newDataChecksum(in);
+ return new BlockMetadataHeader(version, checksum);
+ }
+
+ /**
+ * This writes all the fields till the beginning of checksum.
+ * @param out DataOutputStream
+ * @param header
+ * @throws IOException
+ */
+ private static void writeHeader(DataOutputStream out,
+ BlockMetadataHeader header)
+ throws IOException {
+ out.writeShort(header.getVersion());
+ header.getChecksum().writeHeader(out);
+ }
+
+ /**
+ * Writes all the fields till the beginning of checksum.
+ * @param out
+ * @param checksum
+ * @throws IOException
+ */
+ static void writeHeader(DataOutputStream out, DataChecksum checksum)
+ throws IOException {
+ writeHeader(out, new BlockMetadataHeader(METADATA_VERSION, checksum));
+ }
+
+ /**
+ * Returns the size of the header
+ */
+ static int getHeaderSize() {
+ return Short.SIZE/Byte.SIZE + DataChecksum.getChecksumHeaderSize();
+ }
+}
+
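
A minimal usage sketch for the new class (illustrative only; the file name
is hypothetical):

    // Writing: done once at the top of a fresh .meta file (see DataNode.java below)
    BlockMetadataHeader.writeHeader(checksumOut, checksum);

    // Reading it back later:
    BlockMetadataHeader h = BlockMetadataHeader.readHeader(new File("blk_3141_7.meta"));
    short version = h.getVersion();     // FSDataset.METADATA_VERSION == 1
    DataChecksum sum = h.getChecksum(); // checksum type + bytesPerChecksum

    // BlockMetadataHeader.getHeaderSize() = 2 (version short) + 5 (checksum header) = 7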
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Tue May 13 23:59:46 2008
@@ -37,9 +37,9 @@
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 32: add corrupt field to LocatedBlock
* 33: Block generation stamp stored in Block
*/
- public static final long versionID = 32L;
+ public static final long versionID = 33L;
///////////////////////////////////////
// File contents
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue May 13 23:59:46 2008
@@ -985,13 +985,14 @@
}
static BlockReader newBlockReader(Socket sock, String file, long blockId,
- long startOffset, long len, int bufferSize) throws IOException {
- return newBlockReader(sock, file, blockId, startOffset, len, bufferSize,
+ long genStamp, long startOffset, long len, int bufferSize) throws IOException {
+ return newBlockReader(sock, file, blockId, genStamp, startOffset, len, bufferSize,
true);
}
/** Java Doc required */
static BlockReader newBlockReader( Socket sock, String file, long blockId,
+ long genStamp,
long startOffset, long len,
int bufferSize, boolean verifyChecksum)
throws IOException {
@@ -1003,6 +1004,7 @@
out.writeShort( DATA_TRANSFER_VERSION );
out.write( OP_READ_BLOCK );
out.writeLong( blockId );
+ out.writeLong( genStamp );
out.writeLong( startOffset );
out.writeLong( len );
out.flush();
@@ -1268,6 +1270,7 @@
Block blk = targetBlock.getBlock();
blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
+ blk.getGenerationStamp(),
offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
buffersize, verifyChecksum);
return chosenNode;
@@ -1449,6 +1452,7 @@
reader = BlockReader.newBlockReader(dn, src,
block.getBlock().getBlockId(),
+ block.getBlock().getGenerationStamp(),
start, len, buffersize,
verifyChecksum);
int nread = reader.readAll(buf, offset, len);
@@ -2288,6 +2292,7 @@
out.writeShort( DATA_TRANSFER_VERSION );
out.write( OP_WRITE_BLOCK );
out.writeLong( block.getBlockId() );
+ out.writeLong( block.getGenerationStamp() );
out.writeInt( nodes.length );
out.writeBoolean( recoveryFlag ); // recovery flag
Text.writeString( out, client );
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java Tue May 13 23:59:46 2008
@@ -193,8 +193,8 @@
//still keep 'info.lastScanType' to NONE.
addBlockInfo(info);
}
-
- /* Pick the first directory that has any existing sanner log.
+
+ /* Pick the first directory that has any existing scanner log.
* otherwise, pick the first directory.
*/
File dir = null;
@@ -327,6 +327,7 @@
static private class LogEntry {
long blockId = -1;
long verificationTime = -1;
+ long genStamp = Block.GRANDFATHER_GENERATION_STAMP;
/**
* The format consists of single line with multiple entries. each
@@ -340,6 +341,7 @@
static String newEnry(Block block, long time) {
return "date=\"" + dateFormat.format(new Date(time)) + "\"\t " +
"time=\"" + time + "\"\t " +
+ "genstamp=\"" + block.getGenerationStamp() + "\"\t " +
"id=\"" + block.getBlockId() +"\"";
}
@@ -355,6 +357,8 @@
entry.blockId = Long.valueOf(value);
} else if (name.equals("time")) {
entry.verificationTime = Long.valueOf(value);
+ } else if (name.equals("genstamp")) {
+ entry.genStamp = Long.valueOf(value);
}
}
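
With the new field, a verification-log entry produced by newEnry() looks
roughly like this (values hypothetical, tab separators shown as spaces):

    date="..."   time="1210723186000"   genstamp="0"   id="4567"

Entries written before this change simply lack the genstamp pair, so
parseEntry() leaves entry.genStamp at Block.GRANDFATHER_GENERATION_STAMP.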
@@ -478,7 +482,7 @@
}
// update verification times from the verificationLog.
- Block tmpBlock = new Block(0, 0);
+ Block tmpBlock = new Block();
while (logReader != null && logReader.hasNext()) {
if (!datanode.shouldRun || Thread.interrupted()) {
return false;
@@ -486,7 +490,7 @@
String line = logReader.next();
LogEntry entry = LogEntry.parseEntry(line);
synchronized (this) {
- tmpBlock.blkid = entry.blockId;
+ tmpBlock.set(entry.blockId, 0, entry.genStamp);
BlockScanInfo info = blockMap.get(tmpBlock);
if(info != null && entry.verificationTime > 0 &&
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java Tue May 13 23:59:46 2008
@@ -205,8 +205,8 @@
public int getNumBytesInSum() {
return inSum;
}
- public int getChecksumHeaderSize() {
- return 2 + 1 + 4; // version: short + type : byte + bytesPerChecksum : int
+ static public int getChecksumHeaderSize() {
+ return 1 + FSConstants.SIZE_OF_INTEGER; // type byte, bytesPerChecksum int
}
//Checksum Interface. Just a wrapper around member summer.
public long getValue() {
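
The byte accounting behind this hunk, as I read it: the old instance method
counted version (2) + type (1) + bytesPerChecksum (4) = 7 bytes; the new
static method counts only type + int = 5 bytes, because the two version
bytes now belong to BlockMetadataHeader:

    BlockMetadataHeader.getHeaderSize()
      = Short.SIZE/Byte.SIZE                  // 2, the version short
      + DataChecksum.getChecksumHeaderSize()  // 1 + 4 = 5
      = 7 bytes

so the overall on-disk .meta header size is unchanged.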
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue May 13 23:59:46 2008
@@ -36,6 +36,7 @@
import org.apache.hadoop.dfs.DatanodeProtocol;
import org.apache.hadoop.dfs.FSDatasetInterface.MetaDataInputStream;
import org.apache.hadoop.dfs.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.dfs.BlockMetadataHeader;
import java.io.*;
import java.net.*;
@@ -131,8 +132,8 @@
private boolean transferToAllowed = true;
private int writePacketSize = 0;
- DataBlockScanner blockScanner;
- Daemon blockScannerThread;
+ DataBlockScanner blockScanner = null;
+ Daemon blockScannerThread = null;
private static final Random R = new Random();
@@ -314,12 +315,11 @@
}
if ( reason == null ) {
blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
- blockScannerThread = new Daemon(blockScanner);
} else {
LOG.info("Periodic Block Verification is disabled because " +
reason + ".");
}
-
+
//create a servlet to serve full-file content
String infoAddr =
NetUtils.getServerAddress(conf,
@@ -725,6 +725,14 @@
}
processCommand(cmd);
}
+
+ // start block scanner
+ if (blockScanner != null && blockScannerThread == null &&
+ upgradeManager.isUpgradeCompleted()) {
+ LOG.info("Starting Periodic block scanner.");
+ blockScannerThread = new Daemon(blockScanner);
+ blockScannerThread.start();
+ }
//
// There is no work to do; sleep until heartbeat timer elapses,
@@ -1055,7 +1063,7 @@
// Read in the header
//
long blockId = in.readLong();
- Block block = new Block( blockId, 0 );
+ Block block = new Block( blockId, 0 , in.readLong());
long startOffset = in.readLong();
long length = in.readLong();
@@ -1123,7 +1131,7 @@
//
// Read in the header
//
- Block block = new Block(in.readLong(), estimateBlockSize);
+ Block block = new Block(in.readLong(), estimateBlockSize, in.readLong());
LOG.info("Receiving block " + block +
" src: " + remoteAddress +
" dest: " + localAddress);
@@ -1184,6 +1192,7 @@
mirrorOut.writeShort( DATA_TRANSFER_VERSION );
mirrorOut.write( OP_WRITE_BLOCK );
mirrorOut.writeLong( block.getBlockId() );
+ mirrorOut.writeLong( block.getGenerationStamp() );
mirrorOut.writeInt( pipelineSize );
mirrorOut.writeBoolean( isRecovery );
Text.writeString( mirrorOut, client );
@@ -1281,7 +1290,7 @@
void readMetadata(DataInputStream in) throws IOException {
xceiverCount.incr();
- Block block = new Block( in.readLong(), 0 );
+ Block block = new Block( in.readLong(), 0 , in.readLong());
MetaDataInputStream checksumIn = null;
DataOutputStream out = null;
@@ -1325,7 +1334,7 @@
private void copyBlock(DataInputStream in) throws IOException {
// Read in the header
long blockId = in.readLong(); // read block id
- Block block = new Block(blockId, 0);
+ Block block = new Block(blockId, 0, in.readLong());
String source = Text.readString(in); // read del hint
DatanodeInfo target = new DatanodeInfo(); // read target
target.readFields(in);
@@ -1356,6 +1365,7 @@
targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
targetOut.writeByte(OP_REPLACE_BLOCK); // op code
targetOut.writeLong(block.getBlockId()); // block id
+ targetOut.writeLong(block.getGenerationStamp()); // generation stamp
Text.writeString( targetOut, source); // del hint
// then send data
@@ -1401,7 +1411,7 @@
balancingSem.acquireUninterruptibly();
/* read header */
- Block block = new Block(in.readLong(), estimateBlockSize); // block id & len
+ Block block = new Block(in.readLong(), estimateBlockSize, in.readLong()); // block id & len
String sourceID = Text.readString(in);
short opStatus = OP_STATUS_SUCCESS;
@@ -1538,9 +1548,9 @@
+----------------------------------------------+
Processed in readBlock() :
- +-------------------------------------------------------+
- | 8 byte Block ID | 8 byte start offset | 8 byte length |
- +-------------------------------------------------------+
+ +-------------------------------------------------------------------------+
+ | 8 byte Block ID | 8 byte genstamp | 8 byte start offset | 8 byte length |
+ +-------------------------------------------------------------------------+
Client sends optional response only at the end of receiving data.
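
Putting the updated diagram together with the DFSClient hunk above, a client
now opens a read roughly like this (a sketch; stream setup elided):

    out.writeShort(DATA_TRANSFER_VERSION); // 2 bytes
    out.write(OP_READ_BLOCK);              // 1 byte
    out.writeLong(blockId);                // 8 bytes
    out.writeLong(genStamp);               // 8 bytes, new in this change
    out.writeLong(startOffset);            // 8 bytes
    out.writeLong(len);                    // 8 bytes
    out.flush();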
@@ -1648,12 +1658,14 @@
BUFFER_SIZE));
// read and handle the common header here. For now just a version
- short version = checksumIn.readShort();
+ BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+ short version = header.getVersion();
+
if (version != FSDataset.METADATA_VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for "
+ block + " ignoring ...");
}
- checksum = DataChecksum.newDataChecksum(checksumIn);
+ checksum = header.getChecksum();
} else {
LOG.warn("Could not find metadata file for " + block);
// This only decides the buffer size. Use BUFFER_SIZE?
@@ -2597,8 +2609,7 @@
try {
// write data chunk header
if (!finalized) {
- checksumOut.writeShort(FSDataset.METADATA_VERSION);
- checksum.writeHeader(checksumOut);
+ BlockMetadataHeader.writeHeader(checksumOut, checksum);
}
if (clientName.length() > 0) {
responder = new Daemon(threadGroup,
@@ -2690,7 +2701,7 @@
" which is not a multiple of bytesPerChecksum " +
bytesPerChecksum);
}
- long offsetInChecksum = checksum.getChecksumHeaderSize() +
+ long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
offsetInBlock / bytesPerChecksum * checksumSize;
if (out != null) {
out.flush();
@@ -2755,6 +2766,7 @@
out.writeShort(DATA_TRANSFER_VERSION);
out.writeByte(OP_WRITE_BLOCK);
out.writeLong(b.getBlockId());
+ out.writeLong(b.getGenerationStamp());
out.writeInt(0); // no pipelining
out.writeBoolean(false); // not part of recovery
Text.writeString(out, ""); // client
@@ -2791,11 +2803,6 @@
public void run() {
LOG.info(dnRegistration + "In DataNode.run, data = " + data);
- // start block scanner
- if (blockScannerThread != null) {
- blockScannerThread.start();
- }
-
// start dataXceiveServer
dataXceiveServer.start();
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java Tue May 13 23:59:46 2008
@@ -109,7 +109,7 @@
if (file == null || volume == null) {
throw new IOException("detachBlock:Block not found. " + block);
}
- File meta = FSDataset.getMetaFile(file);
+ File meta = FSDataset.getMetaFile(file, block);
if (meta == null) {
throw new IOException("Meta file not found for block " + block);
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Tue May 13 23:59:46 2008
@@ -352,7 +352,8 @@
// as part of this block report - which is why block list is stored as longs
Block iblk = new Block(); // a fixed new'ed block to be reused with index i
for (int i = 0; i < newReport.getNumberOfBlocks(); ++i) {
- iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i));
+ iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i),
+ newReport.getBlockGenStamp(i));
BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
if(storedBlock == null) { // Brand new block
toAdd.add(new Block(iblk));
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Tue May 13 23:59:46 2008
@@ -314,7 +314,7 @@
DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()};
lblocks[0] = new LocatedBlock(dataBlock, dataNode);
LOG.info("Found checksum error in data stream at block="
- + dataBlock.getBlockName() + " on datanode="
+ + dataBlock + " on datanode="
+ dataNode[0].getName());
// Find block in checksum stream
@@ -327,7 +327,7 @@
DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()};
lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
LOG.info("Found checksum error in checksum stream at block="
- + sumsBlock.getBlockName() + " on datanode="
+ + sumsBlock + " on datanode="
+ sumsNode[0].getName());
// Ask client to delete blocks.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Tue May 13 23:59:46 2008
@@ -189,7 +189,7 @@
// Version is reflected in the data storage file.
// Versions are negative.
// Decrement LAYOUT_VERSION to define a new version.
- public static final int LAYOUT_VERSION = -14;
+ public static final int LAYOUT_VERSION = -15;
// Current version:
- // Remove storing locations of last block of a file in fsimage
+ // Store generation stamp with each Block
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Tue May 13 23:59:46 2008
@@ -93,8 +93,8 @@
boolean resetIdx) throws IOException {
if (numBlocks < maxBlocksPerDir) {
File dest = new File(dir, b.getBlockName());
- File metaData = getMetaFile( src );
- if ( ! metaData.renameTo( getMetaFile(dest) ) ||
+ File metaData = getMetaFile( src, b );
+ if ( ! metaData.renameTo( getMetaFile(dest, b) ) ||
! src.renameTo( dest ) ) {
throw new IOException( "could not move files for " + b +
" from tmp to " +
@@ -139,6 +139,31 @@
return children[ lastChildIdx ].addBlock(b, src, true, false);
}
+ /** Find the metadata file for the specified block file.
+ * Return the generation stamp from the name of the metafile.
+ */
+ long getGenerationStampFromFile(File[] listdir, File blockFile) {
+ String blockName = blockFile.getName();
+ for (int j = 0; j < listdir.length; j++) {
+ String path = listdir[j].getName();
+ if (!path.startsWith(blockName)) {
+ continue;
+ }
+ String[] vals = path.split("_");
+ if (vals.length != 3) { // blk, blkid, genstamp.meta
+ continue;
+ }
+ String[] str = vals[2].split("\\.");
+ if (str.length != 2) {
+ continue;
+ }
+ return Long.parseLong(str[0]);
+ }
+ DataNode.LOG.warn("Block " + blockFile +
+ " does not have a metafile!");
+ return Block.GRANDFATHER_GENERATION_STAMP;
+ }
+
/**
* Populate the given blockSet with any child blocks
* found at this node.
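
A worked example of the naming scheme that getGenerationStampFromFile()
parses (names hypothetical): for block file "blk_3141" with meta file
"blk_3141_7.meta",

    "blk_3141_7.meta".split("_") -> { "blk", "3141", "7.meta" } // length 3
    "7.meta".split("\\.")        -> { "7", "meta" }             // length 2
    Long.parseLong("7")          -> generation stamp 7

A pre-upgrade "blk_3141.meta" splits into only two pieces, so the loop falls
through and GRANDFATHER_GENERATION_STAMP (0) is returned, matching the
"existing blocks get a generation stamp of 0" behavior in the log message.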
@@ -153,12 +178,12 @@
File blockFiles[] = dir.listFiles();
for (int i = 0; i < blockFiles.length; i++) {
if (Block.isBlockFilename(blockFiles[i])) {
- blockSet.add(new Block(blockFiles[i], blockFiles[i].length()));
+ long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
+ blockSet.add(new Block(blockFiles[i], blockFiles[i].length(), genStamp));
}
}
}
-
void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap, FSVolume volume) {
if (children != null) {
for (int i = 0; i < children.length; i++) {
@@ -168,9 +193,9 @@
File blockFiles[] = dir.listFiles();
for (int i = 0; i < blockFiles.length; i++) {
- //We are not enforcing presense of metadata file
if (Block.isBlockFilename(blockFiles[i])) {
- volumeMap.put(new Block(blockFiles[i], blockFiles[i].length()),
+ long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
+ volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp),
new DatanodeBlockInfo(volume, blockFiles[i]));
}
}
@@ -363,7 +388,7 @@
File addBlock(Block b, File f) throws IOException {
File blockFile = dataDir.addBlock(b, f);
- File metaFile = getMetaFile( blockFile );
+ File metaFile = getMetaFile( blockFile , b);
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
return blockFile;
}
@@ -508,9 +533,6 @@
public static final String METADATA_EXTENSION = ".meta";
public static final short METADATA_VERSION = 1;
- static File getMetaFile( File f ) {
- return new File( f.getAbsolutePath() + METADATA_EXTENSION );
- }
static class ActiveFile {
File file;
@@ -525,9 +547,14 @@
}
}
+ static File getMetaFile(File f , Block b) {
+ return new File( f.getAbsolutePath() +
+ "_" + b.getGenerationStamp() + METADATA_EXTENSION );
+ }
protected File getMetaFile(Block b) throws IOException {
File blockFile = getBlockFile( b );
- return getMetaFile(blockFile);
+ return new File( blockFile.getAbsolutePath() +
+ "_" + b.getGenerationStamp() + METADATA_EXTENSION );
}
public boolean metaFileExists(Block b) throws IOException {
@@ -628,9 +655,9 @@
return new FileInputStream(blockInFile.getFD());
}
- BlockWriteStreams createBlockWriteStreams( File f ) throws IOException {
+ private BlockWriteStreams createBlockWriteStreams( File f , File metafile) throws IOException {
return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
- new FileOutputStream( new RandomAccessFile( getMetaFile( f ) , "rw" ).getFD() ));
+ new FileOutputStream( new RandomAccessFile( metafile , "rw" ).getFD() ));
}
@@ -737,7 +764,8 @@
// REMIND - mjc - make this a filter stream that enforces a max
// block size, so clients can't go crazy
//
- return createBlockWriteStreams( f );
+ File metafile = getMetaFile(f, b);
+ return createBlockWriteStreams( f , metafile);
}
/**
@@ -902,7 +930,7 @@
v.clearPath(parent);
volumeMap.remove(invalidBlks[i]);
}
- File metaFile = getMetaFile( f );
+ File metaFile = getMetaFile( f, invalidBlks[i] );
long blockSize = f.length()+metaFile.length();
if ( !f.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
DataNode.LOG.warn("Unexpected error trying to delete block "
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Tue May 13 23:59:46 2008
@@ -28,6 +28,8 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.util.ArrayList;
import java.lang.Math;
import java.nio.channels.FileChannel;
@@ -448,7 +450,19 @@
blockSize = readLong(in);
}
// get blocks
- Block blocks[] = readBlocks(in);
+ Block blocks[] = null;
+ if (logVersion <= -14) {
+ blocks = readBlocks(in);
+ } else {
+ BlockTwo oldblk = new BlockTwo();
+ int num = in.readInt();
+ blocks = new Block[num];
+ for (int i = 0; i < num; i++) {
+ oldblk.readFields(in);
+ blocks[i] = new Block(oldblk.blkid, oldblk.len,
+ Block.GRANDFATHER_GENERATION_STAMP);
+ }
+ }
// Older versions of HDFS do not store the block size in inode.
// If the file has more than one block, use the size of the
@@ -1003,6 +1017,41 @@
sizeFlushBuffer = size;
}
+ /**
+ * A class to read in blocks stored in the old format. The only two
+ * fields in the block were blockid and length.
+ */
+ static class BlockTwo implements Writable {
+ long blkid;
+ long len;
+
+ static { // register a ctor
+ WritableFactories.setFactory
+ (BlockTwo.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new BlockTwo(); }
+ });
+ }
+
+
+ BlockTwo() {
+ blkid = 0;
+ len = 0;
+ }
+ /////////////////////////////////////
+ // Writable
+ /////////////////////////////////////
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(blkid);
+ out.writeLong(len);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ this.blkid = in.readLong();
+ this.len = in.readLong();
+ }
+ }
+
/** This method is defined for compatibility reason. */
static private DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in
) throws IOException {
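
The version gate above relies on layout versions being negative and
decreasing as the format evolves (this commit moves LAYOUT_VERSION from -14
to -15 in FSConstants.java). Restating the branch:

    logVersion <= -14  -> blocks decoded by readBlocks()
    logVersion  > -14  -> old two-long format: BlockTwo reads (blkid, len)
                          and the stamp defaults to GRANDFATHER_GENERATION_STAMP

FSImage.java below applies the same split with "if (-14 < imgVersion)".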
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Tue May 13 23:59:46 2008
@@ -750,7 +750,12 @@
blocks = new Block[numBlocks];
for (int j = 0; j < numBlocks; j++) {
blocks[j] = new Block();
- blocks[j].readFields(in);
+ if (-14 < imgVersion) {
+ blocks[j].set(in.readLong(), in.readLong(),
+ Block.GRANDFATHER_GENERATION_STAMP);
+ } else {
+ blocks[j].readFields(in);
+ }
}
}
// Older versions of HDFS do not store the block size in inode.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue May 13 23:59:46 2008
@@ -1047,11 +1047,11 @@
// Remove the block from the pending creates list
//
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
- +b.getBlockName()+"of file "+src);
+ +b+"of file "+src);
INodeFileUnderConstruction file = checkLease(src, holder);
dir.removeBlock(src, file, b);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
- + b.getBlockName()
+ + b
+ " is removed from pendingCreates");
return true;
}
@@ -1185,11 +1185,12 @@
private Block allocateBlock(String src, INode file) throws IOException {
Block b = null;
do {
- b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
+ b = new Block(FSNamesystem.randBlockId.nextLong(), 0,
+ getGenerationStamp());
} while (isValidBlock(b));
b = dir.addBlock(src, file, b);
NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
- +src+ ". "+b.getBlockName());
+ +src+ ". "+b);
return b;
}
@@ -1300,14 +1301,14 @@
public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
throws IOException {
NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
- + blk.getBlockName() + " on "
+ + blk + " on "
+ dn.getName());
if (isInSafeMode()) {
- throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
+ throw new SafeModeException("Cannot invalidate block " + blk, safeMode);
}
DatanodeDescriptor node = getDatanode(dn);
if (node == null) {
- throw new IOException("Cannot invalidate block " + blk.getBlockName() +
+ throw new IOException("Cannot invalidate block " + blk +
" because datanode " + dn.getName() +
" does not exist.");
}
@@ -1319,11 +1320,11 @@
addToInvalidates(blk, dn);
removeStoredBlock(blk, node);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.invalidateBlocks: "
- + blk.getBlockName() + " on "
+ + blk + " on "
+ dn.getName() + " listed for deletion.");
} else {
NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
- + blk.getBlockName() + " on "
+ + blk + " on "
+ dn.getName() + " is the only copy and was not deleted.");
}
}
@@ -2100,7 +2101,7 @@
neededReplicationsIterator.remove(); // remove from neededReplications
replIndex--;
NameNode.stateChangeLog.info("BLOCK* "
- + "Removing block " + block.getBlockName()
+ + "Removing block " + block
+ " from neededReplications as it does not belong to any file.");
continue;
}
@@ -2127,7 +2128,7 @@
replIndex--;
pendingReplications.add(block, targets.length);
NameNode.stateChangeLog.debug(
- "BLOCK* block " + block.getBlockName()
+ "BLOCK* block " + block
+ " is moved from neededReplications to pendingReplications");
}
if (NameNode.stateChangeLog.isInfoEnabled()) {
@@ -2139,7 +2140,7 @@
NameNode.stateChangeLog.info(
"BLOCK* ask "
+ srcNode.getName() + " to replicate "
- + block.getBlockName() + " to " + targetList);
+ + block + " to " + targetList);
NameNode.stateChangeLog.debug(
"BLOCK* neededReplications = " + neededReplications.size()
+ " pendingReplications = " + pendingReplications.size());
@@ -2257,7 +2258,7 @@
StringBuffer blockList = new StringBuffer();
for(Block blk : blocksToInvalidate) {
blockList.append(' ');
- blockList.append(blk.getBlockName());
+ blockList.append(blk);
}
NameNode.stateChangeLog.info("BLOCK* ask "
+ dn.getName() + " to delete " + blockList);
@@ -2492,7 +2493,7 @@
obsolete.add(b);
}
NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
- +"ask "+nodeID.getName()+" to delete "+b.getBlockName());
+ +"ask "+nodeID.getName()+" to delete "+b);
}
}
NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
@@ -2587,12 +2588,12 @@
//
if (!isInSafeMode()) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
- +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName()+" size "+block.getNumBytes());
+ +"blockMap updated: "+node.getName()+" is added to "+block+" size "+block.getNumBytes());
}
} else {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
+ "Redundant addStoredBlock request received for "
- + block.getBlockName() + " on " + node.getName()
+ + block + " on " + node.getName()
+ " size " + block.getNumBytes());
}
//
@@ -2601,7 +2602,7 @@
if (fileINode == null) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
+ "addStoredBlock request received for "
- + block.getBlockName() + " on " + node.getName()
+ + block + " on " + node.getName()
+ " size " + block.getNumBytes()
+ " But it does not belong to any file.");
return block;
@@ -2812,7 +2813,7 @@
}
excessBlocks.add(b);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
- +"("+cur.getName()+", "+b.getBlockName()+") is added to excessReplicateMap");
+ +"("+cur.getName()+", "+b+") is added to excessReplicateMap");
//
// The 'excessblocks' tracks blocks until we get confirmation
@@ -2830,7 +2831,7 @@
}
invalidateSet.add(b);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
- +"("+cur.getName()+", "+b.getBlockName()+") is added to recentInvalidateSets");
+ +"("+cur.getName()+", "+b+") is added to recentInvalidateSets");
}
}
@@ -2840,10 +2841,10 @@
*/
synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
- +block.getBlockName() + " from "+node.getName());
+ +block + " from "+node.getName());
if (!blocksMap.removeNode(block, node)) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
- +block.getBlockName()+" has already been removed from node "+node);
+ +block+" has already been removed from node "+node);
return;
}
@@ -2867,7 +2868,7 @@
if (excessBlocks != null) {
excessBlocks.remove(block);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
- +block.getBlockName()+" is removed from excessBlocks");
+ +block+" is removed from excessBlocks");
if (excessBlocks.size() == 0) {
excessReplicateMap.remove(node.getStorageID());
}
@@ -2886,16 +2887,16 @@
DatanodeDescriptor node = getDatanode(nodeID);
if (node == null) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
- + block.getBlockName() + " is received from an unrecorded node "
+ + block + " is received from an unrecorded node "
+ nodeID.getName());
throw new IllegalArgumentException(
"Unexpected exception. Got blockReceived message from node "
- + block.getBlockName() + ", but there is no info for it");
+ + block + ", but there is no info for it");
}
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
- +block.getBlockName()+" is received from " + nodeID.getName());
+ +block+" is received from " + nodeID.getName());
}
// Check if this datanode should actually be shutdown instead.
@@ -2910,7 +2911,7 @@
delHintNode = datanodeMap.get(delHint);
if(delHintNode == null) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
- + block.getBlockName()
+ + block
+ " is expected to be removed from an unrecorded node "
+ delHint);
}
Added: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStampUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStampUpgrade.java?rev=656122&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStampUpgrade.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStampUpgrade.java Tue May 13 23:59:46 2008
@@ -0,0 +1,981 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.net.SocketTimeoutException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.*;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+
+/**
+ * This class associates a generation stamp with each block. This
+ * generation stamp is written to each metadata file. Please see
+ * HADOOP-1700 for details.
+ */
+class GenerationStampUpgradeDatanode extends UpgradeObjectDatanode {
+
+ public static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.dfs.GenerationStampUpgrade");
+
+ DatanodeProtocol namenode;
+ InetSocketAddress namenodeAddr;
+
+ // stats
+ private AtomicInteger blocksPreviouslyUpgraded = new AtomicInteger(0);
+ private AtomicInteger blocksToUpgrade = new AtomicInteger(0);
+ private AtomicInteger blocksUpgraded = new AtomicInteger(0);
+ private AtomicInteger errors = new AtomicInteger(0);
+
+ // process the upgrade using a pool of threads.
+ static private final int poolSize = 4;
+
+ // If no progress has occurred during this time, print a warning message.
+ static private final int LONG_TIMEOUT_MILLISEC = 1*60*1000; // 1 minute
+
+ // This object is needed to indicate that the namenode is not running an upgrade.
+ static UpgradeCommand noUpgradeOnNamenode = new UpgradeCommand();
+
+ private List<UpgradeExecutor> completedList = new LinkedList<UpgradeExecutor>();
+
+ /* This is set when the datanode misses the regular upgrade.
+ * When this is set, it upgrades the blocks but stops heartbeating
+ * to the namenode.
+ */
+ private AtomicBoolean offlineUpgrade = new AtomicBoolean(false);
+ private AtomicBoolean upgradeCompleted = new AtomicBoolean(false);
+
+ // Implement the common interfaces required by UpgradeObjectDatanode
+
+ public int getVersion() {
+ return GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION;
+ }
+
+ /*
+ * Start the upgrade if it is not already running. It sends status to
+ * the namenode even if an upgrade is already in progress.
+ */
+ public synchronized UpgradeCommand startUpgrade() throws IOException {
+ if (offlineUpgrade.get()) {
+ doUpgrade();
+ }
+ return null;
+ }
+
+ public String getDescription() {
+ return "Block Generation Stamp Upgrade at Datanode";
+ }
+
+ public short getUpgradeStatus() {
+ return (blocksToUpgrade.get() == blocksUpgraded.get()) ? 100 :
+ (short) Math.floor(blocksUpgraded.get()*100.0/blocksToUpgrade.get());
+ }
+
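
Note that in getUpgradeStatus() above the equality test doubles as the
divide-by-zero guard: a datanode with nothing to upgrade hits 0 == 0 and
reports 100 directly instead of evaluating 0*100.0/0. Otherwise the result
is a plain floor, e.g. 250 of 1000 blocks upgraded reports
(short) Math.floor(250*100.0/1000) = 25.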
+ public UpgradeCommand completeUpgrade() throws IOException {
+ // return latest stats command.
+ assert getUpgradeStatus() == 100;
+ return new DatanodeStatsCommand(getUpgradeStatus(),
+ getDatanode().dnRegistration,
+ blocksPreviouslyUpgraded.get() + blocksUpgraded.get(),
+ blocksToUpgrade.get()-blocksUpgraded.get(),
+ errors.get(),
+ GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION);
+ }
+
+ @Override
+ boolean preUpgradeAction(NamespaceInfo nsInfo) throws IOException {
+ int nsUpgradeVersion = nsInfo.getDistributedUpgradeVersion();
+ if(nsUpgradeVersion >= getVersion()) {
+ return false; // Normal upgrade.
+ }
+
+ LOG.info("\n This Datanode has missed a cluster wide Block generation Stamp Upgrade." +
+ "\n Will perform an 'offline' upgrade of the blocks." +
+ "\n During this time, Datanode does not heartbeat.");
+
+
+ // Namenode removes this node from the registered nodes
+ try {
+ getDatanode().namenode.errorReport(getDatanode().dnRegistration,
+ DatanodeProtocol.NOTIFY,
+ "Performing an offline generation stamp " +
+ "upgrade. " +
+ "Will be back online once the ugprade " +
+ "completes. Please see datanode logs.");
+
+ } catch(IOException ignored) {
+ LOG.info("\n This Datanode was unable to send error report to namenode.");
+ }
+ offlineUpgrade.set(true);
+ return true;
+ }
+
+ public GenerationStampUpgradeDatanode() {
+ blocksPreviouslyUpgraded.set(0);
+ blocksToUpgrade.set(0);
+ blocksUpgraded.set(0);
+ errors.set(0);
+ }
+
+ static File getPreGenerationMetaFile(File f) {
+ return new File(f.getAbsolutePath() + FSDataset.METADATA_EXTENSION);
+ }
+
+ // This class is invoked by the worker thread to convert the
+ // metafile into the new format
+ //
+ class UpgradeExecutor implements Runnable {
+ Block block;
+ Throwable throwable;
+
+ UpgradeExecutor(Block b) {
+ block = b;
+ }
+
+ public void run() {
+ try {
+ // do the real work here
+ FSDataset dataset = (FSDataset) getDatanode().data;
+ upgradeToCurVersion(dataset, block);
+ } catch (Throwable t) {
+ throwable = t;
+ }
+ synchronized (completedList) {
+ completedList.add(this);
+ completedList.notify();
+ }
+ }
+
+ /**
+ * Upgrades the metadata file to current version if required.
+ * @param dataset
+ * @param block
+ */
+ void upgradeToCurVersion(FSDataset dataset, Block block)
+ throws IOException {
+ File blockFile = dataset.getBlockFile(block);
+ if (blockFile == null) {
+ throw new IOException("Could find file for " + block);
+ }
+
+ File metadataFile = dataset.getMetaFile(block);
+ File oldmetadataFile = getPreGenerationMetaFile(blockFile);
+
+ if (metadataFile.exists() && oldmetadataFile.exists()) {
+ //
+ // If both files exist and are of the same size,
+ // then delete the old one. If the sizes are not the same, then
+ // leave both of them and consider the upgrade as successful.
+ //
+ if (metadataFile.length() == oldmetadataFile.length()) {
+ if (!oldmetadataFile.delete()) {
+ LOG.info("Unable to delete old metadata file " + oldmetadataFile);
+ }
+ }
+ } else if (metadataFile.exists()) {
+ //
+ // Only the new file exists, nothing more to do.
+ //
+ return;
+ } else if (oldmetadataFile.exists()) {
+ //
+ // The old file exists but the new one is missing. Rename
+ // old one to new name.
+ //
+ if (!oldmetadataFile.renameTo(metadataFile)) {
+ throw new IOException("Could find rename " + oldmetadataFile +
+ " to " + metadataFile);
+ }
+ } else {
+ throw new IOException("Could find any metadata file for " + block);
+ }
+ }
+ }
+
+ // This method iterates through all the blocks on a datanode and
+ // upgrades them.
+ //
+ void doUpgrade() throws IOException {
+
+ if (upgradeCompleted.get()) {
+ assert offlineUpgrade.get() :
+ ("Multiple calls to doUpgrade is expected only during " +
+ "offline upgrade");
+ return;
+ }
+
+ FSDataset dataset = (FSDataset) getDatanode().data;
+
+ // Set up the retry policy so that each attempt waits for one minute.
+ Configuration conf = new Configuration();
+ // set rpc timeout to one minute.
+ conf.set("ipc.client.timeout", "60000");
+
+ RetryPolicy timeoutPolicy =
+ RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ LONG_TIMEOUT_MILLISEC/1000,
+ 1, TimeUnit.MILLISECONDS);
+
+ Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
+ new HashMap<Class<? extends Exception>, RetryPolicy>();
+ exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy);
+ RetryPolicy methodPolicy = RetryPolicies.retryByException(
+ RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+ Map<String,RetryPolicy> methodNameToPolicyMap =
+ new HashMap<String, RetryPolicy>();
+ // do we need to set the policy for connection failures also?
+ methodNameToPolicyMap.put("processUpgradeCommand", methodPolicy);
+
+ LOG.info("Starting Block Generation Stamp Upgrade on datanode " +
+ getDatanode());
+
+ for (;;) {
+ try {
+ namenodeAddr = getDatanode().getNameNodeAddr();
+ namenode = (DatanodeProtocol) RetryProxy.create(
+ DatanodeProtocol.class,
+ RPC.waitForProxy(DatanodeProtocol.class,
+ DatanodeProtocol.versionID,
+ namenodeAddr,
+ conf),
+ methodNameToPolicyMap);
+ break;
+ } catch (IOException e) {
+ LOG.warn("Generation Stamp Upgrade Exception " +
+ "while trying to connect to NameNode at " +
+ getDatanode().getNameNodeAddr().toString() + " : " +
+ StringUtils.stringifyException(e));
+ try {
+ Thread.sleep(10*1000);
+ } catch (InterruptedException e1) {
+ throw new IOException("Interrupted Sleep while creating RPC proxy." +
+ e1);
+ }
+ }
+ }
+ LOG.info("Block Generation Stamp Upgrade Datanode connected to " +
+ "namenode at " + namenodeAddr);
+
+ // Get a list of all the blocks :
+ LinkedList<UpgradeExecutor> blockList = new LinkedList<UpgradeExecutor>();
+
+ //Fill blockList with blocks to be upgraded.
+ Block [] blockArr = dataset.getBlockReport();
+
+ for (Block b : blockArr) {
+ File blockFile = null;
+ try {
+ blockFile = dataset.getBlockFile(b);
+ } catch (IOException e) {
+ //The block might just be deleted. ignore it.
+ LOG.warn("Could not find file location for " + b +
+ ". It might already be deleted. Exception : " +
+ StringUtils.stringifyException(e));
+ errors.getAndIncrement();
+ continue;
+ }
+ if (!blockFile.exists()) {
+ errors.getAndIncrement();
+ LOG.error("could not find block file " + blockFile);
+ continue;
+ }
+ File metaFile = dataset.getMetaFile(b);
+ File oldMetaFile = getPreGenerationMetaFile(blockFile);
+ if (metaFile.exists()) {
+ blocksPreviouslyUpgraded.getAndIncrement();
+ continue;
+ }
+ blocksToUpgrade.getAndIncrement();
+ blockList.add(new UpgradeExecutor(b));
+ }
+ blockArr = null;
+ int nLeft = blockList.size();
+
+ LOG.info("Starting upgrade of " + blocksToUpgrade.get() + " blocks out of " +
+ (blocksToUpgrade.get() + blocksPreviouslyUpgraded.get()));
+
+ // Start the pool of upgrade workers
+ ExecutorService pool = Executors.newFixedThreadPool(poolSize);
+ for (Iterator<UpgradeExecutor> it = blockList.iterator(); it.hasNext();) {
+ pool.submit(it.next());
+ }
+
+ // Inform the namenode
+ sendStatus();
+
+ // Report status to namenode every so many seconds:
+ long now = System.currentTimeMillis();
+ long statusReportIntervalMilliSec = 30*1000;
+ long lastStatusReportTime = now;
+ long lastUpdateTime = now;
+ long lastWarnTime = now;
+
+ // Now wait for the tasks to complete.
+ //
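+ // Each UpgradeExecutor is expected to add itself to completedList and
+ // notify it once its block is done (see UpgradeExecutor).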
+ while (nLeft > 0) {
+ synchronized (completedList) {
+ if (completedList.size() <= 0) {
+ try {
+ completedList.wait(1000);
+ } catch (InterruptedException ignored) {}
+ }
+
+ now = System.currentTimeMillis();
+
+ if (completedList.size() > 0) {
+ UpgradeExecutor exe = completedList.remove(0);
+ nLeft--;
+ if (exe.throwable != null) {
+ errors.getAndIncrement();
+ LOG.error("Got an exception during generation stamp upgrade of " +
+ exe.block + ": " +
+ StringUtils.stringifyException(exe.throwable));
+ }
+ blocksUpgraded.getAndIncrement();
+ lastUpdateTime = now;
+ } else {
+ if ((now - lastUpdateTime) >= LONG_TIMEOUT_MILLISEC &&
+ (now - lastWarnTime) >= LONG_TIMEOUT_MILLISEC) {
+ lastWarnTime = now;
+ LOG.warn("No block was updated in last " +
+ (LONG_TIMEOUT_MILLISEC/(60*1000)) +
+ " minutes! will keep waiting... ");
+ }
+ }
+ }
+
+ if ((now-lastStatusReportTime) > statusReportIntervalMilliSec) {
+ sendStatus();
+ lastStatusReportTime = System.currentTimeMillis();
+ }
+ }
+
+ pool.shutdown();
+ upgradeCompleted.set(true);
+
+ LOG.info("Completed Block Generation Stamp Upgrade. Total of " +
+ (blocksPreviouslyUpgraded.get() + blocksToUpgrade.get()) +
+ " blocks : " + blocksPreviouslyUpgraded.get() + " blocks previously " +
+ "upgraded, " + blocksUpgraded.get() + " blocks upgraded this time " +
+ "with " + errors.get() + " errors.");
+
+ // now inform the name node about the completion.
+ // What if there is no upgrade running on Namenode now?
+ while (!sendStatus());
+
+ }
+
+ /** Sends current status and stats to namenode and logs it to local log*/
+ boolean sendStatus() {
+ LOG.info((offlineUpgrade.get() ? "Offline " : "") +
+ "Block Generation Stamp Upgrade : " +
+ getUpgradeStatus() + "% completed.");
+ if (offlineUpgrade.get()) {
+ return true;
+ }
+
+ DatanodeStatsCommand cmd = null;
+ synchronized (this) {
+ cmd = new DatanodeStatsCommand(getUpgradeStatus(),
+ getDatanode().dnRegistration,
+ blocksPreviouslyUpgraded.get() + blocksUpgraded.get(),
+ blocksToUpgrade.get()-blocksUpgraded.get(),
+ errors.get(),
+ GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION);
+ }
+ UpgradeCommand reply = sendCommand(namenodeAddr, namenode, cmd, 0);
+ if (reply == null) {
+ LOG.warn("Could not send status to Namenode. Namenode might be " +
+ "over loaded or down.");
+ }
+ return reply != null;
+ }
+
+
+ // Sends a command to the namenode
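+ // A negative 'retries' value means retry indefinitely.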
+ static UpgradeCommand sendCommand(InetSocketAddress namenodeAddr,
+ DatanodeProtocol namenode,
+ UpgradeCommand cmd, int retries) {
+ for(int i=0; i<=retries || retries<0; i++) {
+ try {
+ UpgradeCommand reply = namenode.processUpgradeCommand(cmd);
+ if (reply == null) {
+ /* The namenode might not be running an upgrade, or might have
+ * finished one. We just return a static object. */
+ return noUpgradeOnNamenode;
+ }
+ return reply;
+ } catch (IOException e) {
+ // print the stack trace only for the last retry.
+ LOG.warn("Exception to " + namenodeAddr +
+ " while sending command " +
+ cmd.getAction() + ": " + e +
+ ((retries<0 || i>=retries)? "... will retry ..." :
+ ": " + StringUtils.stringifyException(e)));
+ }
+ }
+ return null;
+ }
+}
+
+/**
+ * Once an upgrade starts at the namenode, this class manages the upgrade
+ * process.
+ */
+class GenerationStampUpgradeNamenode extends UpgradeObjectNamenode {
+
+ public static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.dfs.GenerationStampUpgradeNamenode");
+
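+ // Grace period after the last datanode finishes before the upgrade is
+ // considered done at the datanodes (see checkOverallCompletion()).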
+ static final long inactivityExtension = 10*1000; // 10 seconds
+ AtomicLong lastNodeCompletionTime = new AtomicLong(0);
+
+ // The layout version before the generation stamp upgrade.
+ static final int PRE_GENERATIONSTAMP_LAYOUT_VERSION = -13;
+
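+ // Action code identifying the periodic DatanodeStatsCommand messages.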
+ static final int DN_CMD_STATS = 300;
+
+ enum UpgradeStatus {
+ INITIALIZED,
+ STARTED,
+ DATANODES_DONE,
+ COMPLETED,
+ }
+
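+ // Advanced to DATANODES_DONE by checkOverallCompletion() and then to
+ // COMPLETED by the monitor thread.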
+ UpgradeStatus upgradeStatus = UpgradeStatus.INITIALIZED;
+
+ class DnInfo {
+ short percentCompleted = 0;
+ long blocksUpgraded = 0;
+ long blocksRemaining = 0;
+ long errors = 0;
+
+ DnInfo(short pcCompleted) {
+ percentCompleted = pcCompleted;
+ }
+ DnInfo() {}
+
+ void setStats(DatanodeStatsCommand cmd) {
+ percentCompleted = cmd.getCurrentStatus();
+ blocksUpgraded = cmd.blocksUpgraded;
+ blocksRemaining = cmd.blocksRemaining;
+ errors = cmd.errors;
+ }
+
+ boolean isDone() {
+ return percentCompleted >= 100;
+ }
+ }
+
+ /* We should track only the storageIDs and not DatanodeID, which
+ * includes datanode name and storage id.
+ */
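+ // dnMap tracks every datanode that has reported stats; unfinishedDnMap
+ // holds the subset still below 100% completion (see handleStatsCmd()).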
+ HashMap<DatanodeID, DnInfo> dnMap = new HashMap<DatanodeID, DnInfo>();
+ HashMap<DatanodeID, DnInfo> unfinishedDnMap =
+ new HashMap<DatanodeID, DnInfo>();
+
+ Daemon monitorThread;
+ double avgDatanodeCompletionPct = 0;
+ boolean forceDnCompletion = false;
+
+ //Upgrade object interface:
+
+ public int getVersion() {
+ return PRE_GENERATIONSTAMP_LAYOUT_VERSION;
+ }
+
+ public UpgradeCommand completeUpgrade() throws IOException {
+ return null;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Block Generation Stamp Upgrade at Namenode";
+ }
+
+ @Override
+ public synchronized short getUpgradeStatus() {
+ if (upgradeStatus == UpgradeStatus.COMPLETED) {
+ return 100;
+ }
+ return (short) avgDatanodeCompletionPct;
+ }
+
+ @Override
+ public UpgradeCommand startUpgrade() throws IOException {
+
+ assert monitorThread == null;
+ lastNodeCompletionTime.set(System.currentTimeMillis());
+
+ monitorThread = new Daemon(new UpgradeMonitor());
+ monitorThread.start();
+ return super.startUpgrade();
+ }
+
+ @Override
+ public synchronized void forceProceed() throws IOException {
+ if (forceDnCompletion) {
+ LOG.warn("forceProceed is already set for this upgrade. It can take " +
+ "a short while to take affect. Please wait.");
+ return;
+ }
+
+ LOG.info("got forceProceed request for this upgrade. Datanodes upgrade " +
+ "will be considered done. It can take a few seconds to take " +
+ "effect.");
+ forceDnCompletion = true;
+ }
+
+ @Override
+ UpgradeCommand processUpgradeCommand(UpgradeCommand command)
+ throws IOException {
+ switch (command.getAction()) {
+
+ case GenerationStampUpgradeNamenode.DN_CMD_STATS :
+ return handleStatsCmd(command);
+
+ default:
+ throw new IOException("Unknown Command for Generation Stamp Upgrade : " +
+ command.getAction());
+ }
+ }
+
+ @Override
+ public UpgradeStatusReport getUpgradeStatusReport(boolean details)
+ throws IOException {
+
+ /* If 'details' is true, should we update block level stats via
+ * updateBlockLevelStats()? It could take multiple minutes.
+ */
+
+ String replyString = "";
+
+ short status = 0;
+
+ synchronized (this) {
+
+ status = getUpgradeStatus();
+
+ replyString = String.format(
+ ((monitorThread == null) ? "\tUpgrade has not been started yet.\n" : "")+
+ ((forceDnCompletion) ? "\tForce Proceed is ON\n" : "") +
+ "\tLast Block Level Stats updated at : %tc\n" +
+ "\tLast Block Level Stats : %s\n" +
+ "\tBrief Datanode Status : %s\n" +
+ "%s",
+ latestBlockLevelStats.updatedAt,
+ latestBlockLevelStats.statusString("\n\t "),
+ printStatus("\n\t "),
+ ((status < 100 && upgradeStatus == UpgradeStatus.DATANODES_DONE) ?
+ "\tNOTE: Upgrade at the Datanodes has finished. Deleteing \".crc\" " +
+ "files\n\tcan take longer than status implies.\n" : "")
+ );
+
+ if (details) {
+ // list all the known data nodes
+ StringBuilder str = null;
+ Iterator<DatanodeID> keys = dnMap.keySet().iterator();
+ Iterator<DnInfo> values = dnMap.values().iterator();
+
+ for(; keys.hasNext() && values.hasNext() ;) {
+ DatanodeID dn = keys.next();
+ DnInfo info = values.next();
+ String dnStr = "\t\t" + dn.getName() + "\t : " +
+ info.percentCompleted + " % \t" +
+ info.blocksUpgraded + " u \t" +
+ info.blocksRemaining + " r \t" +
+ info.errors + " e\n";
+ if ( str == null ) {
+ str = new StringBuilder(dnStr.length()*
+ (dnMap.size() + (dnMap.size()+7)/8));
+ }
+ str.append(dnStr);
+ }
+
+ replyString += "\n\tDatanode Stats (total: " + dnMap.size() + "): " +
+ "pct Completion(%) blocks upgraded (u) " +
+ "blocks remaining (r) errors (e)\n\n" +
+ (( str == null ) ?
+ "\t\tThere are no known Datanodes\n" : str);
+ }
+ }
+ return new GenerationStampUpgradeStatusReport(
+ PRE_GENERATIONSTAMP_LAYOUT_VERSION,
+ status, replyString);
+ }
+
+
+ /**
+ * The namenode processes a periodic statistics message from the datanode.
+ */
+ private synchronized UpgradeCommand handleStatsCmd(UpgradeCommand cmd) {
+
+ DatanodeStatsCommand stats = (DatanodeStatsCommand)cmd;
+
+ DatanodeID dn = stats.datanodeId;
+ DnInfo dnInfo = dnMap.get(dn);
+ boolean alreadyCompleted = (dnInfo != null && dnInfo.isDone());
+
+ if (dnInfo == null) {
+ dnInfo = new DnInfo();
+ dnMap.put(dn, dnInfo);
+ LOG.info("Upgrade started/resumed at datanode " + dn.getName());
+ }
+
+ dnInfo.setStats(stats);
+ if (!dnInfo.isDone()) {
+ unfinishedDnMap.put(dn, dnInfo);
+ }
+
+ if (dnInfo.isDone() && !alreadyCompleted) {
+ LOG.info("upgrade completed on datanode " + dn.getName());
+ unfinishedDnMap.remove(dn);
+ if (unfinishedDnMap.size() == 0) {
+ lastNodeCompletionTime.set(System.currentTimeMillis());
+ }
+ }
+
+ //Should we send any more info?
+ return new UpgradeCommand();
+ }
+
+ public GenerationStampUpgradeNamenode() {
+ }
+
+ // For now we will wait for all the nodes to complete the upgrade.
+ synchronized boolean isUpgradeDone() {
+ return upgradeStatus == UpgradeStatus.COMPLETED;
+ }
+
+ synchronized String printStatus(String spacing) {
+ //NOTE: iterates over all the datanodes.
+
+ // Calculate % completion on all the data nodes.
+ long errors = 0;
+ long totalCompletion = 0;
+ for( Iterator<DnInfo> it = dnMap.values().iterator(); it.hasNext(); ) {
+ DnInfo dnInfo = it.next();
+ totalCompletion += dnInfo.percentCompleted;
+ errors += dnInfo.errors;
+ }
+
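+ // The 1e-20 term avoids division by zero before any datanode reports.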
+ avgDatanodeCompletionPct = totalCompletion/(dnMap.size() + 1e-20);
+
+ String msg = "Avg completion of all Datanodes: " +
+ String.format("%.2f%%", avgDatanodeCompletionPct) +
+ " with " + errors + " errors. " +
+ ((unfinishedDnMap.size() > 0) ? spacing +
+ unfinishedDnMap.size() + " out of " + dnMap.size() +
+ " nodes are not done." : "");
+
+ LOG.info("Generation Stamp Upgrade is " + (isUpgradeDone() ?
+ "complete. " : "still running. ") + spacing + msg);
+ return msg;
+ }
+
+ private synchronized void setStatus(UpgradeStatus status) {
+ upgradeStatus = status;
+ }
+
+ /* Checks if the upgrade is complete based on the datanodes' status
+ * and/or whether all the blocks are upgraded.
+ */
+ private synchronized UpgradeStatus checkOverallCompletion() {
+
+ if (upgradeStatus == UpgradeStatus.COMPLETED ||
+ upgradeStatus == UpgradeStatus.DATANODES_DONE) {
+ return upgradeStatus;
+ }
+
+ if (upgradeStatus != UpgradeStatus.DATANODES_DONE) {
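+ // All known datanodes have reported completion and the grace period
+ // since the last completion has elapsed, or the admin forced it.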
+ boolean datanodesDone =
+ (dnMap.size() > 0 && unfinishedDnMap.size() == 0 &&
+ ( System.currentTimeMillis() - lastNodeCompletionTime.get() ) >
+ inactivityExtension) || forceDnCompletion ;
+
+ if ( datanodesDone ) {
+ LOG.info("Upgrade of DataNode blocks is complete. " +
+ ((forceDnCompletion) ? "(ForceDnCompletion is on.)" : ""));
+ upgradeStatus = UpgradeStatus.DATANODES_DONE;
+ }
+ }
+
+ if (upgradeStatus != UpgradeStatus.DATANODES_DONE &&
+ latestBlockLevelStats.updatedAt > 0) {
+ // check if last block report marked all
+ if (latestBlockLevelStats.minimallyReplicatedBlocks == 0 &&
+ latestBlockLevelStats.underReplicatedBlocks == 0) {
+
+ LOG.info("Marking datanode upgrade complete since all the blocks are " +
+ "upgraded (even though some datanodes may not have " +
+ "reported completion. Block level stats :\n\t" +
+ latestBlockLevelStats.statusString("\n\t"));
+ upgradeStatus = UpgradeStatus.DATANODES_DONE;
+ }
+ }
+
+ return upgradeStatus;
+ }
+
+ /**
+ * This class monitors the upgrade progress and periodically prints
+ * status messages to the log.
+ */
+ class UpgradeMonitor implements Runnable {
+
+ static final long statusReportIntervalMillis = 1*60*1000;
+ static final long blockReportIntervalMillis = 5*60*1000;
+ static final int sleepTimeSec = 5;
+
+ public void run() {
+ long lastReportTime = System.currentTimeMillis();
+ long lastBlockReportTime = lastReportTime;
+
+ while ( !isUpgradeDone() ) {
+ UpgradeStatus status = checkOverallCompletion();
+
+ if (status == UpgradeStatus.DATANODES_DONE) {
+ setStatus(UpgradeStatus.COMPLETED);
+ }
+
+ long now = System.currentTimeMillis();
+
+
+ if (now-lastBlockReportTime >= blockReportIntervalMillis) {
+ updateBlockLevelStats();
+ // Check if all the blocks have been upgraded.
+ lastBlockReportTime = now;
+ }
+
+ if ((now - lastReportTime) >= statusReportIntervalMillis ||
+ isUpgradeDone()) {
+ printStatus("\n\t");
+ lastReportTime = now;
+ }
+
+ if (isUpgradeDone()) {
+ break;
+ }
+
+ try {
+ Thread.sleep(sleepTimeSec*1000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ LOG.info("Leaving the Generation Stamp Upgrade Namenode monitor thread");
+ }
+ }
+
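+ // Refreshed by the monitor thread via updateBlockLevelStats() and
+ // swapped in under the object lock.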
+ private BlockLevelStats latestBlockLevelStats = new BlockLevelStats();
+ // internal class to hold the stats.
+ private static class BlockLevelStats {
+ long fullyReplicatedBlocks = 0;
+ long minimallyReplicatedBlocks = 0;
+ long underReplicatedBlocks = 0; // includes unReplicatedBlocks
+ long unReplicatedBlocks = 0; // zero replicas upgraded
+ long errors;
+ long updatedAt;
+
+ String statusString(String spacing) {
+ long totalBlocks = fullyReplicatedBlocks +
+ minimallyReplicatedBlocks +
+ underReplicatedBlocks;
+ double multiplier = 100/(totalBlocks + 1e-20);
+
+ if (spacing.equals("")) {
+ spacing = ", ";
+ }
+
+ return String.format(
+ "Total Blocks : %d" +
+ "%sFully Upgragraded : %.2f%%" +
+ "%sMinimally Upgraded : %.2f%%" +
+ "%sUnder Upgraded : %.2f%% (includes Un-upgraded blocks)" +
+ "%sUn-upgraded : %.2f%%" +
+ "%sErrors : %d", totalBlocks,
+ spacing, (fullyReplicatedBlocks * multiplier),
+ spacing, (minimallyReplicatedBlocks * multiplier),
+ spacing, (underReplicatedBlocks * multiplier),
+ spacing, (unReplicatedBlocks * multiplier),
+ spacing, errors);
+ }
+ }
+
+ void updateBlockLevelStats(String path, BlockLevelStats stats) {
+ DFSFileInfo[] fileArr = getFSNamesystem().dir.getListing(path);
+
+ for (DFSFileInfo file:fileArr) {
+ if (file.isDir()) {
+ updateBlockLevelStats(file.getPath().toString(), stats);
+ } else {
+ // Get all the blocks.
+ LocatedBlocks blockLoc = null;
+ try {
+ blockLoc = getFSNamesystem().getBlockLocations(
+ file.getPath().toString(), 0, file.getLen());
+ int numBlocks = blockLoc.locatedBlockCount();
+ for (int i=0; i<numBlocks; i++) {
+ LocatedBlock loc = blockLoc.get(i);
+ DatanodeInfo[] dnArr = loc.getLocations();
+ int numUpgraded = 0;
+ synchronized (this) {
+ for (DatanodeInfo dn:dnArr) {
+ DnInfo dnInfo = dnMap.get(dn);
+ if (dnInfo != null && dnInfo.isDone()) {
+ numUpgraded++;
+ }
+ }
+ }
+
+ if (numUpgraded >= file.getReplication()) {
+ stats.fullyReplicatedBlocks++;
+ } else if (numUpgraded >= getFSNamesystem().getMinReplication()) {
+ stats.minimallyReplicatedBlocks++;
+ } else {
+ stats.underReplicatedBlocks++;
+ }
+ if (numUpgraded == 0) {
+ stats.unReplicatedBlocks++;
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("BlockGenerationStampUpgrade: could not get block locations for " +
+ file.getPath().toString() + " : " +
+ StringUtils.stringifyException(e));
+ stats.errors++;
+ }
+ }
+ }
+ }
+
+ void updateBlockLevelStats() {
+ /* This iterates over all the blocks and updates various
+ * counts.
+ * Since iterating over all the blocks at once would be quite a
+ * large operation under the lock, we iterate over all the files
+ * and update the counts for blocks that belong to a file.
+ */
+
+ LOG.info("Starting update of block level stats. " +
+ "This could take a few minutes");
+ BlockLevelStats stats = new BlockLevelStats();
+ updateBlockLevelStats("/", stats);
+ stats.updatedAt = System.currentTimeMillis();
+
+ LOG.info("Block level stats:\n\t" + stats.statusString("\n\t"));
+ synchronized (this) {
+ latestBlockLevelStats = stats;
+ }
+ }
+}
+
+/**
+ * The Datanode sends this statistics object to the Namenode periodically.
+ */
+class DatanodeStatsCommand extends UpgradeCommand {
+ DatanodeID datanodeId;
+ int blocksUpgraded;
+ int blocksRemaining;
+ int errors;
+
+ DatanodeStatsCommand() {
+ super(GenerationStampUpgradeNamenode.DN_CMD_STATS, 0, (short)0);
+ datanodeId = new DatanodeID();
+ }
+
+ public DatanodeStatsCommand(short status, DatanodeID dn,
+ int blocksUpgraded, int blocksRemaining,
+ int errors, int version) {
+ super(GenerationStampUpgradeNamenode.DN_CMD_STATS, version, status);
+ //copy so that only ID part gets serialized
+ datanodeId = new DatanodeID(dn);
+ this.blocksUpgraded = blocksUpgraded;
+ this.blocksRemaining = blocksRemaining;
+ this.errors = errors;
+ }
+
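+ // The fields below are serialized after the UpgradeCommand header;
+ // readFields() and write() must remain in the same order.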
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ datanodeId.readFields(in);
+ blocksUpgraded = in.readInt();
+ blocksRemaining = in.readInt();
+ errors = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ datanodeId.write(out);
+ out.writeInt(blocksUpgraded);
+ out.writeInt(blocksRemaining);
+ out.writeInt(errors);
+ }
+}
+
+
+/**
+ * A status report object for Generation Stamp Upgrades
+ */
+class GenerationStampUpgradeStatusReport extends UpgradeStatusReport {
+
+ String extraText = "";
+
+ public GenerationStampUpgradeStatusReport() {
+ }
+
+ public GenerationStampUpgradeStatusReport(int version, short status,
+ String extraText) {
+ super(version, status, false);
+ this.extraText = extraText;
+ }
+
+ @Override
+ public String getStatusText(boolean details) {
+ return super.getStatusText(details) + "\n\n" + extraText;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ extraText = Text.readString(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ Text.writeString(out, extraText);
+ }
+}
+
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java Tue May 13 23:59:46 2008
@@ -109,7 +109,8 @@
s.close();
return chosenNode;
}
- public void streamBlockInAscii(InetSocketAddress addr, long blockId, long blockSize,
+ public void streamBlockInAscii(InetSocketAddress addr, long blockId,
+ long genStamp, long blockSize,
long offsetIntoBlock, long chunkSizeToView, JspWriter out)
throws IOException {
if (chunkSizeToView == 0) return;
@@ -122,7 +123,7 @@
// Use the block name for file name.
DFSClient.BlockReader blockReader =
DFSClient.BlockReader.newBlockReader(s, addr.toString() + ":" + blockId,
- blockId, offsetIntoBlock,
+ blockId, genStamp, offsetIntoBlock,
amtToRead,
conf.getInt("io.file.buffer.size",
4096));