You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2010/09/23 19:14:21 UTC
svn commit: r1000541 - in /hadoop/hdfs/branches/HDFS-1052: ./
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/balancer/
src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache...
Author: suresh
Date: Thu Sep 23 17:14:20 2010
New Revision: 1000541
URL: http://svn.apache.org/viewvc?rev=1000541&view=rev
Log:
HDFS-1400. HDFS federation: DataTransferProtocol uses ExtendedBlockPool to include BlockPoolID in the protocol. Contributed by Suresh Srinivas.
Modified:
hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Thu Sep 23 17:14:20 2010
@@ -37,11 +37,15 @@ Trunk (unreleased changes)
HDFS-1361. Add -fileStatus operation to NNThroughputBenchmark. (shv)
- HDFS-1365.HDFS federation: propose ClusterID and BlockPoolID format (tanping via boryas)
+ HDFS-1365.HDFS federation: propose ClusterID and BlockPoolID format
+ (tanping via boryas)
HDFS-1394. modify -format option for namenode to generate new blockpool id
and accept newcluster (boryas)
+ HDFS-1400. HDFS federation: DataTransferProtocol uses ExtendedBlockPool to
+ include BlockPoolID in the protocol. (suresh)
+
IMPROVEMENTS
HDFS-1096. fix for prev. commit. (boryas)
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java Thu Sep 23 17:14:20 2010
@@ -35,7 +35,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -349,7 +349,7 @@ public class BlockReader extends FSInput
}
public static BlockReader newBlockReader(Socket sock, String file,
- Block block, Token<BlockTokenIdentifier> blockToken,
+ ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
long startOffset, long len, int bufferSize) throws IOException {
return newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize,
true);
@@ -357,7 +357,7 @@ public class BlockReader extends FSInput
/** Java Doc required */
public static BlockReader newBlockReader( Socket sock, String file,
- Block block,
+ ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum)
@@ -367,7 +367,7 @@ public class BlockReader extends FSInput
}
public static BlockReader newBlockReader( Socket sock, String file,
- Block block,
+ ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
@@ -394,14 +394,14 @@ public class BlockReader extends FSInput
"Got access token error for OP_READ_BLOCK, self="
+ sock.getLocalSocketAddress() + ", remote="
+ sock.getRemoteSocketAddress() + ", for file " + file
- + ", for block " + block.getBlockId()
- + "_" + block.getGenerationStamp());
+ + ", for pool " + block.getPoolId() + " block "
+ + block.getBlockId() + "_" + block.getGenerationStamp());
} else {
throw new IOException("Got error for OP_READ_BLOCK, self="
+ sock.getLocalSocketAddress() + ", remote="
+ sock.getRemoteSocketAddress() + ", for file " + file
- + ", for block " + block.getBlockId() + "_"
- + block.getGenerationStamp());
+ + ", for pool " + block.getPoolId() + " block "
+ + block.getBlockId() + "_" + block.getGenerationStamp());
}
}
DataChecksum checksum = DataChecksum.newDataChecksum( in );
@@ -417,6 +417,7 @@ public class BlockReader extends FSInput
startOffset + " for file " + file);
}
+ // TODO:FEDERATION use poolId
return new BlockReader(file, block.getBlockId(), in, checksum,
verifyChecksum, startOffset, firstChunkOffset, len, sock);
}
@@ -453,9 +454,15 @@ public class BlockReader extends FSInput
}
}
- // File name to print when accessing a block directory from servlets
+ /**
+ * File name to print when accessing a block directly (from servlets)
+ * @param s Address of the block location
+ * @param poolId Block pool ID of the block
+ * @param blockId Block ID of the block
+ * @return string that has a file name for debug purposes
+ */
public static String getFileName(final InetSocketAddress s,
- final long blockId) {
- return s.toString() + ":" + blockId;
+ final String poolId, final long blockId) {
+ return s.toString() + ":" + poolId + ":" + blockId;
}
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Sep 23 17:14:20 2010
@@ -967,7 +967,7 @@ public class DFSClient implements FSCons
+ BLOCK_CHECKSUM + ", block=" + block);
}
// get block MD5
- DataTransferProtocol.Sender.opBlockChecksum(out, block.getLocalBlock(),
+ DataTransferProtocol.Sender.opBlockChecksum(out, block,
lb.getBlockToken());
final DataTransferProtocol.Status reply = DataTransferProtocol.Status.read(in);
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu Sep 23 17:14:20 2010
@@ -387,7 +387,7 @@ public class DFSInputStream extends FSIn
ExtendedBlock blk = targetBlock.getBlock();
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
- blockReader = BlockReader.newBlockReader(s, src, blk.getLocalBlock(),
+ blockReader = BlockReader.newBlockReader(s, src, blk,
accessToken,
offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
buffersize, verifyChecksum, dfsClient.clientName);
@@ -629,7 +629,7 @@ public class DFSInputStream extends FSIn
int len = (int) (end - start + 1);
reader = BlockReader.newBlockReader(dn, src,
- block.getBlock().getLocalBlock(),
+ block.getBlock(),
blockToken,
start, len, buffersize,
verifyChecksum, dfsClient.clientName);
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Sep 23 17:14:20 2010
@@ -892,7 +892,7 @@ class DFSOutputStream extends FSOutputSu
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
// send the request
- DataTransferProtocol.Sender.opWriteBlock(out, block.getLocalBlock(),
+ DataTransferProtocol.Sender.opWriteBlock(out, block,
nodes.length, recoveryFlag ? stage.getRecoveryStage() : stage, newGS,
block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes,
accessToken);
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Thu Sep 23 17:14:20 2010
@@ -46,9 +46,9 @@ public interface DataTransferProtocol {
* when protocol changes. It is not very obvious.
*/
/*
- * Version 19:
- * Change the block packet ack protocol to include seqno,
- * numberOfReplies, reply0, reply1, ...
+ * Version 20:
+ * Changed the protocol methods to use ExtendedBlock instead
+ * of Block.
*/
public static final int DATA_TRANSFER_VERSION = 19;
@@ -229,7 +229,7 @@ public interface DataTransferProtocol {
}
/** Send OP_READ_BLOCK */
- public static void opReadBlock(DataOutputStream out, Block blk,
+ public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
long blockOffset, long blockLen, String clientName,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
@@ -244,7 +244,7 @@ public interface DataTransferProtocol {
}
/** Send OP_WRITE_BLOCK */
- public static void opWriteBlock(DataOutputStream out, Block blk,
+ public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
int pipelineSize, BlockConstructionStage stage, long newGs,
long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
@@ -273,7 +273,7 @@ public interface DataTransferProtocol {
/** Send OP_REPLACE_BLOCK */
public static void opReplaceBlock(DataOutputStream out,
- Block blk, String storageId, DatanodeInfo src,
+ ExtendedBlock blk, String storageId, DatanodeInfo src,
Token<BlockTokenIdentifier> blockToken) throws IOException {
op(out, Op.REPLACE_BLOCK);
@@ -285,7 +285,7 @@ public interface DataTransferProtocol {
}
/** Send OP_COPY_BLOCK */
- public static void opCopyBlock(DataOutputStream out, Block blk,
+ public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
op(out, Op.COPY_BLOCK);
@@ -296,7 +296,7 @@ public interface DataTransferProtocol {
}
/** Send OP_BLOCK_CHECKSUM */
- public static void opBlockChecksum(DataOutputStream out, Block blk,
+ public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
op(out, Op.BLOCK_CHECKSUM);
@@ -346,7 +346,7 @@ public interface DataTransferProtocol {
/** Receive OP_READ_BLOCK */
private void opReadBlock(DataInputStream in) throws IOException {
- final Block blk = new Block();
+ final ExtendedBlock blk = new ExtendedBlock();
blk.readId(in);
final long offset = in.readLong();
final long length = in.readLong();
@@ -359,13 +359,13 @@ public interface DataTransferProtocol {
/**
* Abstract OP_READ_BLOCK method. Read a block.
*/
- protected abstract void opReadBlock(DataInputStream in, Block blk,
+ protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
long offset, long length, String client,
Token<BlockTokenIdentifier> blockToken) throws IOException;
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
- final Block blk = new Block();
+ final ExtendedBlock blk = new ExtendedBlock();
blk.readId(in);
final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
final BlockConstructionStage stage =
@@ -394,7 +394,7 @@ public interface DataTransferProtocol {
* Abstract OP_WRITE_BLOCK method.
* Write a block.
*/
- protected abstract void opWriteBlock(DataInputStream in, Block blk,
+ protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
int pipelineSize, BlockConstructionStage stage, long newGs,
long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
@@ -402,7 +402,7 @@ public interface DataTransferProtocol {
/** Receive OP_REPLACE_BLOCK */
private void opReplaceBlock(DataInputStream in) throws IOException {
- final Block blk = new Block();
+ final ExtendedBlock blk = new ExtendedBlock();
blk.readId(in);
final String sourceId = Text.readString(in); // read del hint
final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
@@ -416,12 +416,12 @@ public interface DataTransferProtocol {
* It is used for balancing purpose; send to a destination
*/
protected abstract void opReplaceBlock(DataInputStream in,
- Block blk, String sourceId, DatanodeInfo src,
+ ExtendedBlock blk, String sourceId, DatanodeInfo src,
Token<BlockTokenIdentifier> blockToken) throws IOException;
/** Receive OP_COPY_BLOCK */
private void opCopyBlock(DataInputStream in) throws IOException {
- final Block blk = new Block();
+ final ExtendedBlock blk = new ExtendedBlock();
blk.readId(in);
final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
@@ -432,13 +432,13 @@ public interface DataTransferProtocol {
* Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
* a proxy source.
*/
- protected abstract void opCopyBlock(DataInputStream in, Block blk,
+ protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken)
throws IOException;
/** Receive OP_BLOCK_CHECKSUM */
private void opBlockChecksum(DataInputStream in) throws IOException {
- final Block blk = new Block();
+ final ExtendedBlock blk = new ExtendedBlock();
blk.readId(in);
final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
@@ -450,7 +450,7 @@ public interface DataTransferProtocol {
* Get the checksum of a block
*/
protected abstract void opBlockChecksum(DataInputStream in,
- Block blk, Token<BlockTokenIdentifier> blockToken)
+ ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
throws IOException;
/** Read an AccessToken */
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java Thu Sep 23 17:14:20 2010
@@ -79,6 +79,18 @@ public class ExtendedBlock implements Wr
block.readHelper(in);
}
+ // Write only the identifier part of the block
+ public void writeId(DataOutput out) throws IOException {
+ DeprecatedUTF8.writeString(out, poolId);
+ block.writeId(out);
+ }
+
+ // Read only the identifier part of the block
+ public void readId(DataInput in) throws IOException {
+ this.poolId = DeprecatedUTF8.readString(in);
+ block.readId(in);
+ }
+
public String getPoolId() {
return poolId;
}
@@ -110,6 +122,11 @@ public class ExtendedBlock implements Wr
public void setNumBytes(final long len) {
block.setNumBytes(len);
}
+
+ public void set(String poolId, long blkid, long gs, long len) {
+ this.poolId = poolId;
+ block.set(blkid, gs, len);
+ }
public static Block getLocalBlock(final ExtendedBlock b) {
return b == null ? null : b.getLocalBlock();
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Sep 23 17:14:20 2010
@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -376,8 +377,9 @@ public class Balancer implements Tool {
.getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
BlockTokenSecretManager.AccessMode.COPY));
}
+ // TODO:FEDERATION use ExtendedBlock in BalancerBlock
DataTransferProtocol.Sender.opReplaceBlock(out,
- block.getBlock(), source.getStorageID(),
+ new ExtendedBlock(block.getBlock()), source.getStorageID(),
proxySource.getDatanode(), accessToken);
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Thu Sep 23 17:14:20 2010
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -180,7 +181,7 @@ public class JspHelper {
return chosenNode;
}
- public static void streamBlockInAscii(InetSocketAddress addr,
+ public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
long blockSize, long offsetIntoBlock, long chunkSizeToView,
JspWriter out, Configuration conf) throws IOException {
@@ -192,9 +193,9 @@ public class JspHelper {
long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
// Use the block name for file name.
- String file = BlockReader.getFileName(addr, blockId);
+ String file = BlockReader.getFileName(addr, poolId, blockId);
BlockReader blockReader = BlockReader.newBlockReader(s, file,
- new Block(blockId, 0, genStamp), blockToken,
+ new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
offsetIntoBlock, amtToRead, conf.getInt("io.file.buffer.size", 4096));
byte[] buf = new byte[(int)amtToRead];
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Sep 23 17:14:20 2010
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FSInputCheck
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
@@ -57,7 +58,7 @@ class BlockReceiver implements java.io.C
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
- private Block block; // the block to receive
+ private ExtendedBlock block; // the block to receive
private DataInputStream in = null; // from where data are read
private DataChecksum checksum; // from where chunks of a block can be read
private OutputStream out = null; // to block file at local disk
@@ -81,7 +82,7 @@ class BlockReceiver implements java.io.C
final private ReplicaInPipelineInterface replicaInfo;
volatile private boolean mirrorError;
- BlockReceiver(Block block, DataInputStream in, String inAddr,
+ BlockReceiver(ExtendedBlock block, DataInputStream in, String inAddr,
String myAddr, BlockConstructionStage stage,
long newGs, long minBytesRcvd, long maxBytesRcvd,
String clientName, DatanodeInfo srcDataNode, DataNode datanode)
@@ -97,29 +98,30 @@ class BlockReceiver implements java.io.C
//
// Open local disk out
//
+ // TODO:FEDERATION use ExtendedBlock in the following method calls
if (clientName.length() == 0) { //replication or move
- replicaInfo = datanode.data.createTemporary(block);
+ replicaInfo = datanode.data.createTemporary(block.getLocalBlock());
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
- replicaInfo = datanode.data.createRbw(block);
+ replicaInfo = datanode.data.createRbw(block.getLocalBlock());
break;
case PIPELINE_SETUP_STREAMING_RECOVERY:
replicaInfo = datanode.data.recoverRbw(
- block, newGs, minBytesRcvd, maxBytesRcvd);
+ block.getLocalBlock(), newGs, minBytesRcvd, maxBytesRcvd);
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND:
- replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
+ replicaInfo = datanode.data.append(block.getLocalBlock(), newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
- datanode.blockScanner.deleteBlock(block);
+ datanode.blockScanner.deleteBlock(block.getLocalBlock());
}
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND_RECOVERY:
- replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
+ replicaInfo = datanode.data.recoverAppend(block.getLocalBlock(), newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
- datanode.blockScanner.deleteBlock(block);
+ datanode.blockScanner.deleteBlock(block.getLocalBlock());
}
block.setGenerationStamp(newGs);
break;
@@ -613,9 +615,8 @@ class BlockReceiver implements java.io.C
try {
if (clientName.length() > 0) {
responder = new Daemon(datanode.threadGroup,
- new PacketResponder(this, block, mirrIn,
- replyOut, numTargets,
- Thread.currentThread()));
+ new PacketResponder(this, block.getLocalBlock(), mirrIn, replyOut,
+ numTargets, Thread.currentThread()));
responder.start(); // start thread to process responses
}
@@ -641,7 +642,8 @@ class BlockReceiver implements java.io.C
// Finalize the block. Does this fsync()?
block.setNumBytes(replicaInfo.getNumBytes());
- datanode.data.finalizeBlock(block);
+ // TODO:FEDERATION use ExtendedBlock
+ datanode.data.finalizeBlock(block.getLocalBlock());
datanode.myMetrics.blocksWritten.inc();
}
@@ -673,7 +675,8 @@ class BlockReceiver implements java.io.C
*/
private void cleanupBlock() throws IOException {
if (clientName.length() == 0) { // not client write
- datanode.data.unfinalizeBlock(block);
+ // TODO:FEDERATION use ExtendedBlock
+ datanode.data.unfinalizeBlock(block.getLocalBlock());
}
}
@@ -690,7 +693,8 @@ class BlockReceiver implements java.io.C
}
// rollback the position of the meta file
- datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
+ // TODO:FEDERATION use ExtendedBlock
+ datanode.data.adjustCrcChannelPosition(block.getLocalBlock(), streams, checksumSize);
}
/**
@@ -718,7 +722,8 @@ class BlockReceiver implements java.io.C
byte[] crcbuf = new byte[checksumSize];
FSDataset.BlockInputStreams instr = null;
try {
- instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
+ // TODO:FEDERATION use ExtendedBlock
+ instr = datanode.data.getTmpInputStreams(block.getLocalBlock(), blkoff, ckoff);
IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
// open meta file and read in crc value computed earlier
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Sep 23 17:14:20 2010
@@ -31,7 +31,7 @@ import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.io.IOUtils;
@@ -46,7 +46,7 @@ class BlockSender implements java.io.Clo
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
- private Block block; // the block to read from
+ private ExtendedBlock block; // the block to read from
/** the replica to read from */
private final Replica replica;
@@ -80,14 +80,14 @@ class BlockSender implements java.io.Clo
private volatile ChunkChecksum lastChunkChecksum = null;
- BlockSender(Block block, long startOffset, long length,
+ BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean chunkOffsetOK,
boolean verifyChecksum, DataNode datanode) throws IOException {
this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
verifyChecksum, datanode, null);
}
- BlockSender(Block block, long startOffset, long length,
+ BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean chunkOffsetOK,
boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
throws IOException {
@@ -145,10 +145,10 @@ class BlockSender implements java.io.Clo
this.transferToAllowed = datanode.transferToAllowed;
this.clientTraceFmt = clientTraceFmt;
- if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
- checksumIn = new DataInputStream(
- new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
- BUFFER_SIZE));
+ // TODO:FEDERATION metaFileExists and getMetaDataInputStream should take ExtendedBlock
+ if ( !corruptChecksumOk || datanode.data.metaFileExists(block.getLocalBlock()) ) {
+ checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
+ .getMetaDataInputStream(block.getLocalBlock()), BUFFER_SIZE));
// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
@@ -230,7 +230,8 @@ class BlockSender implements java.io.Clo
DataNode.LOG.debug("replica=" + replica);
}
- blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+ // TODO:FEDERATION getBlockInputStream must accept ExtendedBlock
+ blockIn = datanode.data.getBlockInputStream(block.getLocalBlock(), offset); // seek to offset
} catch (IOException ioe) {
IOUtils.closeStream(this);
IOUtils.closeStream(blockIn);
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Thu Sep 23 17:14:20 2010
@@ -49,6 +49,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.io.IOUtils;
@@ -435,7 +436,8 @@ class DataBlockScanner implements Runnab
try {
adjustThrottler();
- blockSender = new BlockSender(block, 0, -1, false,
+ // TODO:FEDERATION use ExtendedBlock
+ blockSender = new BlockSender(new ExtendedBlock(block), 0, -1, false,
false, true, datanode);
DataOutputStream out =
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Sep 23 17:14:20 2010
@@ -25,7 +25,6 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
-import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -50,9 +49,6 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,7 +65,6 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -77,16 +72,17 @@ import org.apache.hadoop.hdfs.protocol.F
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -96,7 +92,6 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -106,6 +101,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
@@ -121,15 +117,20 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.mortbay.util.ajax.JSON;
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
/**********************************************************
* DataNode is a class (and program) that stores a set of
* blocks for a DFS deployment. A single deployment can
@@ -363,7 +364,6 @@ public class DataNode extends Configured
} else { // real storage
// read storage info, lock data dirs and transition fs state if necessary
storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
-
// adjust
this.dnRegistration.setStorageInfo(storage);
// initialize data node internal structure
@@ -1143,10 +1143,11 @@ public class DataNode extends Configured
return;
}
- private void transferBlock( Block block,
+ private void transferBlock( ExtendedBlock block,
DatanodeInfo xferTargets[]
) throws IOException {
- if (!data.isValidBlock(block)) {
+ // TODO:FEDERATION use ExtendedBlock
+ if (!data.isValidBlock(block.getLocalBlock())) {
// block does not exist or is under-construction
String errStr = "Can't send invalid block " + block;
LOG.info(errStr);
@@ -1157,7 +1158,8 @@ public class DataNode extends Configured
}
// Check if NN recorded length matches on-disk length
- long onDiskLength = data.getLength(block);
+ // TODO:FEDERATION use ExtendedBlock
+ long onDiskLength = data.getLength(block.getLocalBlock());
if (block.getNumBytes() > onDiskLength) {
// Shorter on-disk len indicates corruption so report NN the corrupt block
namenode.reportBadBlocks(new LocatedBlock[]{
@@ -1190,7 +1192,8 @@ public class DataNode extends Configured
) {
for (int i = 0; i < blocks.length; i++) {
try {
- transferBlock(blocks[i], xferTargets[i]);
+ // TODO:FEDERATION cleanup
+ transferBlock(new ExtendedBlock(blocks[i]), xferTargets[i]);
} catch (IOException ie) {
LOG.warn("Failed to transfer block " + blocks[i], ie);
}
@@ -1306,14 +1309,15 @@ public class DataNode extends Configured
*/
class DataTransfer implements Runnable {
DatanodeInfo targets[];
- Block b;
+ ExtendedBlock b;
DataNode datanode;
/**
* Connect to the first item in the target list. Pass along the
* entire target list, the block, and the data.
*/
- public DataTransfer(DatanodeInfo targets[], Block b, DataNode datanode) throws IOException {
+ public DataTransfer(DatanodeInfo targets[], ExtendedBlock b,
+ DataNode datanode) throws IOException {
this.targets = targets;
this.b = b;
this.datanode = datanode;
@@ -1350,8 +1354,8 @@ public class DataNode extends Configured
//
Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
if (isBlockTokenEnabled) {
- accessToken = blockTokenSecretManager.generateToken(null, b,
- EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
+ accessToken = blockTokenSecretManager.generateToken(b,
+ EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
}
DataTransferProtocol.Sender.opWriteBlock(out,
b, 0, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, "",
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Sep 23 17:14:20 2010
@@ -34,9 +34,9 @@ import java.net.Socket;
import java.net.SocketException;
import org.apache.commons.logging.Log;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -129,7 +129,7 @@ class DataXceiver extends DataTransferPr
* Read a block from the disk.
*/
@Override
- protected void opReadBlock(DataInputStream in, Block block,
+ protected void opReadBlock(DataInputStream in, ExtendedBlock block,
long startOffset, long length, String clientName,
Token<BlockTokenIdentifier> blockToken) throws IOException {
OutputStream baseStream = NetUtils.getOutputStream(s,
@@ -182,7 +182,7 @@ class DataXceiver extends DataTransferPr
try {
if (DataTransferProtocol.Status.read(in) == CHECKSUM_OK
&& datanode.blockScanner != null) {
- datanode.blockScanner.verifiedByClient(block);
+ datanode.blockScanner.verifiedByClient(block.getLocalBlock());
}
} catch (IOException ignored) {}
}
@@ -216,7 +216,7 @@ class DataXceiver extends DataTransferPr
* Write a block to disk.
*/
@Override
- protected void opWriteBlock(DataInputStream in, Block block,
+ protected void opWriteBlock(DataInputStream in, ExtendedBlock block,
int pipelineSize, BlockConstructionStage stage,
long newGs, long minBytesRcvd, long maxBytesRcvd,
String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
@@ -273,7 +273,8 @@ class DataXceiver extends DataTransferPr
stage, newGs, minBytesRcvd, maxBytesRcvd,
client, srcDataNode, datanode);
} else {
- datanode.data.recoverClose(block, newGs, minBytesRcvd);
+ // TODO:FEDERATION use ExtendedBlock
+ datanode.data.recoverClose(block.getLocalBlock(), newGs, minBytesRcvd);
}
//
@@ -376,7 +377,8 @@ class DataXceiver extends DataTransferPr
// the block is finalized in the PacketResponder.
if (client.length() == 0 ||
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
- datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+ // TODO:FEDERATION use ExtendedBlock
+ datanode.closeBlock(block.getLocalBlock(), DataNode.EMPTY_DEL_HINT);
LOG.info("Received block " + block +
" src: " + remoteAddress +
" dest: " + localAddress +
@@ -406,7 +408,7 @@ class DataXceiver extends DataTransferPr
* Get block checksum (MD5 of CRC32).
*/
@Override
- protected void opBlockChecksum(DataInputStream in, Block block,
+ protected void opBlockChecksum(DataInputStream in, ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken) throws IOException {
DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
datanode.socketWriteTimeout));
@@ -429,8 +431,9 @@ class DataXceiver extends DataTransferPr
}
}
+ // TODO:FEDERATION use ExtendedBlock
final MetaDataInputStream metadataIn =
- datanode.data.getMetaDataInputStream(block);
+ datanode.data.getMetaDataInputStream(block.getLocalBlock());
final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
metadataIn, BUFFER_SIZE));
@@ -470,7 +473,7 @@ class DataXceiver extends DataTransferPr
* Read a block from the disk and then sends it to a destination.
*/
@Override
- protected void opCopyBlock(DataInputStream in, Block block,
+ protected void opCopyBlock(DataInputStream in, ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken) throws IOException {
// Read in the header
if (datanode.isBlockTokenEnabled) {
@@ -545,7 +548,7 @@ class DataXceiver extends DataTransferPr
*/
@Override
protected void opReplaceBlock(DataInputStream in,
- Block block, String sourceID, DatanodeInfo proxySource,
+ ExtendedBlock block, String sourceID, DatanodeInfo proxySource,
Token<BlockTokenIdentifier> blockToken) throws IOException {
/* read header */
block.setNumBytes(dataXceiverServer.estimateBlockSize);
@@ -616,7 +619,8 @@ class DataXceiver extends DataTransferPr
dataXceiverServer.balanceThrottler, -1);
// notify name node
- datanode.notifyNamenodeReceivedBlock(block, sourceID);
+ // TODO:FEDERATION use ExtendedBlock
+ datanode.notifyNamenodeReceivedBlock(block.getLocalBlock(), sourceID);
LOG.info("Moved block " + block +
" from " + s.getRemoteSocketAddress());
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Thu Sep 23 17:14:20 2010
@@ -246,13 +246,12 @@ public class DatanodeJspHelper {
return;
}
- String blockSizeStr = req.getParameter("blockSize");
- long blockSize = 0;
+ final String blockSizeStr = req.getParameter("blockSize");
if (blockSizeStr == null || blockSizeStr.length() == 0) {
out.print("Invalid input");
return;
}
- blockSize = Long.parseLong(blockSizeStr);
+ long blockSize = Long.parseLong(blockSizeStr);
final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
@@ -378,6 +377,12 @@ public class DatanodeJspHelper {
out.print("Invalid input (blockId absent)");
return;
}
+
+ final String poolId = req.getParameter("poolId");
+ if (poolId == null) {
+ out.print("Invalid input (poolId absent)");
+ return;
+ }
final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
@@ -559,7 +564,7 @@ public class DatanodeJspHelper {
out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
try {
JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
- datanodePort), blockId, blockToken, genStamp, blockSize,
+ datanodePort), poolId, blockId, blockToken, genStamp, blockSize,
startOffset, chunkSizeToView, out, conf);
} catch (Exception e) {
out.print(e);
@@ -626,6 +631,7 @@ public class DatanodeJspHelper {
return;
}
LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
+ String poolId = lastBlk.getBlock().getPoolId();
long blockSize = lastBlk.getBlock().getNumBytes();
long blockId = lastBlk.getBlock().getBlockId();
Token<BlockTokenIdentifier> accessToken = lastBlk.getBlockToken();
@@ -644,7 +650,7 @@ public class DatanodeJspHelper {
- chunkSizeToView : 0;
out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
- JspHelper.streamBlockInAscii(addr, blockId, accessToken, genStamp,
+ JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp,
blockSize, startOffset, chunkSizeToView, out, conf);
out.print("</textarea>");
dfs.close();
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Thu Sep 23 17:14:20 2010
@@ -22,6 +22,7 @@ package org.apache.hadoop.hdfs.server.da
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
/**
* Exception indicating that DataNode does not have a replica
@@ -43,6 +44,11 @@ public class ReplicaNotFoundException ex
super();
}
+ ReplicaNotFoundException(ExtendedBlock b) {
+ super("Replica not found for " + b);
+ }
+
+ // TODO:FEDERATION remove this later
ReplicaNotFoundException(Block b) {
super("Replica not found for " + b);
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Thu Sep 23 17:14:20 2010
@@ -502,10 +502,10 @@ public class NamenodeFsck {
s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
- String file = BlockReader.getFileName(targetAddr, block.getBlockId());
- blockReader = BlockReader.newBlockReader(s, file,
- block.getLocalBlock(), lblock.getBlockToken(), 0, -1, conf.getInt(
- "io.file.buffer.size", 4096));
+ String file = BlockReader.getFileName(targetAddr, block.getPoolId(),
+ block.getBlockId());
+ blockReader = BlockReader.newBlockReader(s, file, block, lblock
+ .getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096));
} catch (IOException ex) {
// Put chosen node into dead list, continue
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Thu Sep 23 17:14:20 2010
@@ -26,7 +26,6 @@ import java.util.List;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -83,7 +82,7 @@ public class TestClientBlockVerification
int offset, int lenToRead) throws IOException {
InetSocketAddress targetAddr = null;
Socket s = null;
- Block block = testBlock.getBlock().getLocalBlock();
+ ExtendedBlock block = testBlock.getBlock();
DatanodeInfo[] nodes = testBlock.getLocations();
targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
s = new Socket();
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Thu Sep 23 17:14:20 2010
@@ -172,7 +172,7 @@ public class TestDataTransferProtocol ex
String description, Boolean eofExcepted) throws IOException {
sendBuf.reset();
recvBuf.reset();
- DataTransferProtocol.Sender.opWriteBlock(sendOut, block.getLocalBlock(), 0,
+ DataTransferProtocol.Sender.opWriteBlock(sendOut, block, 0,
stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
if (eofExcepted) {
@@ -338,7 +338,8 @@ public class TestDataTransferProtocol ex
createFile(fileSys, file, fileLen);
// get the first blockid for the file
- ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+ final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+ final String poolId = firstBlock.getPoolId();
long newBlockId = firstBlock.getBlockId() + 1;
recvBuf.reset();
@@ -358,7 +359,7 @@ public class TestDataTransferProtocol ex
/* Test OP_WRITE_BLOCK */
sendBuf.reset();
DataTransferProtocol.Sender.opWriteBlock(sendOut,
- new Block(newBlockId), 0,
+ new ExtendedBlock(poolId, newBlockId), 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -372,7 +373,7 @@ public class TestDataTransferProtocol ex
sendBuf.reset();
recvBuf.reset();
DataTransferProtocol.Sender.opWriteBlock(sendOut,
- new Block(++newBlockId), 0,
+ new ExtendedBlock(poolId, ++newBlockId), 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -396,7 +397,7 @@ public class TestDataTransferProtocol ex
sendBuf.reset();
recvBuf.reset();
DataTransferProtocol.Sender.opWriteBlock(sendOut,
- new Block(++newBlockId), 0,
+ new ExtendedBlock(poolId, ++newBlockId), 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -419,7 +420,7 @@ public class TestDataTransferProtocol ex
/* Test OP_READ_BLOCK */
- Block blk = new Block(firstBlock.getLocalBlock());
+ ExtendedBlock blk = new ExtendedBlock(firstBlock.getLocalBlock());
long blkid = blk.getBlockId();
// bad block id
sendBuf.reset();
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Thu Sep 23 17:14:20 2010
@@ -230,13 +230,8 @@ public class TestBlockReplacement extend
sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
- out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
- REPLACE_BLOCK.write(out);
- out.writeLong(block.getBlockId());
- out.writeLong(block.getGenerationStamp());
- Text.writeString(out, source.getStorageID());
- sourceProxy.write(out);
- BlockTokenSecretManager.DUMMY_TOKEN.write(out);
+ DataTransferProtocol.Sender.opReplaceBlock(out, block, source
+ .getStorageID(), sourceProxy, BlockTokenSecretManager.DUMMY_TOKEN);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Thu Sep 23 17:14:20 2010
@@ -250,7 +250,7 @@ public class TestDataNodeVolumeFailure e
String file = BlockReader.getFileName(targetAddr, block.getBlockId());
BlockReader blockReader =
- BlockReader.newBlockReader(s, file, block.getLocalBlock(), lblock
+ BlockReader.newBlockReader(s, file, block, lblock
.getBlockToken(), 0, -1, 4096);
// nothing - if it fails - it will throw an exception
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Thu Sep 23 17:14:20 2010
@@ -117,7 +117,7 @@ public class TestDiskError extends TestC
DataOutputStream out = new DataOutputStream(
s.getOutputStream());
- Sender.opWriteBlock(out, block.getBlock().getLocalBlock(), 1,
+ Sender.opWriteBlock(out, block.getBlock(), 1,
BlockConstructionStage.PIPELINE_SETUP_CREATE,
0L, 0L, 0L, "", null, new DatanodeInfo[0],
BlockTokenSecretManager.DUMMY_TOKEN);
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java Thu Sep 23 17:14:20 2010
@@ -131,7 +131,7 @@ public class TestBlockTokenWithDFS exten
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
String file = BlockReader.getFileName(targetAddr, block.getBlockId());
- blockReader = BlockReader.newBlockReader(s, file, block.getLocalBlock(),
+ blockReader = BlockReader.newBlockReader(s, file, block,
lblock.getBlockToken(), 0, -1,
conf.getInt("io.file.buffer.size", 4096));