Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2010/09/23 19:14:21 UTC

svn commit: r1000541 - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache...

Author: suresh
Date: Thu Sep 23 17:14:20 2010
New Revision: 1000541

URL: http://svn.apache.org/viewvc?rev=1000541&view=rev
Log:
HDFS-1400. HDFS federation: DataTransferProtocol uses ExtendedBlock to include BlockPoolID in the protocol. Contributed by Suresh Srinivas.
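
For context: under federation a block ID is unique only within a block pool, so the
wire protocol must identify a block by pool ID plus block ID and generation stamp,
which is what ExtendedBlock carries. Below is a minimal sketch of the call shapes
this commit relies on; the constructor argument order is inferred from the JspHelper
hunk below, and the pool/block IDs are invented:

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

    public class ExtendedBlockSketch {
      public static void main(String[] args) {
        // (poolId, blockId, numBytes, generationStamp), per the JspHelper hunk
        ExtendedBlock eb = new ExtendedBlock("BP-example-pool", 42L, 0L, 1001L);

        // Wrapping a pre-federation Block, as the interim TODO:FEDERATION sites do
        ExtendedBlock wrapped = new ExtendedBlock(new Block(42L, 0L, 1001L));

        // Legacy datanode-side APIs still take the local (pool-less) part
        Block local = eb.getLocalBlock();
        System.out.println(eb.getPoolId() + ":" + local.getBlockId());
      }
    }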

Modified:
    hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java

Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Thu Sep 23 17:14:20 2010
@@ -37,11 +37,15 @@ Trunk (unreleased changes)
 
     HDFS-1361. Add -fileStatus operation to NNThroughputBenchmark. (shv)
 
-    HDFS-1365.HDFS federation: propose ClusterID and BlockPoolID format (tanping via boryas)
+    HDFS-1365.HDFS federation: propose ClusterID and BlockPoolID format 
+    (tanping via boryas)
 
     HDFS-1394. modify -format option for namenode to generated new blockpool id 
     and accept newcluster (boryas)
 
+    HDFS-1400. HDFS federation: DataTransferProtocol uses ExtendedBlock to 
+    include BlockPoolID in the protocol. (suresh)
+
   IMPROVEMENTS
 
     HDFS-1096. fix for prev. commit. (boryas)

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/BlockReader.java Thu Sep 23 17:14:20 2010
@@ -35,7 +35,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -349,7 +349,7 @@ public class BlockReader extends FSInput
   }
 
   public static BlockReader newBlockReader(Socket sock, String file,
-      Block block, Token<BlockTokenIdentifier> blockToken, 
+      ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
       long startOffset, long len, int bufferSize) throws IOException {
     return newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize,
         true);
@@ -357,7 +357,7 @@ public class BlockReader extends FSInput
 
   /** Java Doc required */
   public static BlockReader newBlockReader( Socket sock, String file, 
-                                     Block block, 
+                                     ExtendedBlock block, 
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum)
@@ -367,7 +367,7 @@ public class BlockReader extends FSInput
   }
 
   public static BlockReader newBlockReader( Socket sock, String file,
-                                     Block block, 
+                                     ExtendedBlock block, 
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum,
@@ -394,14 +394,14 @@ public class BlockReader extends FSInput
             "Got access token error for OP_READ_BLOCK, self="
                 + sock.getLocalSocketAddress() + ", remote="
                 + sock.getRemoteSocketAddress() + ", for file " + file
-                + ", for block " + block.getBlockId() 
-                + "_" + block.getGenerationStamp());
+                + ", for pool " + block.getPoolId() + " block " 
+                + block.getBlockId() + "_" + block.getGenerationStamp());
       } else {
         throw new IOException("Got error for OP_READ_BLOCK, self="
             + sock.getLocalSocketAddress() + ", remote="
             + sock.getRemoteSocketAddress() + ", for file " + file
-            + ", for block " + block.getBlockId() + "_" 
-            + block.getGenerationStamp());
+            + ", for pool " + block.getPoolId() + " block " 
+            + block.getBlockId() + "_" + block.getGenerationStamp());
       }
     }
     DataChecksum checksum = DataChecksum.newDataChecksum( in );
@@ -417,6 +417,7 @@ public class BlockReader extends FSInput
                             startOffset + " for file " + file);
     }
 
+    // TODO:FEDERATION use poolId
     return new BlockReader(file, block.getBlockId(), in, checksum,
         verifyChecksum, startOffset, firstChunkOffset, len, sock);
   }
@@ -453,9 +454,15 @@ public class BlockReader extends FSInput
     }
   }
   
-  // File name to print when accessing a block directory from servlets
+  /**
+   * File name to print when accessing a block directly (from servlets)
+   * @param s Address of the block location
+   * @param poolId Block pool ID of the block
+   * @param blockId Block ID of the block
+   * @return string that has a file name for debug purposes
+   */
   public static String getFileName(final InetSocketAddress s,
-      final long blockId) {
-    return s.toString() + ":" + blockId;
+      final String poolId, final long blockId) {
+    return s.toString() + ":" + poolId + ":" + blockId;
   }
 }
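
With the new signatures, a reader is opened against an ExtendedBlock and the debug
file name carries the pool ID. A hedged sketch modeled on the NamenodeFsck and
TestDataNodeVolumeFailure hunks below; the connected socket is passed in, and the
buffer size of 4096 is just the value those tests use:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import org.apache.hadoop.hdfs.BlockReader;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    class ReaderSketch {
      static BlockReader open(Socket s, InetSocketAddress addr, LocatedBlock lblock)
          throws IOException {
        ExtendedBlock block = lblock.getBlock();
        // now "host:port:poolId:blockId" rather than "host:port:blockId"
        String file = BlockReader.getFileName(addr, block.getPoolId(), block.getBlockId());
        // offset 0, length -1 reads the whole block, as the fsck hunk does
        return BlockReader.newBlockReader(s, file, block, lblock.getBlockToken(), 0, -1, 4096);
      }
    }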

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Sep 23 17:14:20 2010
@@ -967,7 +967,7 @@ public class DFSClient implements FSCons
                 + BLOCK_CHECKSUM + ", block=" + block);
           }
           // get block MD5
-          DataTransferProtocol.Sender.opBlockChecksum(out, block.getLocalBlock(),
+          DataTransferProtocol.Sender.opBlockChecksum(out, block,
               lb.getBlockToken());
 
           final DataTransferProtocol.Status reply = DataTransferProtocol.Status.read(in);

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu Sep 23 17:14:20 2010
@@ -387,7 +387,7 @@ public class DFSInputStream extends FSIn
         ExtendedBlock blk = targetBlock.getBlock();
         Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
         
-        blockReader = BlockReader.newBlockReader(s, src, blk.getLocalBlock(), 
+        blockReader = BlockReader.newBlockReader(s, src, blk,
             accessToken, 
             offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
             buffersize, verifyChecksum, dfsClient.clientName);
@@ -629,7 +629,7 @@ public class DFSInputStream extends FSIn
         int len = (int) (end - start + 1);
             
         reader = BlockReader.newBlockReader(dn, src, 
-                                            block.getBlock().getLocalBlock(),
+                                            block.getBlock(),
                                             blockToken,
                                             start, len, buffersize, 
                                             verifyChecksum, dfsClient.clientName);

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Sep 23 17:14:20 2010
@@ -892,7 +892,7 @@ class DFSOutputStream extends FSOutputSu
         blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
 
         // send the request
-        DataTransferProtocol.Sender.opWriteBlock(out, block.getLocalBlock(), 
+        DataTransferProtocol.Sender.opWriteBlock(out, block,
             nodes.length, recoveryFlag ? stage.getRecoveryStage() : stage, newGS, 
             block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes,
             accessToken);

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Thu Sep 23 17:14:20 2010
@@ -46,9 +46,9 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 19:
-   *    Change the block packet ack protocol to include seqno,
-   *    numberOfReplies, reply0, reply1, ...
+   * Version 20:
+   *    Changed the protocol methods to use ExtendedBlock instead
+   *    of Block.
    */
   public static final int DATA_TRANSFER_VERSION = 19;
 
@@ -229,7 +229,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_READ_BLOCK */
-    public static void opReadBlock(DataOutputStream out, Block blk,
+    public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
         long blockOffset, long blockLen, String clientName,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
@@ -244,7 +244,7 @@ public interface DataTransferProtocol {
     }
     
     /** Send OP_WRITE_BLOCK */
-    public static void opWriteBlock(DataOutputStream out, Block blk,
+    public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
         int pipelineSize, BlockConstructionStage stage, long newGs,
         long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
         DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
@@ -273,7 +273,7 @@ public interface DataTransferProtocol {
     
     /** Send OP_REPLACE_BLOCK */
     public static void opReplaceBlock(DataOutputStream out,
-        Block blk, String storageId, DatanodeInfo src,
+        ExtendedBlock blk, String storageId, DatanodeInfo src,
         Token<BlockTokenIdentifier> blockToken) throws IOException {
       op(out, Op.REPLACE_BLOCK);
 
@@ -285,7 +285,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_COPY_BLOCK */
-    public static void opCopyBlock(DataOutputStream out, Block blk,
+    public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
       op(out, Op.COPY_BLOCK);
@@ -296,7 +296,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_BLOCK_CHECKSUM */
-    public static void opBlockChecksum(DataOutputStream out, Block blk,
+    public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
       op(out, Op.BLOCK_CHECKSUM);
@@ -346,7 +346,7 @@ public interface DataTransferProtocol {
 
     /** Receive OP_READ_BLOCK */
     private void opReadBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final long offset = in.readLong();
       final long length = in.readLong();
@@ -359,13 +359,13 @@ public interface DataTransferProtocol {
     /**
      * Abstract OP_READ_BLOCK method. Read a block.
      */
-    protected abstract void opReadBlock(DataInputStream in, Block blk,
+    protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
         long offset, long length, String client,
         Token<BlockTokenIdentifier> blockToken) throws IOException;
     
     /** Receive OP_WRITE_BLOCK */
     private void opWriteBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
       final BlockConstructionStage stage = 
@@ -394,7 +394,7 @@ public interface DataTransferProtocol {
      * Abstract OP_WRITE_BLOCK method. 
      * Write a block.
      */
-    protected abstract void opWriteBlock(DataInputStream in, Block blk,
+    protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
         int pipelineSize, BlockConstructionStage stage, long newGs,
         long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
         DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
@@ -402,7 +402,7 @@ public interface DataTransferProtocol {
 
     /** Receive OP_REPLACE_BLOCK */
     private void opReplaceBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final String sourceId = Text.readString(in); // read del hint
       final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
@@ -416,12 +416,12 @@ public interface DataTransferProtocol {
      * It is used for balancing purpose; send to a destination
      */
     protected abstract void opReplaceBlock(DataInputStream in,
-        Block blk, String sourceId, DatanodeInfo src,
+        ExtendedBlock blk, String sourceId, DatanodeInfo src,
         Token<BlockTokenIdentifier> blockToken) throws IOException;
 
     /** Receive OP_COPY_BLOCK */
     private void opCopyBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
@@ -432,13 +432,13 @@ public interface DataTransferProtocol {
      * Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
      * a proxy source.
      */
-    protected abstract void opCopyBlock(DataInputStream in, Block blk,
+    protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException;
 
     /** Receive OP_BLOCK_CHECKSUM */
     private void opBlockChecksum(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
@@ -450,7 +450,7 @@ public interface DataTransferProtocol {
      * Get the checksum of a block 
      */
     protected abstract void opBlockChecksum(DataInputStream in,
-        Block blk, Token<BlockTokenIdentifier> blockToken)
+        ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
         throws IOException;
 
     /** Read an AccessToken */
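
Every Sender.op* method above now takes an ExtendedBlock, and the pool ID travels
first on the wire (each Receiver op materializes it with blk.readId(in)). Here is a
hedged sketch of issuing the checksum op; judging by the TestBlockReplacement hunk
below, Sender writes the transfer version and opcode itself, and DUMMY_TOKEN is
what callers pass when block tokens are disabled:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.Socket;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;

    class ChecksumRequestSketch {
      static void send(Socket sock, ExtendedBlock blk) throws IOException {
        DataOutputStream out = new DataOutputStream(sock.getOutputStream());
        // writes version + opcode, then blk.writeId(out): poolId, then block identity
        DataTransferProtocol.Sender.opBlockChecksum(out, blk,
            BlockTokenSecretManager.DUMMY_TOKEN);
        out.flush();
      }
    }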

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java Thu Sep 23 17:14:20 2010
@@ -79,6 +79,18 @@ public class ExtendedBlock implements Wr
     block.readHelper(in);
   }
 
+  // Write only the identifier part of the block
+  public void writeId(DataOutput out) throws IOException {
+    DeprecatedUTF8.writeString(out, poolId);
+    block.writeId(out);
+  }
+
+  // Read only the identifier part of the block
+  public void readId(DataInput in) throws IOException {
+    this.poolId = DeprecatedUTF8.readString(in);
+    block.readId(in);
+  }
+  
   public String getPoolId() {
     return poolId;
   }
@@ -110,6 +122,11 @@ public class ExtendedBlock implements Wr
   public void setNumBytes(final long len) {
     block.setNumBytes(len);
   }
+  
+  public void set(String poolId, long blkid, long gs, long len) {
+    this.poolId = poolId;
+    block.set(blkid, gs, len);
+  }
 
   public static Block getLocalBlock(final ExtendedBlock b) {
     return b == null ? null : b.getLocalBlock();
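
The new writeId/readId pair serializes only the block's identity (the pool ID
string followed by the local block's identifier fields), which is exactly what the
protocol ops above exchange. A self-contained round-trip sketch over plain java.io
buffers, with invented IDs:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

    class IdRoundTripSketch {
      public static void main(String[] args) throws IOException {
        ExtendedBlock written = new ExtendedBlock("BP-example-pool", 42L, 0L, 1001L);

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        written.writeId(new DataOutputStream(buf));   // poolId first, then block id

        ExtendedBlock read = new ExtendedBlock();     // no-arg form, as in Receiver
        read.readId(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        assert read.getPoolId().equals("BP-example-pool");
        assert read.getBlockId() == 42L;
      }
    }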

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Sep 23 17:14:20 2010
@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -376,8 +377,9 @@ public class Balancer implements Tool {
             .getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
             BlockTokenSecretManager.AccessMode.COPY));
       }
+      // TODO:FEDERATION use ExtendedBlock in BalancerBlock
       DataTransferProtocol.Sender.opReplaceBlock(out,
-          block.getBlock(), source.getStorageID(), 
+          new ExtendedBlock(block.getBlock()), source.getStorageID(), 
           proxySource.getDatanode(), accessToken);
     }
     

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Thu Sep 23 17:14:20 2010
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -180,7 +181,7 @@ public class JspHelper {
     return chosenNode;
   }
 
-  public static void streamBlockInAscii(InetSocketAddress addr, 
+  public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
       long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
       JspWriter out, Configuration conf) throws IOException {
@@ -192,9 +193,9 @@ public class JspHelper {
       long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);     
       
       // Use the block name for file name. 
-      String file = BlockReader.getFileName(addr, blockId);
+      String file = BlockReader.getFileName(addr, poolId, blockId);
       BlockReader blockReader = BlockReader.newBlockReader(s, file,
-        new Block(blockId, 0, genStamp), blockToken,
+        new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
         offsetIntoBlock, amtToRead, conf.getInt("io.file.buffer.size", 4096));
         
     byte[] buf = new byte[(int)amtToRead];

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Sep 23 17:14:20 2010
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FSInputCheck
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
@@ -57,7 +58,7 @@ class BlockReceiver implements java.io.C
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
-  private Block block; // the block to receive
+  private ExtendedBlock block; // the block to receive
   private DataInputStream in = null; // from where data are read
   private DataChecksum checksum; // from where chunks of a block can be read
   private OutputStream out = null; // to block file at local disk
@@ -81,7 +82,7 @@ class BlockReceiver implements java.io.C
   final private ReplicaInPipelineInterface replicaInfo;
   volatile private boolean mirrorError;
 
-  BlockReceiver(Block block, DataInputStream in, String inAddr,
+  BlockReceiver(ExtendedBlock block, DataInputStream in, String inAddr,
                 String myAddr, BlockConstructionStage stage, 
                 long newGs, long minBytesRcvd, long maxBytesRcvd, 
                 String clientName, DatanodeInfo srcDataNode, DataNode datanode)
@@ -97,29 +98,30 @@ class BlockReceiver implements java.io.C
       //
       // Open local disk out
       //
+      // TODO:FEDERATION use ExtendedBlock in the following method calls
       if (clientName.length() == 0) { //replication or move
-        replicaInfo = datanode.data.createTemporary(block);
+        replicaInfo = datanode.data.createTemporary(block.getLocalBlock());
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          replicaInfo = datanode.data.createRbw(block);
+          replicaInfo = datanode.data.createRbw(block.getLocalBlock());
           break;
         case PIPELINE_SETUP_STREAMING_RECOVERY:
           replicaInfo = datanode.data.recoverRbw(
-              block, newGs, minBytesRcvd, maxBytesRcvd);
+              block.getLocalBlock(), newGs, minBytesRcvd, maxBytesRcvd);
           block.setGenerationStamp(newGs);
           break;
         case PIPELINE_SETUP_APPEND:
-          replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
+          replicaInfo = datanode.data.append(block.getLocalBlock(), newGs, minBytesRcvd);
           if (datanode.blockScanner != null) { // remove from block scanner
-            datanode.blockScanner.deleteBlock(block);
+            datanode.blockScanner.deleteBlock(block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
           break;
         case PIPELINE_SETUP_APPEND_RECOVERY:
-          replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
+          replicaInfo = datanode.data.recoverAppend(block.getLocalBlock(), newGs, minBytesRcvd);
           if (datanode.blockScanner != null) { // remove from block scanner
-            datanode.blockScanner.deleteBlock(block);
+            datanode.blockScanner.deleteBlock(block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
           break;
@@ -613,9 +615,8 @@ class BlockReceiver implements java.io.C
     try {
       if (clientName.length() > 0) {
         responder = new Daemon(datanode.threadGroup, 
-                               new PacketResponder(this, block, mirrIn, 
-                                                   replyOut, numTargets,
-                                                   Thread.currentThread()));
+            new PacketResponder(this, block.getLocalBlock(), mirrIn, replyOut, 
+                                numTargets, Thread.currentThread()));
         responder.start(); // start thread to processes reponses
       }
 
@@ -641,7 +642,8 @@ class BlockReceiver implements java.io.C
 
         // Finalize the block. Does this fsync()?
         block.setNumBytes(replicaInfo.getNumBytes());
-        datanode.data.finalizeBlock(block);
+        // TODO:FEDERATION use ExtendedBlock
+        datanode.data.finalizeBlock(block.getLocalBlock());
         datanode.myMetrics.blocksWritten.inc();
       }
 
@@ -673,7 +675,8 @@ class BlockReceiver implements java.io.C
    */
   private void cleanupBlock() throws IOException {
     if (clientName.length() == 0) { // not client write
-      datanode.data.unfinalizeBlock(block);
+      // TODO:FEDERATION use ExtendedBlock
+      datanode.data.unfinalizeBlock(block.getLocalBlock());
     }
   }
 
@@ -690,7 +693,8 @@ class BlockReceiver implements java.io.C
     }
 
     // rollback the position of the meta file
-    datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
+    // TODO:FEDERATION use ExtendedBlock
+    datanode.data.adjustCrcChannelPosition(block.getLocalBlock(), streams, checksumSize);
   }
 
   /**
@@ -718,7 +722,8 @@ class BlockReceiver implements java.io.C
     byte[] crcbuf = new byte[checksumSize];
     FSDataset.BlockInputStreams instr = null;
     try { 
-      instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
+      // TODO:FEDERATION use ExtendedBlock
+      instr = datanode.data.getTmpInputStreams(block.getLocalBlock(), blkoff, ckoff);
       IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
 
       // open meta file and read in crc value computer earlier
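
The hunks above show the interim pattern this commit applies wherever the FSDataset
layer has not been converted yet: carry an ExtendedBlock through the protocol code
and unwrap it with getLocalBlock() at each legacy call, marking the spot
TODO:FEDERATION. A schematic sketch of that boundary; the FSDatasetInterface and
ReplicaInPipelineInterface types are assumed from the surrounding code:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;

    class BridgeSketch {
      static ReplicaInPipelineInterface openRbw(FSDatasetInterface data,
          ExtendedBlock block) throws IOException {
        // unwrap at the legacy boundary; the wrapper keeps the pool ID upstream
        return data.createRbw(block.getLocalBlock());
      }
    }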

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Sep 23 17:14:20 2010
@@ -31,7 +31,7 @@ import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.io.IOUtils;
@@ -46,7 +46,7 @@ class BlockSender implements java.io.Clo
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
-  private Block block; // the block to read from
+  private ExtendedBlock block; // the block to read from
 
   /** the replica to read from */
   private final Replica replica;
@@ -80,14 +80,14 @@ class BlockSender implements java.io.Clo
   private volatile ChunkChecksum lastChunkChecksum = null;
 
   
-  BlockSender(Block block, long startOffset, long length,
+  BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean chunkOffsetOK,
               boolean verifyChecksum, DataNode datanode) throws IOException {
     this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
          verifyChecksum, datanode, null);
   }
 
-  BlockSender(Block block, long startOffset, long length,
+  BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean chunkOffsetOK,
               boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
       throws IOException {
@@ -145,10 +145,10 @@ class BlockSender implements java.io.Clo
       this.transferToAllowed = datanode.transferToAllowed;
       this.clientTraceFmt = clientTraceFmt;
 
-      if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
-        checksumIn = new DataInputStream(
-                new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
-                                        BUFFER_SIZE));
+      // TODO:FEDERATION metaFileExists and getMetaDataInputStream should take ExtendedBlock
+      if ( !corruptChecksumOk || datanode.data.metaFileExists(block.getLocalBlock()) ) {
+        checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
+            .getMetaDataInputStream(block.getLocalBlock()), BUFFER_SIZE));
 
         // read and handle the common header here. For now just a version
        BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
@@ -230,7 +230,8 @@ class BlockSender implements java.io.Clo
         DataNode.LOG.debug("replica=" + replica);
       }
 
-      blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+      // TODO:FEDERATION getBlockInputStream must acccept ExtendedBlock
+      blockIn = datanode.data.getBlockInputStream(block.getLocalBlock(), offset); // seek to offset
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
       IOUtils.closeStream(blockIn);

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Thu Sep 23 17:14:20 2010
@@ -49,6 +49,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.io.IOUtils;
@@ -435,7 +436,8 @@ class DataBlockScanner implements Runnab
       try {
         adjustThrottler();
         
-        blockSender = new BlockSender(block, 0, -1, false, 
+        // TODO:FEDERATION use ExtendedBlock
+        blockSender = new BlockSender(new ExtendedBlock(block), 0, -1, false, 
                                                false, true, datanode);
 
         DataOutputStream out = 

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Sep 23 17:14:20 2010
@@ -25,7 +25,6 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -50,9 +49,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,7 +65,6 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -77,16 +72,17 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -96,7 +92,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -106,6 +101,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
@@ -121,15 +117,20 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.mortbay.util.ajax.JSON;
 
+import java.lang.management.ManagementFactory;  
+
+import javax.management.MBeanServer; 
+import javax.management.ObjectName;
+
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
  * blocks for a DFS deployment.  A single deployment can
@@ -363,7 +364,6 @@ public class DataNode extends Configured
     } else { // real storage
       // read storage info, lock data dirs and transition fs state if necessary
       storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
-      
       // adjust
       this.dnRegistration.setStorageInfo(storage);
       // initialize data node internal structure
@@ -1143,10 +1143,11 @@ public class DataNode extends Configured
     return;
   }
 
-  private void transferBlock( Block block, 
+  private void transferBlock( ExtendedBlock block, 
                               DatanodeInfo xferTargets[] 
                               ) throws IOException {
-    if (!data.isValidBlock(block)) {
+    // TODO:FEDERATION use ExtendedBlock
+    if (!data.isValidBlock(block.getLocalBlock())) {
       // block does not exist or is under-construction
       String errStr = "Can't send invalid block " + block;
       LOG.info(errStr);
@@ -1157,7 +1158,8 @@ public class DataNode extends Configured
     }
 
     // Check if NN recorded length matches on-disk length 
-    long onDiskLength = data.getLength(block);
+    // TODO:FEDERATION use ExtendedBlock
+    long onDiskLength = data.getLength(block.getLocalBlock());
     if (block.getNumBytes() > onDiskLength) {
       // Shorter on-disk len indicates corruption so report NN the corrupt block
       namenode.reportBadBlocks(new LocatedBlock[]{
@@ -1190,7 +1192,8 @@ public class DataNode extends Configured
                                ) {
     for (int i = 0; i < blocks.length; i++) {
       try {
-        transferBlock(blocks[i], xferTargets[i]);
+        // TODO:FEDERATION cleanup
+        transferBlock(new ExtendedBlock(blocks[i]), xferTargets[i]);
       } catch (IOException ie) {
         LOG.warn("Failed to transfer block " + blocks[i], ie);
       }
@@ -1306,14 +1309,15 @@ public class DataNode extends Configured
    */
   class DataTransfer implements Runnable {
     DatanodeInfo targets[];
-    Block b;
+    ExtendedBlock b;
     DataNode datanode;
 
     /**
      * Connect to the first item in the target list.  Pass along the 
      * entire target list, the block, and the data.
      */
-    public DataTransfer(DatanodeInfo targets[], Block b, DataNode datanode) throws IOException {
+    public DataTransfer(DatanodeInfo targets[], ExtendedBlock b,
+        DataNode datanode) throws IOException {
       this.targets = targets;
       this.b = b;
       this.datanode = datanode;
@@ -1350,8 +1354,8 @@ public class DataNode extends Configured
         //
         Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
         if (isBlockTokenEnabled) {
-          accessToken = blockTokenSecretManager.generateToken(null, b,
-          EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
+          accessToken = blockTokenSecretManager.generateToken(b, 
+              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
         }
         DataTransferProtocol.Sender.opWriteBlock(out,
             b, 0, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, "",

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Sep 23 17:14:20 2010
@@ -34,9 +34,9 @@ import java.net.Socket;
 import java.net.SocketException;
 
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -129,7 +129,7 @@ class DataXceiver extends DataTransferPr
    * Read a block from the disk.
    */
   @Override
-  protected void opReadBlock(DataInputStream in, Block block,
+  protected void opReadBlock(DataInputStream in, ExtendedBlock block,
       long startOffset, long length, String clientName,
       Token<BlockTokenIdentifier> blockToken) throws IOException {
     OutputStream baseStream = NetUtils.getOutputStream(s, 
@@ -182,7 +182,7 @@ class DataXceiver extends DataTransferPr
         try {
           if (DataTransferProtocol.Status.read(in) == CHECKSUM_OK
               && datanode.blockScanner != null) {
-            datanode.blockScanner.verifiedByClient(block);
+            datanode.blockScanner.verifiedByClient(block.getLocalBlock());
           }
         } catch (IOException ignored) {}
       }
@@ -216,7 +216,7 @@ class DataXceiver extends DataTransferPr
    * Write a block to disk.
    */
   @Override
-  protected void opWriteBlock(DataInputStream in, Block block, 
+  protected void opWriteBlock(DataInputStream in, ExtendedBlock block, 
       int pipelineSize, BlockConstructionStage stage,
       long newGs, long minBytesRcvd, long maxBytesRcvd,
       String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
@@ -273,7 +273,8 @@ class DataXceiver extends DataTransferPr
             stage, newGs, minBytesRcvd, maxBytesRcvd,
             client, srcDataNode, datanode);
       } else {
-        datanode.data.recoverClose(block, newGs, minBytesRcvd);
+        // TODO:FEDERATION use ExtendedBlock
+        datanode.data.recoverClose(block.getLocalBlock(), newGs, minBytesRcvd);
       }
 
       //
@@ -376,7 +377,8 @@ class DataXceiver extends DataTransferPr
       // the block is finalized in the PacketResponder.
       if (client.length() == 0 || 
           stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-        datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+        // TODO:FEDERATION use ExtendedBlock
+        datanode.closeBlock(block.getLocalBlock(), DataNode.EMPTY_DEL_HINT);
         LOG.info("Received block " + block + 
                  " src: " + remoteAddress +
                  " dest: " + localAddress +
@@ -406,7 +408,7 @@ class DataXceiver extends DataTransferPr
    * Get block checksum (MD5 of CRC32).
    */
   @Override
-  protected void opBlockChecksum(DataInputStream in, Block block,
+  protected void opBlockChecksum(DataInputStream in, ExtendedBlock block,
       Token<BlockTokenIdentifier> blockToken) throws IOException {
     DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
         datanode.socketWriteTimeout));
@@ -429,8 +431,9 @@ class DataXceiver extends DataTransferPr
       }
     }
 
+    // TODO:FEDERATION use ExtendedBlock
     final MetaDataInputStream metadataIn = 
-      datanode.data.getMetaDataInputStream(block);
+      datanode.data.getMetaDataInputStream(block.getLocalBlock());
     final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
         metadataIn, BUFFER_SIZE));
 
@@ -470,7 +473,7 @@ class DataXceiver extends DataTransferPr
    * Read a block from the disk and then sends it to a destination.
    */
   @Override
-  protected void opCopyBlock(DataInputStream in, Block block,
+  protected void opCopyBlock(DataInputStream in, ExtendedBlock block,
       Token<BlockTokenIdentifier> blockToken) throws IOException {
     // Read in the header
     if (datanode.isBlockTokenEnabled) {
@@ -545,7 +548,7 @@ class DataXceiver extends DataTransferPr
    */
   @Override
   protected void opReplaceBlock(DataInputStream in,
-      Block block, String sourceID, DatanodeInfo proxySource,
+      ExtendedBlock block, String sourceID, DatanodeInfo proxySource,
       Token<BlockTokenIdentifier> blockToken) throws IOException {
     /* read header */
     block.setNumBytes(dataXceiverServer.estimateBlockSize);
@@ -616,7 +619,8 @@ class DataXceiver extends DataTransferPr
           dataXceiverServer.balanceThrottler, -1);
                     
       // notify name node
-      datanode.notifyNamenodeReceivedBlock(block, sourceID);
+      // TODO:FEDERATION use ExtendedBlock
+      datanode.notifyNamenodeReceivedBlock(block.getLocalBlock(), sourceID);
 
       LOG.info("Moved block " + block + 
           " from " + s.getRemoteSocketAddress());

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Thu Sep 23 17:14:20 2010
@@ -246,13 +246,12 @@ public class DatanodeJspHelper {
       return;
     }
 
-    String blockSizeStr = req.getParameter("blockSize");
-    long blockSize = 0;
+    final String blockSizeStr = req.getParameter("blockSize");
     if (blockSizeStr == null || blockSizeStr.length() == 0) {
       out.print("Invalid input");
       return;
     }
-    blockSize = Long.parseLong(blockSizeStr);
+    long blockSize = Long.parseLong(blockSizeStr);
 
     final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
     List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
@@ -378,6 +377,12 @@ public class DatanodeJspHelper {
       out.print("Invalid input (blockId absent)");
       return;
     }
+    
+    final String poolId = req.getParameter("poolId");
+    if (poolId == null) {
+      out.print("Invalid input (poolId absent)");
+      return;
+    }
 
     final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
 
@@ -559,7 +564,7 @@ public class DatanodeJspHelper {
     out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
     try {
       JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
-          datanodePort), blockId, blockToken, genStamp, blockSize,
+          datanodePort), poolId, blockId, blockToken, genStamp, blockSize,
           startOffset, chunkSizeToView, out, conf);
     } catch (Exception e) {
       out.print(e);
@@ -626,6 +631,7 @@ public class DatanodeJspHelper {
       return;
     }
     LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
+    String poolId = lastBlk.getBlock().getPoolId();
     long blockSize = lastBlk.getBlock().getNumBytes();
     long blockId = lastBlk.getBlock().getBlockId();
     Token<BlockTokenIdentifier> accessToken = lastBlk.getBlockToken();
@@ -644,7 +650,7 @@ public class DatanodeJspHelper {
         - chunkSizeToView : 0;
 
     out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
-    JspHelper.streamBlockInAscii(addr, blockId, accessToken, genStamp,
+    JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp,
         blockSize, startOffset, chunkSizeToView, out, conf);
     out.print("</textarea>");
     dfs.close();

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Thu Sep 23 17:14:20 2010
@@ -22,6 +22,7 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**
  * Exception indicating that DataNode does not have a replica
@@ -43,6 +44,11 @@ public class ReplicaNotFoundException ex
     super();
   }
 
+  ReplicaNotFoundException(ExtendedBlock b) {
+    super("Replica not found for " + b);
+  }
+  
+  // TODO:FEDERATION remove this later
   ReplicaNotFoundException(Block b) {
     super("Replica not found for " + b);
   }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Thu Sep 23 17:14:20 2010
@@ -502,10 +502,10 @@ public class NamenodeFsck {
         s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
         s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
         
-        String file = BlockReader.getFileName(targetAddr, block.getBlockId());
-        blockReader = BlockReader.newBlockReader(s, file,
-            block.getLocalBlock(), lblock.getBlockToken(), 0, -1, conf.getInt(
-                "io.file.buffer.size", 4096));
+        String file = BlockReader.getFileName(targetAddr, block.getPoolId(),
+            block.getBlockId());
+        blockReader = BlockReader.newBlockReader(s, file, block, lblock
+            .getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096));
         
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Thu Sep 23 17:14:20 2010
@@ -26,7 +26,6 @@ import java.util.List;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -83,7 +82,7 @@ public class TestClientBlockVerification
     int offset, int lenToRead) throws IOException {
     InetSocketAddress targetAddr = null;
     Socket s = null;
-    Block block = testBlock.getBlock().getLocalBlock();
+    ExtendedBlock block = testBlock.getBlock();
     DatanodeInfo[] nodes = testBlock.getLocations();
     targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
     s = new Socket();

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Thu Sep 23 17:14:20 2010
@@ -172,7 +172,7 @@ public class TestDataTransferProtocol ex
       String description, Boolean eofExcepted) throws IOException {
     sendBuf.reset();
     recvBuf.reset();
-    DataTransferProtocol.Sender.opWriteBlock(sendOut, block.getLocalBlock(), 0,
+    DataTransferProtocol.Sender.opWriteBlock(sendOut, block, 0,
         stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     if (eofExcepted) {
@@ -338,7 +338,8 @@ public class TestDataTransferProtocol ex
     createFile(fileSys, file, fileLen);
 
     // get the first blockid for the file
-    ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+    final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+    final String poolId = firstBlock.getPoolId();
     long newBlockId = firstBlock.getBlockId() + 1;
 
     recvBuf.reset();
@@ -358,7 +359,7 @@ public class TestDataTransferProtocol ex
     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
-        new Block(newBlockId), 0,
+        new ExtendedBlock(poolId, newBlockId), 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -372,7 +373,7 @@ public class TestDataTransferProtocol ex
     sendBuf.reset();
     recvBuf.reset();
     DataTransferProtocol.Sender.opWriteBlock(sendOut,
-        new Block(++newBlockId), 0,
+        new ExtendedBlock(poolId, ++newBlockId), 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -396,7 +397,7 @@ public class TestDataTransferProtocol ex
     sendBuf.reset();
     recvBuf.reset();
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
-        new Block(++newBlockId), 0,
+        new ExtendedBlock(poolId, ++newBlockId), 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -419,7 +420,7 @@ public class TestDataTransferProtocol ex
     
     /* Test OP_READ_BLOCK */
 
-    Block blk = new Block(firstBlock.getLocalBlock());
+    ExtendedBlock blk = new ExtendedBlock(firstBlock.getLocalBlock());
     long blkid = blk.getBlockId();
     // bad block id
     sendBuf.reset();

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Thu Sep 23 17:14:20 2010
@@ -230,13 +230,8 @@ public class TestBlockReplacement extend
     sock.setKeepAlive(true);
     // sendRequest
     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-    REPLACE_BLOCK.write(out);
-    out.writeLong(block.getBlockId());
-    out.writeLong(block.getGenerationStamp());
-    Text.writeString(out, source.getStorageID());
-    sourceProxy.write(out);
-    BlockTokenSecretManager.DUMMY_TOKEN.write(out);
+    DataTransferProtocol.Sender.opReplaceBlock(out, block, source
+        .getStorageID(), sourceProxy, BlockTokenSecretManager.DUMMY_TOKEN);
     out.flush();
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());
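
Replacing the hand-written version/opcode/field writes with Sender.opReplaceBlock
keeps this test in sync with wire-format changes such as the new pool ID. A hedged
sketch of checking the reply, mirroring the Status.read usage in the DFSClient hunk
above; treating SUCCESS as the expected status is an assumption here:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.net.Socket;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;

    class ReplaceReplySketch {
      static boolean succeeded(Socket sock) throws IOException {
        DataInputStream reply = new DataInputStream(sock.getInputStream());
        return DataTransferProtocol.Status.read(reply)
            == DataTransferProtocol.Status.SUCCESS;
      }
    }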

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Thu Sep 23 17:14:20 2010
@@ -250,7 +250,7 @@ public class TestDataNodeVolumeFailure e
 
     String file = BlockReader.getFileName(targetAddr, block.getBlockId());
     BlockReader blockReader = 
-      BlockReader.newBlockReader(s, file, block.getLocalBlock(), lblock
+      BlockReader.newBlockReader(s, file, block, lblock
         .getBlockToken(), 0, -1, 4096);
 
     // nothing - if it fails - it will throw and exception

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Thu Sep 23 17:14:20 2010
@@ -117,7 +117,7 @@ public class TestDiskError extends TestC
       DataOutputStream out = new DataOutputStream(
           s.getOutputStream());
 
-      Sender.opWriteBlock(out, block.getBlock().getLocalBlock(), 1, 
+      Sender.opWriteBlock(out, block.getBlock(), 1, 
           BlockConstructionStage.PIPELINE_SETUP_CREATE, 
           0L, 0L, 0L, "", null, new DatanodeInfo[0], 
           BlockTokenSecretManager.DUMMY_TOKEN);

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java?rev=1000541&r1=1000540&r2=1000541&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java Thu Sep 23 17:14:20 2010
@@ -131,7 +131,7 @@ public class TestBlockTokenWithDFS exten
       s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
 
       String file = BlockReader.getFileName(targetAddr, block.getBlockId());
-      blockReader = BlockReader.newBlockReader(s, file, block.getLocalBlock(), 
+      blockReader = BlockReader.newBlockReader(s, file, block, 
           lblock.getBlockToken(), 0, -1, 
           conf.getInt("io.file.buffer.size", 4096));