Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:06:08 UTC
svn commit: r885143 [8/18] - in /hadoop/hdfs/branches/HDFS-326: ./
.eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/
src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs/...
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java Sat Nov 28 20:05:56 2009
@@ -17,48 +17,98 @@
*/
package org.apache.hadoop.hdfs.protocol;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+
/**
* This class provides an interface for accessing a list of blocks that
* has been implemented as a long[].
- * This class is usefull for block report. Rather than send block reports
+ * This class is useful for block reports. Rather than send block reports
* as a Block[] we can send them as a long[].
*
+ * The structure of the array is as follows:
+ * 0: the length of the finalized replica list;
+ * 1: the length of the under-construction replica list;
+ * - followed by the finalized replica list, where each replica is represented
+ * by 3 longs: one for the blockId, one for the block length, and one for
+ * the generation stamp;
+ * - followed by an invalid delimiting replica, represented by three -1s;
+ * - followed by the under-construction replica list, where each replica is
+ * represented by 4 longs: three for the block id, length, and generation
+ * stamp, and a fourth for the replica state.
*/
-public class BlockListAsLongs {
+public class BlockListAsLongs implements Iterable<Block> {
/**
- * A block as 3 longs
+ * A finalized block as 3 longs
* block-id, block length, and generation stamp
*/
- private static final int LONGS_PER_BLOCK = 3;
-
- private static int index2BlockId(int index) {
- return index*LONGS_PER_BLOCK;
- }
- private static int index2BlockLen(int index) {
- return (index*LONGS_PER_BLOCK) + 1;
- }
- private static int index2BlockGenStamp(int index) {
- return (index*LONGS_PER_BLOCK) + 2;
+ private static final int LONGS_PER_FINALIZED_BLOCK = 3;
+
+ /**
+ * An under-construction block as 4 longs
+ * block-id, block length, generation stamp, and replica state
+ */
+ private static final int LONGS_PER_UC_BLOCK = 4;
+
+ /** Number of longs in the header */
+ private static final int HEADER_SIZE = 2;
+
+ /**
+ * Returns the index of the first long in blockList
+ * belonging to the specified block.
+ * The first long contains the block id.
+ */
+ private int index2BlockId(int blockIndex) {
+ if(blockIndex < 0 || blockIndex > getNumberOfBlocks())
+ return -1;
+ int finalizedSize = getNumberOfFinalizedReplicas();
+ if(blockIndex < finalizedSize)
+ return HEADER_SIZE + blockIndex * LONGS_PER_FINALIZED_BLOCK;
+ return HEADER_SIZE + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
+ + (blockIndex - finalizedSize) * LONGS_PER_UC_BLOCK;
}
-
+
private long[] blockList;
/**
- * Converting a block[] to a long[]
- * @param blockArray - the input array block[]
- * @return the output array of long[]
+ * Create a block report from finalized and under-construction lists of blocks.
+ *
+ * @param finalized - list of finalized blocks
+ * @param uc - list of under-construction blocks
*/
-
- public static long[] convertToArrayLongs(final Block[] blockArray) {
- long[] blocksAsLongs = new long[blockArray.length * LONGS_PER_BLOCK];
+ public BlockListAsLongs(final List<? extends Block> finalized,
+ final List<ReplicaInfo> uc) {
+ int finalizedSize = finalized == null ? 0 : finalized.size();
+ int ucSize = uc == null ? 0 : uc.size();
+ int len = HEADER_SIZE
+ + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
+ + ucSize * LONGS_PER_UC_BLOCK;
- BlockListAsLongs bl = new BlockListAsLongs(blocksAsLongs);
- assert bl.getNumberOfBlocks() == blockArray.length;
+ blockList = new long[len];
- for (int i = 0; i < blockArray.length; i++) {
- bl.setBlock(i, blockArray[i]);
+ // set the header
+ blockList[0] = finalizedSize;
+ blockList[1] = ucSize;
+
+ // set finalized blocks
+ for (int i = 0; i < finalizedSize; i++) {
+ setBlock(i, finalized.get(i));
}
- return blocksAsLongs;
+
+ // set invalid delimiting block
+ setDelimitingBlock(finalizedSize);
+
+ // set under construction blocks
+ for (int i = 0; i < ucSize; i++) {
+ setBlock(finalizedSize + i, uc.get(i));
+ }
+ }
+
+ public BlockListAsLongs() {
+ this(null);
}
/**
@@ -67,33 +117,136 @@
*/
public BlockListAsLongs(final long[] iBlockList) {
if (iBlockList == null) {
- blockList = new long[0];
- } else {
- if (iBlockList.length%LONGS_PER_BLOCK != 0) {
- // must be multiple of LONGS_PER_BLOCK
- throw new IllegalArgumentException();
- }
- blockList = iBlockList;
+ blockList = new long[HEADER_SIZE];
+ return;
+ }
+ blockList = iBlockList;
+ }
+
+ public long[] getBlockListAsLongs() {
+ return blockList;
+ }
+
+ /**
+ * Iterates over blocks in the block report.
+ * Avoids object allocation on each iteration.
+ */
+ public class BlockReportIterator implements Iterator<Block> {
+ private int currentBlockIndex;
+ private Block block;
+ private ReplicaState currentReplicaState;
+
+ BlockReportIterator() {
+ this.currentBlockIndex = 0;
+ this.block = new Block();
+ this.currentReplicaState = null;
+ }
+
+ public boolean hasNext() {
+ return currentBlockIndex < getNumberOfBlocks();
+ }
+
+ public Block next() {
+ block.set(blockId(currentBlockIndex),
+ blockLength(currentBlockIndex),
+ blockGenerationStamp(currentBlockIndex));
+ currentReplicaState = blockReplicaState(currentBlockIndex);
+ currentBlockIndex++;
+ return block;
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException("Sorry. can't remove.");
+ }
+
+ /**
+ * Get the state of the current replica.
+ * The state corresponds to the replica returned
+ * by the latest {@link #next()}.
+ */
+ public ReplicaState getCurrentReplicaState() {
+ return currentReplicaState;
}
}
-
+ /**
+ * Returns an iterator over blocks in the block report.
+ */
+ public Iterator<Block> iterator() {
+ return getBlockReportIterator();
+ }
+
+ /**
+ * Returns {@link BlockReportIterator}.
+ */
+ public BlockReportIterator getBlockReportIterator() {
+ return new BlockReportIterator();
+ }
+
/**
* The number of blocks
* @return - the number of blocks
*/
public int getNumberOfBlocks() {
- return blockList.length/LONGS_PER_BLOCK;
+ assert blockList.length == HEADER_SIZE +
+ (blockList[0] + 1) * LONGS_PER_FINALIZED_BLOCK +
+ blockList[1] * LONGS_PER_UC_BLOCK :
+ "Number of blocks is inconcistent with the array length";
+ return getNumberOfFinalizedReplicas() + getNumberOfUCReplicas();
}
-
-
+
+ /**
+ * Returns the number of finalized replicas in the block report.
+ */
+ private int getNumberOfFinalizedReplicas() {
+ return (int)blockList[0];
+ }
+
+ /**
+ * Returns the number of under-construction replicas in the block report.
+ */
+ private int getNumberOfUCReplicas() {
+ return (int)blockList[1];
+ }
+
+ /**
+ * Returns the id of the specified replica in the block report.
+ */
+ private long blockId(int index) {
+ return blockList[index2BlockId(index)];
+ }
+
+ /**
+ * Returns the length of the specified replica in the block report.
+ */
+ private long blockLength(int index) {
+ return blockList[index2BlockId(index) + 1];
+ }
+
+ /**
+ * Returns the generation stamp of the specified replica in the block report.
+ */
+ private long blockGenerationStamp(int index) {
+ return blockList[index2BlockId(index) + 2];
+ }
+
+ /**
+ * Returns the state of the specified replica in the block report.
+ */
+ private ReplicaState blockReplicaState(int index) {
+ if(index < getNumberOfFinalizedReplicas())
+ return ReplicaState.FINALIZED;
+ return ReplicaState.getState((int)blockList[index2BlockId(index) + 3]);
+ }
+
/**
* The block-id of the indexTh block
* @param index - the block whose block-id is desired
* @return the block-id
*/
+ @Deprecated
public long getBlockId(final int index) {
- return blockList[index2BlockId(index)];
+ return blockId(index);
}
/**
@@ -101,8 +254,9 @@
* @param index - the block whose block-len is desired
* @return - the block-len
*/
+ @Deprecated
public long getBlockLen(final int index) {
- return blockList[index2BlockLen(index)];
+ return blockLength(index);
}
/**
@@ -110,8 +264,9 @@
* @param index - the block whose block-len is desired
* @return - the generation stamp
*/
+ @Deprecated
public long getBlockGenStamp(final int index) {
- return blockList[index2BlockGenStamp(index)];
+ return blockGenerationStamp(index);
}
/**
@@ -119,9 +274,28 @@
* @param index - the index of the block to set
* @param b - the value to which the indexTh block is set
*/
- void setBlock(final int index, final Block b) {
- blockList[index2BlockId(index)] = b.getBlockId();
- blockList[index2BlockLen(index)] = b.getNumBytes();
- blockList[index2BlockGenStamp(index)] = b.getGenerationStamp();
+ private <T extends Block> void setBlock(final int index, final T b) {
+ int pos = index2BlockId(index);
+ blockList[pos] = b.getBlockId();
+ blockList[pos + 1] = b.getNumBytes();
+ blockList[pos + 2] = b.getGenerationStamp();
+ if(index < getNumberOfFinalizedReplicas())
+ return;
+ assert ((ReplicaInfo)b).getState() != ReplicaState.FINALIZED :
+ "Must be under-construction replica.";
+ blockList[pos + 3] = ((ReplicaInfo)b).getState().getValue();
+ }
+
+ /**
+ * Set the invalid delimiting block between the finalized and
+ * the under-construction lists.
+ * The invalid block has all three fields set to -1.
+ * @param finalizedSize - the size of the finalized list
+ */
+ private void setDelimitingBlock(final int finalizedSize) {
+ int idx = HEADER_SIZE + finalizedSize * LONGS_PER_FINALIZED_BLOCK;
+ blockList[idx] = -1;
+ blockList[idx+1] = -1;
+ blockList[idx+2] = -1;
}
}
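
To make the new layout concrete, here is a minimal, self-contained sketch (not the committed class; all names are illustrative) that builds and walks a long[] in the format documented above: two header longs, finalized replicas at 3 longs each, a -1/-1/-1 delimiter, then under-construction replicas at 4 longs each.

// Standalone illustration of the long[] block-report layout; illustrative only.
public class BlockReportLayoutDemo {
  static final int HEADER = 2, PER_FINALIZED = 3, PER_UC = 4;

  public static void main(String[] args) {
    // One finalized replica (id=1, len=100, gs=1000) and one
    // under-construction replica (id=2, len=50, gs=1001, state=1 i.e. RBW).
    long[] report = new long[HEADER + (1 + 1) * PER_FINALIZED + 1 * PER_UC];
    report[0] = 1;                                      // finalized count
    report[1] = 1;                                      // under-construction count
    report[2] = 1;  report[3] = 100; report[4] = 1000;  // finalized replica
    report[5] = -1; report[6] = -1;  report[7] = -1;    // delimiter
    report[8] = 2;  report[9] = 50;  report[10] = 1001; report[11] = 1; // UC

    int finalized = (int) report[0];
    int uc = (int) report[1];
    for (int i = 0; i < finalized + uc; i++) {
      // Same index arithmetic as index2BlockId() above.
      int pos = i < finalized
          ? HEADER + i * PER_FINALIZED
          : HEADER + (finalized + 1) * PER_FINALIZED + (i - finalized) * PER_UC;
      long state = i < finalized ? 0 : report[pos + 3]; // 0 = FINALIZED
      System.out.println("block " + report[pos] + " len=" + report[pos + 1]
          + " gs=" + report[pos + 2] + " state=" + state);
    }
  }
}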
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Sat Nov 28 20:05:56 2009
@@ -29,19 +29,10 @@
public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
/**
- * 4: never return null and always return a newly generated access token
+ * 6: recoverBlock() removed.
*/
- public static final long versionID = 4L;
+ public static final long versionID = 6L;
- /** Start generation-stamp recovery for specified block
- * @param block the specified block
- * @param keepLength keep the block length
- * @param targets the list of possible locations of specified block
- * @return either a new generation stamp, or the original generation stamp.
- * Regardless of whether a new generation stamp is returned, a newly
- * generated access token is returned as part of the return value.
- * @throws IOException
- */
- LocatedBlock recoverBlock(Block block, boolean keepLength,
- DatanodeInfo[] targets) throws IOException;
+ /** Return the visible length of a replica. */
+ long getReplicaVisibleLength(Block b) throws IOException;
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Sat Nov 28 20:05:56 2009
@@ -21,8 +21,11 @@
import java.io.IOException;
import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -43,9 +46,9 @@
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 45: add create flag for create command, see Hadoop-5438
+ * 52: adding concat() API
*/
- public static final long versionID = 45L;
+ public static final long versionID = 52L;
///////////////////////////////////////
// File contents
@@ -74,6 +77,13 @@
long length) throws IOException;
/**
+ * Get server default values for a number of configuration params.
+ * @return a set of server default configuration values
+ * @throws IOException
+ */
+ public FsServerDefaults getServerDefaults() throws IOException;
+
+ /**
* Create a new file entry in the namespace.
* <p>
* This will create an empty file specified by the source path.
@@ -85,14 +95,15 @@
* {@link #rename(String, String)} it until the file is completed
* or explicitly as a result of lease expiration.
* <p>
- * Blocks have a maximum size. Clients that intend to
- * create multi-block files must also use {@link #addBlock(String, String)}.
+ * Blocks have a maximum size. Clients that intend to create
+ * multi-block files must also use {@link #addBlock(String, String, Block)}.
*
* @param src path of the file being created.
* @param masked masked permission.
* @param clientName name of the current client.
* @param flag indicates whether the file should be
* overwritten if it already exists or create if it does not exist or append.
+ * @param createParent create missing parent directory if true
* @param replication block replication factor.
* @param blockSize maximum block size.
*
@@ -107,6 +118,7 @@
FsPermission masked,
String clientName,
EnumSetWritable<CreateFlag> flag,
+ boolean createParent,
short replication,
long blockSize
) throws IOException;
@@ -177,9 +189,17 @@
* addBlock() allocates a new block and the datanodes to which the block
* data should be replicated.
*
+ * addBlock() also commits the previous block by reporting
+ * to the name-node the actual generation stamp and the length
+ * of the block that the client has transmitted to data-nodes.
+ *
* @return LocatedBlock allocated block information.
*/
- public LocatedBlock addBlock(String src, String clientName) throws IOException;
+ public LocatedBlock addBlock(String src, String clientName,
+ Block previous) throws IOException;
+
+ public LocatedBlock addBlock(String src, String clientName,
+ Block previous, DatanodeInfo[] excludedNode) throws IOException;
/**
* The client is done writing data to the given filename, and would
@@ -187,13 +207,18 @@
*
* The function returns whether the file has been closed successfully.
* If the function returns false, the caller should try again.
+ *
+ * close() also commits the last block of the file by reporting
+ * to the name-node the actual generation stamp and the length
+ * of the block that the client has transmitted to data-nodes.
*
* A call to complete() will not return true until all the file's
* blocks have been replicated the minimum number of times. Thus,
* DataNode failures may cause a client to call complete() several
* times before succeeding.
*/
- public boolean complete(String src, String clientName) throws IOException;
+ public boolean complete(String src, String clientName,
+ Block last) throws IOException;
/**
* The client wants to report corrupted blocks (blocks with specified
@@ -207,7 +232,6 @@
///////////////////////////////////////
/**
* Rename an item in the file system namespace.
- *
* @param src existing file or directory name.
* @param dst new name.
* @return true if successful, or false if the old name does not exist
@@ -215,10 +239,45 @@
* @throws IOException if the new name is invalid.
* @throws QuotaExceededException if the rename would violate
* any quota restriction
+ * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead.
*/
+ @Deprecated
public boolean rename(String src, String dst) throws IOException;
/**
+ * Moves blocks from srcs to trg and deletes srcs.
+ *
+ * @param trg existing file
+ * @param srcs - list of existing files (same block size, same replication)
+ * @throws IOException if some arguments are invalid
+ * @throws QuotaExceededException if the concat would violate
+ * any quota restriction
+ */
+ public void concat(String trg, String [] srcs) throws IOException;
+
+ /**
+ * Rename src to dst.
+ * <ul>
+ * <li>Fails if src is a file and dst is a directory.
+ * <li>Fails if src is a directory and dst is a file.
+ * <li>Fails if the parent of dst does not exist or is a file.
+ * </ul>
+ * <p>
+ * Without OVERWRITE option, rename fails if the dst already exists.
+ * With OVERWRITE option, rename overwrites the dst, if it is a file
+ * or an empty directory. Rename fails if dst is a non-empty directory.
+ * <p>
+ * This implementation of rename is atomic.
+ * <p>
+ * @param src existing file or directory name.
+ * @param dst new name.
+ * @param options Rename options
+ * @throws IOException if rename failed
+ */
+ public void rename(String src, String dst, Options.Rename... options)
+ throws IOException;
+
+ /**
* Delete the given file or directory from the file system.
* <p>
* Any blocks belonging to the deleted files will be garbage-collected.
@@ -250,6 +309,7 @@
*
* @param src The path of the directory being created
* @param masked The masked permission of the directory being created
+ * @param createParent create missing parent directory if true
* @return true if the operation succeeds.
* @throws {@link AccessControlException} if permission to create file is
* denied by the system. As usually on the client side the exception will
@@ -257,7 +317,8 @@
* @throws QuotaExceededException if the operation would violate
* any quota restriction.
*/
- public boolean mkdirs(String src, FsPermission masked) throws IOException;
+ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
+ throws IOException;
/**
* Get a listing of the indicated directory
@@ -344,7 +405,7 @@
* percentage called threshold of blocks, which satisfy the minimal
* replication condition.
* The minimal replication condition is that each block must have at least
- * <tt>dfs.replication.min</tt> replicas.
+ * <tt>dfs.namenode.replication.min</tt> replicas.
* When the threshold is reached the name node extends safe mode
* for a configurable amount of time
* to let the remaining data nodes check in before it
@@ -360,7 +421,7 @@
* <h4>Configuration parameters:</h4>
* <tt>dfs.safemode.threshold.pct</tt> is the threshold parameter.<br>
* <tt>dfs.safemode.extension</tt> is the safe mode extension parameter.<br>
- * <tt>dfs.replication.min</tt> is the minimal replication parameter.
+ * <tt>dfs.namenode.replication.min</tt> is the minimal replication parameter.
*
* <h4>Special cases:</h4>
* The name node does not enter safe mode at startup if the threshold is
@@ -488,4 +549,32 @@
* by this call.
*/
public void setTimes(String src, long mtime, long atime) throws IOException;
+
+ /**
+ * Get a new generation stamp together with an access token for
+ * a block under construction
+ *
+ * This method is called only when a client needs to recover a failed
+ * pipeline or set up a pipeline for appending to a block.
+ *
+ * @param block a block
+ * @param clientName the name of the client
+ * @return a located block with a new generation stamp and an access token
+ * @throws IOException if any error occurs
+ */
+ public LocatedBlock updateBlockForPipeline(Block block, String clientName)
+ throws IOException;
+
+ /**
+ * Update a pipeline for a block under construction
+ *
+ * @param clientName the name of the client
+ * @param oldBlock the old block
+ * @param newBlock the new block containing new generation stamp and length
+ * @param newNodes datanodes in the pipeline
+ * @throws IOException if any error occurs
+ */
+ public void updatePipeline(String clientName, Block oldBlock,
+ Block newBlock, DatanodeID[] newNodes)
+ throws IOException;
}
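
The commit-as-you-go semantics of the new addBlock(previous)/complete(last) signatures can be sketched as follows. This is a hypothetical caller, not DFSClient: "namenode" stands for a ClientProtocol proxy obtained elsewhere, lb.getBlock() is assumed from the existing LocatedBlock accessor, and data streaming is elided.

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

class WritePathSketch {
  static void writeBlocks(ClientProtocol namenode, String src,
      String clientName, int numBlocks) throws java.io.IOException {
    Block previous = null;                       // no block committed yet
    for (int i = 0; i < numBlocks; i++) {
      // Allocating the next block implicitly commits the previous one,
      // reporting its actual length and generation stamp to the name-node.
      LocatedBlock lb = namenode.addBlock(src, clientName, previous);
      // ... stream data to the pipeline in lb.getLocations() ...
      previous = lb.getBlock();
    }
    // complete() commits the last block; it returns false until the file's
    // blocks reach minimum replication, so the caller retries.
    while (!namenode.complete(src, clientName, previous)) {
      try { Thread.sleep(400); } catch (InterruptedException ignored) {}
    }
  }
}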
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Sat Nov 28 20:05:56 2009
@@ -24,8 +24,9 @@
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.io.WritableUtils;
/**
* Transfer data to/from datanode using a streaming protocol.
@@ -38,12 +39,12 @@
* when protocol changes. It is not very obvious.
*/
/*
- * Version 16:
- * Datanode now needs to send back a status code together
- * with firstBadLink during pipeline setup for dfs write
- * (only for DFSClients, not for other datanodes).
+ * Version 17:
+ * Change the block write protocol to support pipeline recovery.
+ * Additional fields, like recovery flags, new GS, minBytesRcvd,
+ * and maxBytesRcvd are included.
*/
- public static final int DATA_TRANSFER_VERSION = 16;
+ public static final int DATA_TRANSFER_VERSION = 17;
/** Operation */
public enum Op {
@@ -119,6 +120,55 @@
}
};
+ public enum BlockConstructionStage {
+ /** The enum constants are always listed with a regular stage followed
+ * by its recovery stage.
+ * Changing this order will break getRecoveryStage().
+ */
+ // pipeline set up for block append
+ PIPELINE_SETUP_APPEND,
+ // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
+ PIPELINE_SETUP_APPEND_RECOVERY,
+ // data streaming
+ DATA_STREAMING,
+ // pipeline setup for failed data streaming recovery
+ PIPELINE_SETUP_STREAMING_RECOVERY,
+ // close the block and pipeline
+ PIPELINE_CLOSE,
+ // Recover a failed PIPELINE_CLOSE
+ PIPELINE_CLOSE_RECOVERY,
+ // pipeline set up for block creation
+ PIPELINE_SETUP_CREATE;
+
+ final static private byte RECOVERY_BIT = (byte)1;
+
+ /**
+ * get the recovery stage of this stage
+ */
+ public BlockConstructionStage getRecoveryStage() {
+ if (this == PIPELINE_SETUP_CREATE) {
+ throw new IllegalArgumentException( "Unexpected blockStage " + this);
+ } else {
+ return values()[ordinal()|RECOVERY_BIT];
+ }
+ }
+
+ private static BlockConstructionStage valueOf(byte code) {
+ return code < 0 || code >= values().length? null: values()[code];
+ }
+
+ /** Read from in */
+ private static BlockConstructionStage readFields(DataInput in)
+ throws IOException {
+ return valueOf(in.readByte());
+ }
+
+ /** write to out */
+ private void write(DataOutput out) throws IOException {
+ out.writeByte(ordinal());
+ }
+ }
+
/** @deprecated Deprecated at 0.21. Use Op.WRITE_BLOCK instead. */
@Deprecated
public static final byte OP_WRITE_BLOCK = Op.WRITE_BLOCK.code;
@@ -173,7 +223,7 @@
/** Send OP_READ_BLOCK */
public static void opReadBlock(DataOutputStream out,
long blockId, long blockGs, long blockOffset, long blockLen,
- String clientName, AccessToken accessToken) throws IOException {
+ String clientName, BlockAccessToken accessToken) throws IOException {
op(out, Op.READ_BLOCK);
out.writeLong(blockId);
@@ -187,15 +237,19 @@
/** Send OP_WRITE_BLOCK */
public static void opWriteBlock(DataOutputStream out,
- long blockId, long blockGs, int pipelineSize, boolean isRecovery,
- String client, DatanodeInfo src, DatanodeInfo[] targets,
- AccessToken accesstoken) throws IOException {
+ long blockId, long blockGs, int pipelineSize,
+ BlockConstructionStage stage, long newGs, long minBytesRcvd,
+ long maxBytesRcvd, String client, DatanodeInfo src,
+ DatanodeInfo[] targets, BlockAccessToken accesstoken) throws IOException {
op(out, Op.WRITE_BLOCK);
out.writeLong(blockId);
out.writeLong(blockGs);
out.writeInt(pipelineSize);
- out.writeBoolean(isRecovery);
+ stage.write(out);
+ WritableUtils.writeVLong(out, newGs);
+ WritableUtils.writeVLong(out, minBytesRcvd);
+ WritableUtils.writeVLong(out, maxBytesRcvd);
Text.writeString(out, client);
out.writeBoolean(src != null);
@@ -213,7 +267,7 @@
/** Send OP_REPLACE_BLOCK */
public static void opReplaceBlock(DataOutputStream out,
long blockId, long blockGs, String storageId, DatanodeInfo src,
- AccessToken accesstoken) throws IOException {
+ BlockAccessToken accesstoken) throws IOException {
op(out, Op.REPLACE_BLOCK);
out.writeLong(blockId);
@@ -226,7 +280,7 @@
/** Send OP_COPY_BLOCK */
public static void opCopyBlock(DataOutputStream out,
- long blockId, long blockGs, AccessToken accesstoken) throws IOException {
+ long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException {
op(out, Op.COPY_BLOCK);
out.writeLong(blockId);
@@ -237,7 +291,7 @@
/** Send OP_BLOCK_CHECKSUM */
public static void opBlockChecksum(DataOutputStream out,
- long blockId, long blockGs, AccessToken accesstoken) throws IOException {
+ long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException {
op(out, Op.BLOCK_CHECKSUM);
out.writeLong(blockId);
@@ -289,7 +343,7 @@
final long offset = in.readLong();
final long length = in.readLong();
final String client = Text.readString(in);
- final AccessToken accesstoken = readAccessToken(in);
+ final BlockAccessToken accesstoken = readAccessToken(in);
opReadBlock(in, blockId, blockGs, offset, length, client, accesstoken);
}
@@ -300,14 +354,18 @@
*/
protected abstract void opReadBlock(DataInputStream in,
long blockId, long blockGs, long offset, long length,
- String client, AccessToken accesstoken) throws IOException;
+ String client, BlockAccessToken accesstoken) throws IOException;
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
final long blockId = in.readLong();
final long blockGs = in.readLong();
final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
- final boolean isRecovery = in.readBoolean(); // is this part of recovery?
+ final BlockConstructionStage stage =
+ BlockConstructionStage.readFields(in);
+ final long newGs = WritableUtils.readVLong(in);
+ final long minBytesRcvd = WritableUtils.readVLong(in);
+ final long maxBytesRcvd = WritableUtils.readVLong(in);
final String client = Text.readString(in); // working on behalf of this client
final DatanodeInfo src = in.readBoolean()? DatanodeInfo.read(in): null;
@@ -319,10 +377,10 @@
for (int i = 0; i < targets.length; i++) {
targets[i] = DatanodeInfo.read(in);
}
- final AccessToken accesstoken = readAccessToken(in);
+ final BlockAccessToken accesstoken = readAccessToken(in);
- opWriteBlock(in, blockId, blockGs, pipelineSize, isRecovery,
- client, src, targets, accesstoken);
+ opWriteBlock(in, blockId, blockGs, pipelineSize, stage,
+ newGs, minBytesRcvd, maxBytesRcvd, client, src, targets, accesstoken);
}
/**
@@ -330,9 +388,11 @@
* Write a block.
*/
protected abstract void opWriteBlock(DataInputStream in,
- long blockId, long blockGs, int pipelineSize, boolean isRecovery,
+ long blockId, long blockGs,
+ int pipelineSize, BlockConstructionStage stage,
+ long newGs, long minBytesRcvd, long maxBytesRcvd,
String client, DatanodeInfo src, DatanodeInfo[] targets,
- AccessToken accesstoken) throws IOException;
+ BlockAccessToken accesstoken) throws IOException;
/** Receive OP_REPLACE_BLOCK */
private void opReplaceBlock(DataInputStream in) throws IOException {
@@ -340,7 +400,7 @@
final long blockGs = in.readLong();
final String sourceId = Text.readString(in); // read del hint
final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
- final AccessToken accesstoken = readAccessToken(in);
+ final BlockAccessToken accesstoken = readAccessToken(in);
opReplaceBlock(in, blockId, blockGs, sourceId, src, accesstoken);
}
@@ -351,13 +411,13 @@
*/
protected abstract void opReplaceBlock(DataInputStream in,
long blockId, long blockGs, String sourceId, DatanodeInfo src,
- AccessToken accesstoken) throws IOException;
+ BlockAccessToken accesstoken) throws IOException;
/** Receive OP_COPY_BLOCK */
private void opCopyBlock(DataInputStream in) throws IOException {
final long blockId = in.readLong();
final long blockGs = in.readLong();
- final AccessToken accesstoken = readAccessToken(in);
+ final BlockAccessToken accesstoken = readAccessToken(in);
opCopyBlock(in, blockId, blockGs, accesstoken);
}
@@ -367,13 +427,13 @@
* It is used for balancing purpose; send to a proxy source.
*/
protected abstract void opCopyBlock(DataInputStream in,
- long blockId, long blockGs, AccessToken accesstoken) throws IOException;
+ long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException;
/** Receive OP_BLOCK_CHECKSUM */
private void opBlockChecksum(DataInputStream in) throws IOException {
final long blockId = in.readLong();
final long blockGs = in.readLong();
- final AccessToken accesstoken = readAccessToken(in);
+ final BlockAccessToken accesstoken = readAccessToken(in);
opBlockChecksum(in, blockId, blockGs, accesstoken);
}
@@ -383,12 +443,12 @@
* Get the checksum of a block
*/
protected abstract void opBlockChecksum(DataInputStream in,
- long blockId, long blockGs, AccessToken accesstoken) throws IOException;
+ long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException;
/** Read an AccessToken */
- static private AccessToken readAccessToken(DataInputStream in
+ static private BlockAccessToken readAccessToken(DataInputStream in
) throws IOException {
- final AccessToken t = new AccessToken();
+ final BlockAccessToken t = new BlockAccessToken();
t.readFields(in);
return t;
}
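
For reference, the version-17 WRITE_BLOCK header fields added above, in the wire order written by opWriteBlock(). This is a sketch only: it covers the fields from the block id up to the client name, omitting the op code before them and the source/targets/access token after them.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;

class WriteBlockHeaderSketch {
  static byte[] header(long blockId, long blockGs, int pipelineSize,
      byte stageOrdinal, long newGs, long minBytesRcvd, long maxBytesRcvd,
      String client) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeLong(blockId);                     // block id
    out.writeLong(blockGs);                     // current generation stamp
    out.writeInt(pipelineSize);                 // datanodes in the pipeline
    out.writeByte(stageOrdinal);                // BlockConstructionStage.write()
    WritableUtils.writeVLong(out, newGs);       // new GS for recovery/append
    WritableUtils.writeVLong(out, minBytesRcvd);
    WritableUtils.writeVLong(out, maxBytesRcvd);
    Text.writeString(out, client);              // client name
    out.flush();
    return bytes.toByteArray();
  }
}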
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Sat Nov 28 20:05:56 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
/************************************
* Some handy constants
@@ -27,7 +28,7 @@
public static int MIN_BLOCKS_FOR_WRITE = 5;
// Chunk the block Invalidate message
- public static final int BLOCK_INVALIDATE_CHUNK = 100;
+ public static final int BLOCK_INVALIDATE_CHUNK = 1000;
// Long that indicates "leave current quota unchanged"
public static final long QUOTA_DONT_SET = Long.MAX_VALUE;
@@ -48,11 +49,15 @@
public static int MAX_PATH_LENGTH = 8000;
public static int MAX_PATH_DEPTH = 1000;
- public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
+ public static final int BUFFER_SIZE = new HdfsConfiguration().getInt("io.file.buffer.size", 4096);
//Used for writing header etc.
public static final int SMALL_BUFFER_SIZE = Math.min(BUFFER_SIZE/2, 512);
//TODO mb@media-style.com: should be conf injected?
public static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
+ public static final int DEFAULT_BYTES_PER_CHECKSUM = 512;
+ public static final int DEFAULT_WRITE_PACKET_SIZE = 64 * 1024;
+ public static final short DEFAULT_REPLICATION_FACTOR = 3;
+ public static final int DEFAULT_FILE_BUFFER_SIZE = 4096;
public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
public static final int SIZE_OF_INTEGER = Integer.SIZE / Byte.SIZE;
@@ -86,7 +91,7 @@
// Version is reflected in the data storage file.
// Versions are negative.
// Decrement LAYOUT_VERSION to define a new version.
- public static final int LAYOUT_VERSION = -19;
+ public static final int LAYOUT_VERSION = -22;
// Current version:
- // -19: Sticky bit
+ // -22: added new OP_CONCAT_DELETE
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Sat Nov 28 20:05:56 2009
@@ -17,8 +17,8 @@
*/
package org.apache.hadoop.hdfs.protocol;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.io.*;
-import org.apache.hadoop.security.AccessToken;
import java.io.*;
@@ -44,7 +44,7 @@
// else false. If block has few corrupt replicas, they are filtered and
// their locations are not part of this object
private boolean corrupt;
- private AccessToken accessToken = new AccessToken();
+ private BlockAccessToken accessToken = new BlockAccessToken();
/**
*/
@@ -78,11 +78,11 @@
}
}
- public AccessToken getAccessToken() {
+ public BlockAccessToken getAccessToken() {
return accessToken;
}
- public void setAccessToken(AccessToken token) {
+ public void setAccessToken(BlockAccessToken token) {
this.accessToken = token;
}
@@ -145,4 +145,21 @@
locs[i].readFields(in);
}
}
+
+ /** Read LocatedBlock from in. */
+ public static LocatedBlock read(DataInput in) throws IOException {
+ final LocatedBlock lb = new LocatedBlock();
+ lb.readFields(in);
+ return lb;
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return getClass().getSimpleName() + "{" + b
+ + "; getBlockSize()=" + getBlockSize()
+ + "; corrupt=" + corrupt
+ + "; offset=" + offset
+ + "; locs=" + java.util.Arrays.asList(locs)
+ + "}";
+ }
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Sat Nov 28 20:05:56 2009
@@ -36,6 +36,8 @@
private long fileLength;
private List<LocatedBlock> blocks; // array of blocks with prioritized locations
private boolean underConstruction;
+ private LocatedBlock lastLocatedBlock = null;
+ private boolean isLastBlockComplete = false;
LocatedBlocks() {
fileLength = 0;
@@ -43,11 +45,15 @@
underConstruction = false;
}
- public LocatedBlocks(long flength, List<LocatedBlock> blks, boolean isUnderConstuction) {
-
+ /** Public constructor. */
+ public LocatedBlocks(long flength, boolean isUnderConstruction,
+ List<LocatedBlock> blks,
+ LocatedBlock lastBlock, boolean isLastBlockCompleted) {
fileLength = flength;
blocks = blks;
underConstruction = isUnderConstruction;
+ this.lastLocatedBlock = lastBlock;
+ this.isLastBlockComplete = isLastBlockCompleted;
}
/**
@@ -57,6 +63,16 @@
return blocks;
}
+ /** Get the last located block. */
+ public LocatedBlock getLastLocatedBlock() {
+ return lastLocatedBlock;
+ }
+
+ /** Is the last block completed? */
+ public boolean isLastBlockComplete() {
+ return isLastBlockComplete;
+ }
+
/**
* Get located block.
*/
@@ -161,6 +177,15 @@
public void write(DataOutput out) throws IOException {
out.writeLong(this.fileLength);
out.writeBoolean(underConstruction);
+
+ //write the last located block
+ final boolean isNull = lastLocatedBlock == null;
+ out.writeBoolean(isNull);
+ if (!isNull) {
+ lastLocatedBlock.write(out);
+ }
+ out.writeBoolean(isLastBlockComplete);
+
// write located blocks
int nrBlocks = locatedBlockCount();
out.writeInt(nrBlocks);
@@ -175,6 +200,14 @@
public void readFields(DataInput in) throws IOException {
this.fileLength = in.readLong();
underConstruction = in.readBoolean();
+
+ //read the last located block
+ final boolean isNull = in.readBoolean();
+ if (!isNull) {
+ lastLocatedBlock = LocatedBlock.read(in);
+ }
+ isLastBlockComplete = in.readBoolean();
+
// read located blocks
int nrBlocks = in.readInt();
this.blocks = new ArrayList<LocatedBlock>(nrBlocks);
@@ -184,4 +217,18 @@
this.blocks.add(blk);
}
}
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ final StringBuilder b = new StringBuilder(getClass().getSimpleName());
+ b.append("{")
+ .append("\n fileLength=").append(fileLength)
+ .append("\n underConstruction=").append(underConstruction)
+ .append("\n blocks=").append(blocks)
+ .append("\n lastLocatedBlock=").append(lastLocatedBlock)
+ .append("\n isLastBlockComplete=").append(isLastBlockComplete)
+ .append("}");
+ return b.toString();
+ }
}
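
The optional lastLocatedBlock is serialized with a null flag: a boolean precedes the value, so readFields() always consumes the flag and reads a LocatedBlock only when one is present. A minimal sketch of the same pattern with illustrative names (a String payload stands in for the LocatedBlock):

import java.io.*;

class OptionalFieldSketch {
  static void writeOptional(DataOutput out, String value) throws IOException {
    final boolean isNull = (value == null);
    out.writeBoolean(isNull);        // flag first
    if (!isNull) {
      out.writeUTF(value);           // then the payload, only when present
    }
  }

  static String readOptional(DataInput in) throws IOException {
    return in.readBoolean() ? null : in.readUTF();
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    writeOptional(new DataOutputStream(buf), "last-block");
    String back = readOptional(
        new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(back);        // prints: last-block
  }
}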
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Sat Nov 28 20:05:56 2009
@@ -25,6 +25,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.lang.Class;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -61,9 +62,15 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.io.IOUtils;
@@ -74,9 +81,6 @@
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.AccessTokenHandler;
-import org.apache.hadoop.security.ExportedAccessKeys;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
@@ -365,7 +369,7 @@
/* Send a block replace request to the output stream*/
private void sendRequest(DataOutputStream out) throws IOException {
- AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+ BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
if (isAccessTokenEnabled) {
accessToken = accessTokenHandler.generateToken(null, block.getBlock()
.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.REPLACE,
@@ -772,18 +776,31 @@
}
}
}
+
+ /* Check that this Balancer is compatible with the Block Placement Policy
+ * used by the Namenode.
+ */
+ private void checkReplicationPolicyCompatibility(Configuration conf) throws UnsupportedActionException {
+ if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() !=
+ BlockPlacementPolicyDefault.class) {
+ throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
+ }
+ }
/** Default constructor */
- Balancer() {
+ Balancer() throws UnsupportedActionException {
+ checkReplicationPolicyCompatibility(getConf());
}
/** Construct a balancer from the given configuration */
- Balancer(Configuration conf) {
+ Balancer(Configuration conf) throws UnsupportedActionException {
+ checkReplicationPolicyCompatibility(conf);
setConf(conf);
}
/** Construct a balancer from the given configuration and threshold */
- Balancer(Configuration conf, double threshold) {
+ Balancer(Configuration conf, double threshold) throws UnsupportedActionException {
+ checkReplicationPolicyCompatibility(conf);
setConf(conf);
this.threshold = threshold;
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java Sat Nov 28 20:05:56 2009
@@ -17,35 +17,36 @@
*/
package org.apache.hadoop.hdfs.server.common;
-import java.io.*;
-import org.apache.hadoop.io.*;
-
/****************************************************************
* A GenerationStamp is a Hadoop FS primitive, identified by a long.
****************************************************************/
-public class GenerationStamp implements WritableComparable<GenerationStamp> {
- public static final long WILDCARD_STAMP = 1;
+public class GenerationStamp implements Comparable<GenerationStamp> {
+ /**
+ * The first valid generation stamp.
+ */
public static final long FIRST_VALID_STAMP = 1000L;
- static { // register a ctor
- WritableFactories.setFactory
- (GenerationStamp.class,
- new WritableFactory() {
- public Writable newInstance() { return new GenerationStamp(0); }
- });
- }
+ /**
+ * Generation stamp of blocks that pre-date the introduction
+ * of a generation stamp.
+ */
+ public static final long GRANDFATHER_GENERATION_STAMP = 0;
- long genstamp;
+ private volatile long genstamp;
/**
* Create a new instance, initialized to FIRST_VALID_STAMP.
*/
- public GenerationStamp() {this(GenerationStamp.FIRST_VALID_STAMP);}
+ public GenerationStamp() {
+ this(GenerationStamp.FIRST_VALID_STAMP);
+ }
/**
* Create a new instance, initialized to the specified value.
*/
- GenerationStamp(long stamp) {this.genstamp = stamp;}
+ GenerationStamp(long stamp) {
+ this.genstamp = stamp;
+ }
/**
* Returns the current generation stamp
@@ -69,46 +70,22 @@
return this.genstamp;
}
- /////////////////////////////////////
- // Writable
- /////////////////////////////////////
- public void write(DataOutput out) throws IOException {
- out.writeLong(genstamp);
- }
-
- public void readFields(DataInput in) throws IOException {
- this.genstamp = in.readLong();
- if (this.genstamp < 0) {
- throw new IOException("Bad Generation Stamp: " + this.genstamp);
- }
- }
-
- /////////////////////////////////////
- // Comparable
- /////////////////////////////////////
- public static int compare(long x, long y) {
- return x < y? -1: x == y? 0: 1;
- }
-
- /** {@inheritDoc} */
+ @Override // Comparable
public int compareTo(GenerationStamp that) {
- return compare(this.genstamp, that.genstamp);
+ return this.genstamp < that.genstamp ? -1 :
+ this.genstamp > that.genstamp ? 1 : 0;
}
- /** {@inheritDoc} */
+ @Override // Object
public boolean equals(Object o) {
if (!(o instanceof GenerationStamp)) {
return false;
}
- return genstamp == ((GenerationStamp)o).genstamp;
- }
-
- public static boolean equalsWithWildcard(long x, long y) {
- return x == y || x == WILDCARD_STAMP || y == WILDCARD_STAMP;
+ return compareTo((GenerationStamp)o) == 0;
}
- /** {@inheritDoc} */
+ @Override // Object
public int hashCode() {
- return 37 * 17 + (int) (genstamp^(genstamp>>>32));
+ return (int) (genstamp^(genstamp>>>32));
}
}
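
With the Writable machinery gone, equals() now delegates to compareTo(), keeping the two consistent with hashCode(). A quick hypothetical check; it assumes placement in org.apache.hadoop.hdfs.server.common, since the long constructor is package-private:

package org.apache.hadoop.hdfs.server.common;

class GenerationStampCheck {
  public static void main(String[] args) {
    GenerationStamp a = new GenerationStamp(); // FIRST_VALID_STAMP
    GenerationStamp b = new GenerationStamp();
    // equals() delegates to compareTo(), so equal stamps compare as 0
    // and must share a hash code.
    System.out.println(a.equals(b));                  // true
    System.out.println(a.compareTo(b) == 0);          // true
    System.out.println(a.hashCode() == b.hashCode()); // true
  }
}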
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Sat Nov 28 20:05:56 2009
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.common;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
/************************************
* Some handy internal HDFS constants
*
@@ -80,5 +84,77 @@
return description;
}
}
+
+ /**
+ * Block replica states, which it can go through while being constructed.
+ */
+ static public enum ReplicaState {
+ /** Replica is finalized. The state when replica is not modified. */
+ FINALIZED(0),
+ /** Replica is being written to. */
+ RBW(1),
+ /** Replica is waiting to be recovered. */
+ RWR(2),
+ /** Replica is under recovery. */
+ RUR(3),
+ /** Temporary replica: created for replication and relocation only. */
+ TEMPORARY(4);
+
+ private int value;
+
+ private ReplicaState(int v) {
+ value = v;
+ }
+
+ public int getValue() {
+ return value;
+ }
+
+ public static ReplicaState getState(int v) {
+ return ReplicaState.values()[v];
+ }
+
+ /** Read from in */
+ public static ReplicaState read(DataInput in) throws IOException {
+ return values()[in.readByte()];
+ }
+
+ /** Write to out */
+ public void write(DataOutput out) throws IOException {
+ out.writeByte(ordinal());
+ }
+ }
+
+ /**
+ * States that a block can go through while it is under construction.
+ */
+ static public enum BlockUCState {
+ /**
+ * Block construction completed.<br>
+ * The block has at least one {@link ReplicaState#FINALIZED} replica,
+ * and is not going to be modified.
+ */
+ COMPLETE,
+ /**
+ * The block is under construction.<br>
+ * It has been recently allocated for write or append.
+ */
+ UNDER_CONSTRUCTION,
+ /**
+ * The block is under recovery.<br>
+ * When a file lease expires its last block may not be {@link #COMPLETE}
+ * and needs to go through a recovery procedure,
+ * which synchronizes the existing replicas contents.
+ */
+ UNDER_RECOVERY,
+ /**
+ * The block is committed.<br>
+ * The client reported that all bytes are written to data-nodes
+ * with the given generation stamp and block length, but no
+ * {@link ReplicaState#FINALIZED}
+ * replicas have yet been reported by data-nodes themselves.
+ */
+ COMMITTED;
+ }
}
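
A hedged round-trip sketch for the new ReplicaState enum: write() emits the ordinal as a single byte and read() maps it back through values().

import java.io.*;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;

class ReplicaStateRoundTrip {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    ReplicaState.RBW.write(new DataOutputStream(buf));   // one byte: ordinal
    ReplicaState back = ReplicaState.read(
        new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(back + " value=" + back.getValue()); // RBW value=1
  }
}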
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Sat Nov 28 20:05:56 2009
@@ -36,18 +36,19 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.VersionInfo;
public class JspHelper {
final static public String WEB_UGI_PROPERTY_NAME = "dfs.web.ugi";
- public static final Configuration conf = new Configuration();
+ public static final Configuration conf = new HdfsConfiguration();
public static final UnixUserGroupInformation webUGI
= UnixUserGroupInformation.createImmutable(
conf.getStrings(WEB_UGI_PROPERTY_NAME));
@@ -105,7 +106,7 @@
}
public static void streamBlockInAscii(InetSocketAddress addr, long blockId,
- AccessToken accessToken, long genStamp, long blockSize,
+ BlockAccessToken accessToken, long genStamp, long blockSize,
long offsetIntoBlock, long chunkSizeToView, JspWriter out)
throws IOException {
if (chunkSizeToView == 0) return;
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Sat Nov 28 20:05:56 2009
@@ -74,6 +74,9 @@
* any upgrade code that uses this constant should also be removed. */
public static final int PRE_GENERATIONSTAMP_LAYOUT_VERSION = -13;
+ // last layout version that did not support persistent rbw replicas
+ public static final int PRE_RBW_LAYOUT_VERSION = -19;
+
private static final String STORAGE_FILE_LOCK = "in_use.lock";
protected static final String STORAGE_FILE_VERSION = "VERSION";
public static final String STORAGE_DIR_CURRENT = "current";
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Sat Nov 28 20:05:56 2009
@@ -39,6 +39,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
@@ -55,7 +56,6 @@
static final Log ClientTraceLog = DataNode.ClientTraceLog;
private Block block; // the block to receive
- protected boolean finalized;
private DataInputStream in = null; // from where data are read
private DataChecksum checksum; // from where chunks of a block can be read
private OutputStream out = null; // to block file at local disk
@@ -65,7 +65,6 @@
private ByteBuffer buf; // contains one full packet.
private int bufRead; //amount of valid data in the buf
private int maxPacketReadLen;
- protected long offsetInBlock;
protected final String inAddr;
protected final String myAddr;
private String mirrorAddr;
@@ -73,46 +72,82 @@
private Daemon responder = null;
private BlockTransferThrottler throttler;
private FSDataset.BlockWriteStreams streams;
- private boolean isRecovery = false;
private String clientName;
DatanodeInfo srcDataNode = null;
private Checksum partialCrc = null;
private final DataNode datanode;
+ final private ReplicaInPipelineInterface replicaInfo;
BlockReceiver(Block block, DataInputStream in, String inAddr,
- String myAddr, boolean isRecovery, String clientName,
- DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
+ String myAddr, BlockConstructionStage stage,
+ long newGs, long minBytesRcvd, long maxBytesRcvd,
+ String clientName, DatanodeInfo srcDataNode, DataNode datanode)
+ throws IOException {
try{
this.block = block;
this.in = in;
this.inAddr = inAddr;
this.myAddr = myAddr;
- this.isRecovery = isRecovery;
this.clientName = clientName;
- this.offsetInBlock = 0;
this.srcDataNode = srcDataNode;
this.datanode = datanode;
- this.checksum = DataChecksum.newDataChecksum(in);
- this.bytesPerChecksum = checksum.getBytesPerChecksum();
- this.checksumSize = checksum.getChecksumSize();
//
// Open local disk out
//
- streams = datanode.data.writeToBlock(block, isRecovery);
- this.finalized = datanode.data.isValidBlock(block);
+ if (clientName.length() == 0) { //replication or move
+ replicaInfo = datanode.data.createTemporary(block);
+ } else {
+ switch (stage) {
+ case PIPELINE_SETUP_CREATE:
+ replicaInfo = datanode.data.createRbw(block);
+ break;
+ case PIPELINE_SETUP_STREAMING_RECOVERY:
+ replicaInfo = datanode.data.recoverRbw(
+ block, newGs, minBytesRcvd, maxBytesRcvd);
+ block.setGenerationStamp(newGs);
+ break;
+ case PIPELINE_SETUP_APPEND:
+ replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
+ if (datanode.blockScanner != null) { // remove from block scanner
+ datanode.blockScanner.deleteBlock(block);
+ }
+ block.setGenerationStamp(newGs);
+ break;
+ case PIPELINE_SETUP_APPEND_RECOVERY:
+ replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
+ if (datanode.blockScanner != null) { // remove from block scanner
+ datanode.blockScanner.deleteBlock(block);
+ }
+ block.setGenerationStamp(newGs);
+ break;
+ default: throw new IOException("Unsupported stage " + stage +
+ " while receiving block " + block + " from " + inAddr);
+ }
+ }
+ // read checksum meta information
+ this.checksum = DataChecksum.newDataChecksum(in);
+ this.bytesPerChecksum = checksum.getBytesPerChecksum();
+ this.checksumSize = checksum.getChecksumSize();
+
+ boolean isCreate = stage == BlockConstructionStage.PIPELINE_SETUP_CREATE
+ || clientName.length() == 0;
+ streams = replicaInfo.createStreams(isCreate,
+ this.bytesPerChecksum, this.checksumSize);
if (streams != null) {
this.out = streams.dataOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.checksumOut,
SMALL_BUFFER_SIZE));
- // If this block is for appends, then remove it from periodic
- // validation.
- if (datanode.blockScanner != null && isRecovery) {
- datanode.blockScanner.deleteBlock(block);
- }
+
+ // write data chunk header if creating a new replica
+ if (isCreate) {
+ BlockMetadataHeader.writeHeader(checksumOut, checksum);
+ }
}
- } catch (BlockAlreadyExistsException bae) {
+ } catch (ReplicaAlreadyExistsException bae) {
throw bae;
+ } catch (ReplicaNotFoundException bne) {
+ throw bne;
} catch(IOException ioe) {
IOUtils.closeStream(this);
cleanupBlock();
@@ -288,7 +323,7 @@
* It tries to read a full packet with a single read call.
* Consecutive packets are usually of the same length.
*/
- private int readNextPacket() throws IOException {
+ private void readNextPacket() throws IOException {
/* This dances around buf a little bit, mainly to read
* a full packet with a single read and to accept an arbitrary size
* for the next packet at the same time.
@@ -324,12 +359,6 @@
int payloadLen = buf.getInt();
buf.reset();
- if (payloadLen == 0) {
- //end of stream!
- buf.limit(buf.position() + SIZE_OF_INTEGER);
- return 0;
- }
-
// check corrupt values for pktLen, 100MB upper limit should be ok?
if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
throw new IOException("Incorrect value for packet payload : " +
@@ -369,42 +398,68 @@
if (pktSize > maxPacketReadLen) {
maxPacketReadLen = pktSize;
}
-
- return payloadLen;
}
/**
* Receives and processes a packet. It can contain many chunks.
- * returns size of the packet.
+ * Returns the number of data bytes in the packet.
*/
private int receivePacket() throws IOException {
-
- int payloadLen = readNextPacket();
-
- if (payloadLen <= 0) {
- return payloadLen;
- }
+ // read the next packet
+ readNextPacket();
buf.mark();
//read the header
buf.getInt(); // packet length
- offsetInBlock = buf.getLong(); // get offset of packet in block
+ long offsetInBlock = buf.getLong(); // get offset of packet in block
+
+ if (offsetInBlock > replicaInfo.getNumBytes()) {
+ throw new IOException("Received an out-of-sequence packet for " + block +
+ "from " + inAddr + " at offset " + offsetInBlock +
+ ". Expecting packet starting at " + replicaInfo.getNumBytes());
+ }
long seqno = buf.getLong(); // get seqno
boolean lastPacketInBlock = (buf.get() != 0);
+ int len = buf.getInt();
+ if (len < 0) {
+ throw new IOException("Got wrong length during writeBlock(" + block +
+ ") from " + inAddr + " at offset " +
+ offsetInBlock + ": " + len);
+ }
int endOfHeader = buf.position();
buf.reset();
+ return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
+ }
+
+ /**
+ * Receives and processes a packet. It can contain many chunks.
+ * returns the number of data bytes in the packet.
+ */
+ private int receivePacket(long offsetInBlock, long seqno,
+ boolean lastPacketInBlock, int len, int endOfHeader) throws IOException {
if (LOG.isDebugEnabled()){
LOG.debug("Receiving one packet for block " + block +
- " of length " + payloadLen +
+ " of length " + len +
" seqno " + seqno +
" offsetInBlock " + offsetInBlock +
" lastPacketInBlock " + lastPacketInBlock);
}
- setBlockPosition(offsetInBlock);
+ // update received bytes
+ long firstByteInBlock = offsetInBlock;
+ offsetInBlock += len;
+ if (replicaInfo.getNumBytes() < offsetInBlock) {
+ replicaInfo.setNumBytes(offsetInBlock);
+ }
+ // put in queue for pending acks
+ if (responder != null) {
+ ((PacketResponder)responder.getRunnable()).enqueue(seqno,
+ lastPacketInBlock, offsetInBlock);
+ }
+
//First write the packet to the mirror:
if (mirrorOut != null) {
try {
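
The header decoded above is fixed-form: a 4-byte packet length, an 8-byte offset in the block, an 8-byte sequence number, a 1-byte last-packet flag, and a 4-byte data length, followed by the checksums and then the data. A standalone sketch of that decode, for illustration only:

    import java.nio.ByteBuffer;

    class PacketHeaderSketch {
      static void decode(ByteBuffer buf) {
        buf.mark();
        int packetLen = buf.getInt();          // total length of the packet
        long offsetInBlock = buf.getLong();    // where the data lands in the block
        long seqno = buf.getLong();            // sequence number, echoed in the ack
        boolean lastPacketInBlock = (buf.get() != 0);
        int dataLen = buf.getInt();            // bytes of user data (0 for the last packet)
        int endOfHeader = buf.position();      // checksums start here, data follows
        buf.reset();                           // receivePacket() re-reads from the mark
        System.out.println("pktLen=" + packetLen + " offset=" + offsetInBlock
            + " seqno=" + seqno + " last=" + lastPacketInBlock
            + " dataLen=" + dataLen + " headerEnd=" + endOfHeader);
      }
    }
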
@@ -416,25 +471,19 @@
}
buf.position(endOfHeader);
- int len = buf.getInt();
- if (len < 0) {
- throw new IOException("Got wrong length during writeBlock(" + block +
- ") from " + inAddr + " at offset " +
- offsetInBlock + ": " + len);
- }
-
- if (len == 0) {
- LOG.debug("Receiving empty packet for block " + block);
+ if (lastPacketInBlock || len == 0) {
+ LOG.debug("Receiving an empty packet or the end of the block " + block);
} else {
- offsetInBlock += len;
-
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
if ( buf.remaining() != (checksumLen + len)) {
- throw new IOException("Data remaining in packet does not match " +
- "sum of checksumLen and dataLen");
+ throw new IOException("Data remaining in packet does not match" +
+ "sum of checksumLen and dataLen " +
+ " size remaining: " + buf.remaining() +
+ " data len: " + len +
+ " checksum Len: " + checksumLen);
}
int checksumOff = buf.position();
int dataOff = checksumOff + checksumLen;
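
The checksumLen computation above is a ceiling division: every bytesPerChecksum-sized chunk, including a trailing partial chunk, carries one checksum. A quick worked example, assuming the common defaults of 512-byte chunks and 4-byte CRC32 checksums:

    // len = 1000 data bytes, bytesPerChecksum = 512, checksumSize = 4
    // numChunks   = (1000 + 512 - 1) / 512 = 2   (one full + one partial chunk)
    // checksumLen = 2 * 4 = 8 bytes
    // so buf.remaining() must equal checksumLen + len = 1008
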
@@ -454,9 +503,29 @@
}
try {
- if (!finalized) {
+ long onDiskLen = replicaInfo.getBytesOnDisk();
+ if (onDiskLen < offsetInBlock) {
//finally write to the disk :
- out.write(pktBuf, dataOff, len);
+
+ if (onDiskLen % bytesPerChecksum != 0) {
+ // prepare to overwrite last checksum
+ adjustCrcFilePosition();
+ }
+
+ // If this is a partial chunk, then read in pre-existing checksum
+ if (firstByteInBlock % bytesPerChecksum != 0) {
+ LOG.info("Packet starts at " + firstByteInBlock +
+ " for block " + block +
+ " which is not a multiple of bytesPerChecksum " +
+ bytesPerChecksum);
+ long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+ onDiskLen / bytesPerChecksum * checksumSize;
+ computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
+ }
+
+ int startByteToDisk = dataOff + (int)(onDiskLen - firstByteInBlock);
+ int numBytesToDisk = (int)(offsetInBlock - onDiskLen);
+ out.write(pktBuf, startByteToDisk, numBytesToDisk);
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
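
The startByteToDisk/numBytesToDisk arithmetic above skips whatever prefix of the packet is already persisted, which matters when a recovered pipeline resends a packet that was partially written. A worked example with hypothetical values:

    // firstByteInBlock = 1024   (offset of the packet's first byte in the block)
    // len              = 1000   (data bytes in the packet)
    // offsetInBlock    = 1024 + 1000 = 2024   (end offset after this packet)
    // onDiskLen        = 1500   (bytes an earlier attempt already wrote)
    //
    // startByteToDisk = dataOff + (int)(1500 - 1024) = dataOff + 476
    // numBytesToDisk  = (int)(2024 - 1500) = 524
    //
    // Only the 524 not-yet-persisted bytes hit the disk; the 476 bytes of
    // the packet that are already on disk are skipped.
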
@@ -468,7 +537,7 @@
" len = " + len +
" bytesPerChecksum " + bytesPerChecksum);
}
- partialCrc.update(pktBuf, dataOff, len);
+ partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
checksumOut.write(buf);
LOG.debug("Writing out partial crc for data len " + len);
@@ -476,7 +545,10 @@
} else {
checksumOut.write(pktBuf, checksumOff, checksumLen);
}
+ replicaInfo.setBytesOnDisk(offsetInBlock);
datanode.myMetrics.bytesWritten.inc(len);
+ // flush the entire packet
+ flush();
}
} catch (IOException iex) {
datanode.checkDiskError(iex);
@@ -484,20 +556,11 @@
}
}
- /// flush entire packet before sending ack
- flush();
-
- // put in queue for pending acks
- if (responder != null) {
- ((PacketResponder)responder.getRunnable()).enqueue(seqno,
- lastPacketInBlock);
- }
-
if (throttler != null) { // throttle I/O
- throttler.throttle(payloadLen);
+ throttler.throttle(len);
}
- return payloadLen;
+ return len;
}
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -518,10 +581,6 @@
throttler = throttlerArg;
try {
- // write data chunk header
- if (!finalized) {
- BlockMetadataHeader.writeHeader(checksumOut, checksum);
- }
if (clientName.length() > 0) {
responder = new Daemon(datanode.threadGroup,
new PacketResponder(this, block, mirrIn,
@@ -530,20 +589,10 @@
}
/*
- * Receive until packet length is zero.
+ * Receive until packet has zero bytes of data.
*/
while (receivePacket() > 0) {}
- // flush the mirror out
- if (mirrorOut != null) {
- try {
- mirrorOut.writeInt(0); // mark the end of the block
- mirrorOut.flush();
- } catch (IOException e) {
- handleMirrorOutError(e);
- }
- }
-
// wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown.
// Mark that responder has been closed for future processing
@@ -560,7 +609,7 @@
close();
// Finalize the block. Does this fsync()?
- block.setNumBytes(offsetInBlock);
+ block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
datanode.myMetrics.blocksWritten.inc();
}
@@ -598,29 +647,10 @@
}
/**
- * Sets the file pointer in the local block file to the specified value.
+ * Adjust the file pointer in the local meta file so that the last checksum
+ * will be overwritten.
*/
- private void setBlockPosition(long offsetInBlock) throws IOException {
- if (finalized) {
- if (!isRecovery) {
- throw new IOException("Write to offset " + offsetInBlock +
- " of block " + block +
- " that is already finalized.");
- }
- if (offsetInBlock > datanode.data.getLength(block)) {
- throw new IOException("Write to offset " + offsetInBlock +
- " of block " + block +
- " that is already finalized and is of size " +
- datanode.data.getLength(block));
- }
- return;
- }
-
- if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
- return; // nothing to do
- }
- long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
- offsetInBlock / bytesPerChecksum * checksumSize;
+ private void adjustCrcFilePosition() throws IOException {
if (out != null) {
out.flush();
}
@@ -628,23 +658,8 @@
checksumOut.flush();
}
- // If this is a partial chunk, then read in pre-existing checksum
- if (offsetInBlock % bytesPerChecksum != 0) {
- LOG.info("setBlockPosition trying to set position to " +
- offsetInBlock +
- " for block " + block +
- " which is not a multiple of bytesPerChecksum " +
- bytesPerChecksum);
- computePartialChunkCrc(offsetInBlock, offsetInChecksum, bytesPerChecksum);
- }
-
- LOG.info("Changing block file offset of block " + block + " from " +
- datanode.data.getChannelPosition(block, streams) +
- " to " + offsetInBlock +
- " meta file offset to " + offsetInChecksum);
-
- // set the position of the block file
- datanode.data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum);
+ // rollback the position of the meta file
+ datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
}
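
The meta file this method rewinds is a fixed-size header followed by one checksum per chunk, so overwriting the checksum of a partial last chunk means stepping the channel back exactly checksumSize bytes. A sketch of the offset arithmetic, mirroring the offsetInChecksum expression used earlier in receivePacket() (the 7-byte header size here is an assumption for illustration):

    // offsetInChecksum = headerSize + (onDiskLen / bytesPerChecksum) * checksumSize
    //
    // e.g. headerSize = 7, bytesPerChecksum = 512, checksumSize = 4, onDiskLen = 1500:
    //   1500 / 512 = 2 complete chunks -> offsetInChecksum = 7 + 2 * 4 = 15
    //
    // The checksum covering the partial third chunk sits at offset 15; that is
    // the slot adjustCrcChannelPosition() positions the channel to overwrite.
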
/**
@@ -732,12 +747,13 @@
* Enqueue the seqno that is still to be acked by the downstream datanode.
* @param seqno
* @param lastPacketInBlock
+ * @param lastByteInPacket
*/
- synchronized void enqueue(long seqno, boolean lastPacketInBlock) {
+ synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
if (running) {
LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
" to ack queue.");
- ackQueue.addLast(new Packet(seqno, lastPacketInBlock));
+ ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
notifyAll();
}
}
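
enqueue() now records where each packet ends so the responder can advance the replica's bytesAcked watermark only after the ack round-trip succeeds. Conceptually (paraphrasing the surrounding hunks, not new API):

    // producer (receiver thread), after advancing offsetInBlock past the packet:
    //   responder.enqueue(seqno, lastPacketInBlock, offsetInBlock);
    //
    // consumer (responder thread), after a successful ack:
    //   if (pkt.lastByteInBlock > replicaInfo.getBytesAcked())
    //     replicaInfo.setBytesAcked(pkt.lastByteInBlock);
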
@@ -798,9 +814,8 @@
if (!running || !datanode.shouldRun) {
break;
}
- Packet pkt = ackQueue.removeFirst();
+ Packet pkt = ackQueue.getFirst();
long expected = pkt.seqno;
- notifyAll();
LOG.debug("PacketResponder " + numTargets +
" for block " + block +
" acking for packet " + expected);
@@ -808,33 +823,34 @@
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (pkt.lastPacketInBlock) {
- if (!receiver.finalized) {
- receiver.close();
- final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
- block.setNumBytes(receiver.offsetInBlock);
- datanode.data.finalizeBlock(block);
- datanode.myMetrics.blocksWritten.inc();
- datanode.notifyNamenodeReceivedBlock(block,
- DataNode.EMPTY_DEL_HINT);
- if (ClientTraceLog.isInfoEnabled() &&
- receiver.clientName.length() > 0) {
- long offset = 0;
- ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
- receiver.inAddr, receiver.myAddr, block.getNumBytes(),
- "HDFS_WRITE", receiver.clientName, offset,
- datanode.dnRegistration.getStorageID(), block, endTime-startTime));
- } else {
- LOG.info("Received block " + block +
- " of size " + block.getNumBytes() +
- " from " + receiver.inAddr);
- }
+ receiver.close();
+ final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
+ block.setNumBytes(replicaInfo.getNumBytes());
+ datanode.data.finalizeBlock(block);
+ datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+ if (ClientTraceLog.isInfoEnabled() &&
+ receiver.clientName.length() > 0) {
+ long offset = 0;
+ ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
+ receiver.inAddr, receiver.myAddr, block.getNumBytes(),
+ "HDFS_WRITE", receiver.clientName, offset,
+ datanode.dnRegistration.getStorageID(), block, endTime-startTime));
+ } else {
+ LOG.info("Received block " + block +
+ " of size " + block.getNumBytes() +
+ " from " + receiver.inAddr);
}
lastPacket = true;
}
- replyOut.writeLong(expected);
- SUCCESS.write(replyOut);
+ ackReply(expected);
replyOut.flush();
+ // remove the packet from the ack queue
+ removeAckHead();
+ // update the bytes acked
+ if (pkt.lastByteInBlock > replicaInfo.getBytesAcked()) {
+ replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+ }
} catch (Exception e) {
LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
if (running) {
@@ -854,6 +870,14 @@
" for block " + block + " terminating");
}
+ // This method is introduced to facilitate testing. Otherwise there
+ // would be little chance to bind an AspectJ advice to such a sequence
+ // of calls.
+ private void ackReply(long expected) throws IOException {
+ replyOut.writeLong(expected);
+ SUCCESS.write(replyOut);
+ }
+
/**
* Thread to process incoming acks.
* @see java.lang.Runnable#run()
@@ -870,9 +894,11 @@
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (running && datanode.shouldRun && !lastPacketInBlock) {
+ boolean isInterrupted = false;
try {
DataTransferProtocol.Status op = SUCCESS;
boolean didRead = false;
+ Packet pkt = null;
long expected = -2;
try {
// read seqno from downstream datanode
@@ -888,7 +914,6 @@
} else {
LOG.debug("PacketResponder " + numTargets + " got seqno = " +
seqno);
- Packet pkt = null;
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
@@ -897,11 +922,15 @@
" for block " + block +
" waiting for local datanode to finish write.");
}
- wait();
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ isInterrupted = true;
+ throw e;
+ }
}
- pkt = ackQueue.removeFirst();
+ pkt = ackQueue.getFirst();
expected = pkt.seqno;
- notifyAll();
LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
if (seqno != expected) {
throw new IOException("PacketResponder " + numTargets +
@@ -920,7 +949,7 @@
}
}
- if (Thread.interrupted()) {
+ if (Thread.interrupted() || isInterrupted) {
/* The receiver thread cancelled this thread.
* We could also check any other status updates from the
* receiver thread (e.g. if it is ok to write to replyOut).
@@ -941,14 +970,12 @@
// If this is the last packet in block, then close block
// file and finalize the block before responding success
- if (lastPacketInBlock && !receiver.finalized) {
+ if (lastPacketInBlock) {
receiver.close();
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
- block.setNumBytes(receiver.offsetInBlock);
+ block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
- datanode.myMetrics.blocksWritten.inc();
- datanode.notifyNamenodeReceivedBlock(block,
- DataNode.EMPTY_DEL_HINT);
+ datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
if (ClientTraceLog.isInfoEnabled() &&
receiver.clientName.length() > 0) {
long offset = 0;
@@ -964,20 +991,21 @@
}
// send my status back to upstream datanode
- replyOut.writeLong(expected); // send seqno upstream
- SUCCESS.write(replyOut);
+ ackReply(expected);
LOG.debug("PacketResponder " + numTargets +
" for block " + block +
" responded my status " +
" for seqno " + expected);
+ boolean success = true;
// forward responses from downstream datanodes.
for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
try {
if (op == SUCCESS) {
op = Status.read(mirrorIn);
if (op != SUCCESS) {
+ success = false;
LOG.debug("PacketResponder for block " + block +
": error code received from downstream " +
" datanode[" + i + "] " + op);
@@ -985,13 +1013,23 @@
}
} catch (Throwable e) {
op = ERROR;
+ success = false;
}
op.write(replyOut);
}
replyOut.flush();
+
LOG.debug("PacketResponder " + block + " " + numTargets +
" responded other status " + " for seqno " + expected);
+ if (pkt != null) {
+ // remove the packet from the ack queue
+ removeAckHead();
+ // update bytes acked
+ if (success && pkt.lastByteInBlock > replicaInfo.getBytesAcked()) {
+ replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+ }
+ }
// If we were unable to read the seqno from downstream, then stop.
if (expected == -2) {
running = false;
@@ -1025,6 +1063,16 @@
LOG.info("PacketResponder " + numTargets +
" for block " + block + " terminating");
}
+
+ /**
+ * Remove a packet from the head of the ack queue
+ *
+ * This should be called only when the ack queue is not empty
+ */
+ private synchronized void removeAckHead() {
+ ackQueue.removeFirst();
+ notifyAll();
+ }
}
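
The shift from removeFirst() to getFirst() plus removeAckHead() means a packet stays at the head of the ack queue until its ack has actually been written, so an interrupted or failing responder never loses track of an un-acked packet. A minimal, self-contained sketch of the pattern (plain LinkedList, hypothetical sendAck), not the committed code:

    import java.util.LinkedList;

    class AckQueueSketch {
      private final LinkedList<Long> ackQueue = new LinkedList<Long>();

      synchronized void enqueue(long seqno) {
        ackQueue.addLast(seqno);
        notifyAll();                     // wake the responder thread
      }

      void respondOnce() throws InterruptedException {
        long seqno;
        synchronized (this) {
          while (ackQueue.isEmpty()) {
            wait();                      // enqueue() will notifyAll()
          }
          seqno = ackQueue.getFirst();   // peek only; do not dequeue yet
        }
        sendAck(seqno);                  // may block or throw
        synchronized (this) {
          ackQueue.removeFirst();        // dequeue only after the ack went out
          notifyAll();
        }
      }

      private void sendAck(long seqno) { /* hypothetical: write seqno upstream */ }
    }
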
/**
@@ -1033,10 +1081,12 @@
static private class Packet {
long seqno;
boolean lastPacketInBlock;
+ long lastByteInBlock;
- Packet(long seqno, boolean lastPacketInBlock) {
+ Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
this.seqno = seqno;
this.lastPacketInBlock = lastPacketInBlock;
+ this.lastByteInBlock = lastByteInPacket;
}
}
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Sat Nov 28 20:05:56 2009
@@ -46,13 +46,18 @@
static final Log ClientTraceLog = DataNode.ClientTraceLog;
private Block block; // the block to read from
+
+ /** the replica to read from */
+ private final Replica replica;
+ /** The visible length of a replica. */
+ private final long replicaVisibleLength;
+
private InputStream blockIn; // data stream
private long blockInPosition = -1; // updated while using transferTo().
private DataInputStream checksumIn; // checksum datastream
private DataChecksum checksum; // checksum stream
private long offset; // starting position to read
private long endOffset; // ending position
- private long blockLength;
private int bytesPerChecksum; // chunk size
private int checksumSize; // checksum size
private boolean corruptChecksumOk; // if need to verify checksum
@@ -86,10 +91,29 @@
throws IOException {
try {
this.block = block;
+ synchronized(datanode.data) {
+ this.replica = datanode.data.getReplica(block.getBlockId());
+ if (replica == null) {
+ throw new ReplicaNotFoundException(block);
+ }
+ this.replicaVisibleLength = replica.getVisibleLength();
+ }
+ if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+ throw new IOException(
+ "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+ + block + ", replica=" + replica);
+ }
+ if (replicaVisibleLength < 0) {
+ throw new IOException("The replica is not readable, block="
+ + block + ", replica=" + replica);
+ }
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("block=" + block + ", replica=" + replica);
+ }
+
this.chunkOffsetOK = chunkOffsetOK;
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
- this.blockLength = datanode.data.getLength(block);
this.transferToAllowed = datanode.transferToAllowed;
this.clientTraceFmt = clientTraceFmt;
@@ -119,18 +143,18 @@
* blockLength.
*/
bytesPerChecksum = checksum.getBytesPerChecksum();
- if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
+ if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) {
checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
- Math.max((int)blockLength, 10*1024*1024));
+ Math.max((int)replicaVisibleLength, 10*1024*1024));
bytesPerChecksum = checksum.getBytesPerChecksum();
}
checksumSize = checksum.getChecksumSize();
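
The clamp above guards against a corrupt meta-file header advertising an absurd chunk size. A worked example with hypothetical numbers:

    // header claims bytesPerChecksum = 64 MB, replicaVisibleLength = 1 MB:
    //   64 MB > 10 MB  and  64 MB > 1 MB  -> rebuild the checksum with
    //   bytesPerChecksum = max((int)replicaVisibleLength, 10 MB) = 10 MB
    // so a single chunk covers the whole replica and no buffer is ever
    // sized from the untrusted 64 MB value.
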
if (length < 0) {
- length = blockLength;
+ length = replicaVisibleLength;
}
- endOffset = blockLength;
+ endOffset = replicaVisibleLength;
if (startOffset < 0 || startOffset > endOffset
|| (length + startOffset) > endOffset) {
String msg = " Offset " + startOffset + " and length " + length
@@ -163,6 +187,18 @@
}
seqno = 0;
+ // sleep a few times if getBytesOnDisk() < visible length
+ for (int i = 0; i < 30 && replica.getBytesOnDisk() < replicaVisibleLength; i++) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
+ }
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("replica=" + replica);
+ }
+
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
} catch (IOException ioe) {
IOUtils.closeStream(this);
@@ -234,10 +270,6 @@
int len = Math.min((int) (endOffset - offset),
bytesPerChecksum*maxChunks);
- if (len == 0) {
- return 0;
- }
-
int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
int packetLen = len + numChunks*checksumSize + 4;
pkt.clear();
@@ -246,7 +278,7 @@
pkt.putInt(packetLen);
pkt.putLong(offset);
pkt.putLong(seqno);
- pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
+ pkt.put((byte)((len == 0) ? 1 : 0));
// why no ByteBuffer.putBoolean()?
pkt.putInt(len);
@@ -407,7 +439,8 @@
seqno++;
}
try {
- out.writeInt(0); // mark the end of block
+ // send an empty packet to mark the end of the block
+ sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
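
With this change the end-of-block marker is an ordinary packet rather than a bare zero int: both ends key off len == 0. In sketch form:

    // sender: the final sendChunks() call sees endOffset - offset == 0, so
    //   len == 0, numChunks == 0, and the header's last-packet flag
    //   ( (byte)((len == 0) ? 1 : 0) ) is 1.
    //
    // receiver: receivePacket() returns len == 0 for that packet, ending
    //   the  while (receivePacket() > 0) {}  receive loop.
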
@@ -420,7 +453,7 @@
close();
}
- blockReadFully = (initialOffset == 0 && offset >= blockLength);
+ blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength;
return totalRead;
}