Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/04/29 20:16:38 UTC

svn commit: r1097905 [2/14] - in /hadoop/hdfs/trunk: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/ja...

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Apr 29 18:16:32 2011
@@ -67,9 +67,9 @@ public interface ClientProtocol extends 
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 66: Add getAdditionalDatanode(..)
+   * 67: Add block pool ID to Block
    */
-  public static final long versionID = 66L;
+  public static final long versionID = 67L;
   
   ///////////////////////////////////////
   // File contents
@@ -126,7 +126,7 @@ public interface ClientProtocol extends 
    * <p>
    * Blocks have a maximum size.  Clients that intend to create
    * multi-block files must also use 
-   * {@link #addBlock(String, String, Block, DatanodeInfo[])}
+   * {@link #addBlock(String, String, ExtendedBlock, DatanodeInfo[])}
    *
    * @param src path of the file being created.
    * @param masked masked permission.
@@ -259,7 +259,7 @@ public interface ClientProtocol extends 
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
-  public void abandonBlock(Block b, String src, String holder)
+  public void abandonBlock(ExtendedBlock b, String src, String holder)
       throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException;
 
@@ -293,7 +293,7 @@ public interface ClientProtocol extends 
    * @throws IOException If an I/O error occurred
    */
   public LocatedBlock addBlock(String src, String clientName,
-      @Nullable Block previous, @Nullable DatanodeInfo[] excludeNodes)
+      @Nullable ExtendedBlock previous, @Nullable DatanodeInfo[] excludeNodes)
       throws AccessControlException, FileNotFoundException,
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException;
@@ -316,7 +316,7 @@ public interface ClientProtocol extends 
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
-  public LocatedBlock getAdditionalDatanode(final String src, final Block blk,
+  public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
       final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
       final int numAdditionalNodes, final String clientName
       ) throws AccessControlException, FileNotFoundException,
@@ -329,7 +329,7 @@ public interface ClientProtocol extends 
    * The function returns whether the file has been closed successfully.
    * If the function returns false, the caller should try again.
    * 
-   * close() also commits the last block of the file by reporting
+   * close() also commits the last block of file by reporting
    * to the name-node the actual generation stamp and the length
    * of the block that the client has transmitted to data-nodes.
    *
@@ -344,7 +344,7 @@ public interface ClientProtocol extends 
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink 
    * @throws IOException If an I/O error occurred
    */
-  public boolean complete(String src, String clientName, Block last)
+  public boolean complete(String src, String clientName, ExtendedBlock last)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException;
 
@@ -554,6 +554,8 @@ public interface ClientProtocol extends 
    * <li> [3] contains number of under replicated blocks in the system.</li>
    * <li> [4] contains number of blocks with a corrupt replica. </li>
    * <li> [5] contains number of blocks without any good replicas left. </li>
+   * <li> [5] contains number of blocks without any good replicas left. </li>
+   * <li> [6] contains the total used space of the block pool. </li>
    * </ul>
    * Use public constants like {@link #GET_STATS_CAPACITY_IDX} in place of 
    * actual numbers to index into the array.
@@ -854,8 +856,8 @@ public interface ClientProtocol extends 
    * @return a located block with a new generation stamp and an access token
    * @throws IOException if any error occurs
    */
-  public LocatedBlock updateBlockForPipeline(Block block, String clientName) 
-      throws IOException;
+  public LocatedBlock updateBlockForPipeline(ExtendedBlock block,
+      String clientName) throws IOException;
 
   /**
    * Update a pipeline for a block under construction
@@ -866,8 +868,8 @@ public interface ClientProtocol extends 
    * @param newNodes datanodes in the pipeline
    * @throws IOException if any error occurs
    */
-  public void updatePipeline(String clientName, Block oldBlock, 
-      Block newBlock, DatanodeID[] newNodes)
+  public void updatePipeline(String clientName, ExtendedBlock oldBlock, 
+      ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException;
 
   /**

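For readers tracking the ClientProtocol signature changes above, here is a minimal client-side sketch of the new write path. It assumes a ClientProtocol proxy named namenode; the path, client name and retry interval are illustrative and not taken from this commit.

    // Sketch only: "namenode" is an assumed ClientProtocol proxy; the path and
    // client name are placeholders, and error handling is elided.
    void writeOneBlock(ClientProtocol namenode) throws Exception {
      final String src = "/user/example/file";
      final String clientName = "DFSClient_example";
      LocatedBlock lb = namenode.addBlock(src, clientName,
          null /* previous */, null /* excludeNodes */);
      ExtendedBlock last = lb.getBlock();   // now carries the block pool ID
      // ... stream the block to the returned datanode pipeline ...
      while (!namenode.complete(src, clientName, last)) {
        Thread.sleep(400);                  // retry until closed, as the javadoc advises
      }
    }
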
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Fri Apr 29 18:16:32 2011
@@ -51,10 +51,11 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 22:
-   *    Add a new feature to replace datanode on failure.
+   * Version 23:
+   *    Changed the protocol methods to use ExtendedBlock instead
+   *    of Block.
    */
-  public static final int DATA_TRANSFER_VERSION = 22;
+  public static final int DATA_TRANSFER_VERSION = 23;
 
   /** Operation */
   public enum Op {
@@ -238,7 +239,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_READ_BLOCK */
-    public static void opReadBlock(DataOutputStream out, Block blk,
+    public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
         long blockOffset, long blockLen, String clientName,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
@@ -253,7 +254,7 @@ public interface DataTransferProtocol {
     }
     
     /** Send OP_WRITE_BLOCK */
-    public static void opWriteBlock(DataOutputStream out, Block blk,
+    public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
         int pipelineSize, BlockConstructionStage stage, long newGs,
         long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
         DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
@@ -277,7 +278,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send {@link Op#TRANSFER_BLOCK} */
-    public static void opTransferBlock(DataOutputStream out, Block blk,
+    public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
         String client, DatanodeInfo[] targets,
         Token<BlockTokenIdentifier> blockToken) throws IOException {
       op(out, Op.TRANSFER_BLOCK);
@@ -291,7 +292,7 @@ public interface DataTransferProtocol {
 
     /** Send OP_REPLACE_BLOCK */
     public static void opReplaceBlock(DataOutputStream out,
-        Block blk, String storageId, DatanodeInfo src,
+        ExtendedBlock blk, String storageId, DatanodeInfo src,
         Token<BlockTokenIdentifier> blockToken) throws IOException {
       op(out, Op.REPLACE_BLOCK);
 
@@ -303,7 +304,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_COPY_BLOCK */
-    public static void opCopyBlock(DataOutputStream out, Block blk,
+    public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
       op(out, Op.COPY_BLOCK);
@@ -314,7 +315,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_BLOCK_CHECKSUM */
-    public static void opBlockChecksum(DataOutputStream out, Block blk,
+    public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
       op(out, Op.BLOCK_CHECKSUM);
@@ -377,7 +378,7 @@ public interface DataTransferProtocol {
 
     /** Receive OP_READ_BLOCK */
     private void opReadBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final long offset = in.readLong();
       final long length = in.readLong();
@@ -390,13 +391,13 @@ public interface DataTransferProtocol {
     /**
      * Abstract OP_READ_BLOCK method. Read a block.
      */
-    protected abstract void opReadBlock(DataInputStream in, Block blk,
+    protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
         long offset, long length, String client,
         Token<BlockTokenIdentifier> blockToken) throws IOException;
     
     /** Receive OP_WRITE_BLOCK */
     private void opWriteBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
       final BlockConstructionStage stage = 
@@ -418,7 +419,7 @@ public interface DataTransferProtocol {
      * Abstract OP_WRITE_BLOCK method. 
      * Write a block.
      */
-    protected abstract void opWriteBlock(DataInputStream in, Block blk,
+    protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
         int pipelineSize, BlockConstructionStage stage, long newGs,
         long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
         DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
@@ -426,7 +427,7 @@ public interface DataTransferProtocol {
 
     /** Receive {@link Op#TRANSFER_BLOCK} */
     private void opTransferBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final String client = Text.readString(in);
       final DatanodeInfo targets[] = readDatanodeInfos(in);
@@ -440,14 +441,14 @@ public interface DataTransferProtocol {
      * For {@link BlockConstructionStage#TRANSFER_RBW}
      * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
      */
-    protected abstract void opTransferBlock(DataInputStream in, Block blk,
+    protected abstract void opTransferBlock(DataInputStream in, ExtendedBlock blk,
         String client, DatanodeInfo[] targets,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException;
 
     /** Receive OP_REPLACE_BLOCK */
     private void opReplaceBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final String sourceId = Text.readString(in); // read del hint
       final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
@@ -461,12 +462,12 @@ public interface DataTransferProtocol {
      * It is used for balancing purpose; send to a destination
      */
     protected abstract void opReplaceBlock(DataInputStream in,
-        Block blk, String sourceId, DatanodeInfo src,
+        ExtendedBlock blk, String sourceId, DatanodeInfo src,
         Token<BlockTokenIdentifier> blockToken) throws IOException;
 
     /** Receive OP_COPY_BLOCK */
     private void opCopyBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
@@ -477,13 +478,13 @@ public interface DataTransferProtocol {
      * Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
      * a proxy source.
      */
-    protected abstract void opCopyBlock(DataInputStream in, Block blk,
+    protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException;
 
     /** Receive OP_BLOCK_CHECKSUM */
     private void opBlockChecksum(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
@@ -495,7 +496,7 @@ public interface DataTransferProtocol {
      * Get the checksum of a block 
      */
     protected abstract void opBlockChecksum(DataInputStream in,
-        Block blk, Token<BlockTokenIdentifier> blockToken)
+        ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
         throws IOException;
 
     /** Read an array of {@link DatanodeInfo} */

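As a hedged sketch of the updated Sender API, the snippet below issues an OP_READ_BLOCK over an existing socket. The socket, located block and client name are assumed to come from the surrounding client code, and it assumes LocatedBlock.getBlockToken() and ExtendedBlock.getNumBytes() accessors.

    // Sketch only: "sock", "lb" and "clientName" are assumed inputs;
    // getBlockToken() and getNumBytes() are assumed accessors.
    void requestRead(Socket sock, LocatedBlock lb, String clientName)
        throws IOException {
      DataOutputStream out = new DataOutputStream(
          new BufferedOutputStream(sock.getOutputStream()));
      ExtendedBlock blk = lb.getBlock();      // an ExtendedBlock after this change
      DataTransferProtocol.Sender.opReadBlock(out, blk,
          0L,                                 // blockOffset: read from the start
          blk.getNumBytes(),                  // blockLen
          clientName, lb.getBlockToken());
      out.flush();
    }
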
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Fri Apr 29 18:16:32 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -76,6 +77,18 @@ public class DatanodeID implements Writa
     this.ipcPort = ipcPort;
   }
   
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public void setInfoPort(int infoPort) {
+    this.infoPort = infoPort;
+  }
+  
+  public void setIpcPort(int ipcPort) {
+    this.ipcPort = ipcPort;
+  }
+  
   /**
    * @return hostname:portNumber.
    */

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Fri Apr 29 18:16:32 2011
@@ -24,6 +24,7 @@ import java.util.Date;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
@@ -48,6 +49,7 @@ public class DatanodeInfo extends Datano
   protected long capacity;
   protected long dfsUsed;
   protected long remaining;
+  protected long blockPoolUsed;
   protected long lastUpdate;
   protected int xceiverCount;
   protected String location = NetworkTopology.DEFAULT_RACK;
@@ -89,6 +91,7 @@ public class DatanodeInfo extends Datano
     this.capacity = from.getCapacity();
     this.dfsUsed = from.getDfsUsed();
     this.remaining = from.getRemaining();
+    this.blockPoolUsed = from.getBlockPoolUsed();
     this.lastUpdate = from.getLastUpdate();
     this.xceiverCount = from.getXceiverCount();
     this.location = from.getNetworkLocation();
@@ -101,6 +104,7 @@ public class DatanodeInfo extends Datano
     this.capacity = 0L;
     this.dfsUsed = 0L;
     this.remaining = 0L;
+    this.blockPoolUsed = 0L;
     this.lastUpdate = 0L;
     this.xceiverCount = 0;
     this.adminState = null;    
@@ -118,6 +122,9 @@ public class DatanodeInfo extends Datano
   /** The used space by the data node. */
   public long getDfsUsed() { return dfsUsed; }
 
+  /** The used space by the block pool on data node. */
+  public long getBlockPoolUsed() { return blockPoolUsed; }
+
   /** The used space by the data node. */
   public long getNonDfsUsed() { 
     long nonDFSUsed = capacity - dfsUsed - remaining;
@@ -126,23 +133,20 @@ public class DatanodeInfo extends Datano
 
   /** The used space by the data node as percentage of present capacity */
   public float getDfsUsedPercent() { 
-    if (capacity <= 0) {
-      return 100;
-    }
-
-    return ((float)dfsUsed * 100.0f)/(float)capacity; 
+    return DFSUtil.getPercentUsed(dfsUsed, capacity);
   }
 
   /** The raw free space. */
   public long getRemaining() { return remaining; }
 
+  /** Used space by the block pool as percentage of present capacity */
+  public float getBlockPoolUsedPercent() {
+    return DFSUtil.getPercentUsed(blockPoolUsed, capacity);
+  }
+  
   /** The remaining space as percentage of configured capacity. */
   public float getRemainingPercent() { 
-    if (capacity <= 0) {
-      return 0;
-    }
-
-    return ((float)remaining * 100.0f)/(float)capacity; 
+    return DFSUtil.getPercentRemaining(remaining, capacity);
   }
 
   /** The time when this information was accurate. */
@@ -161,6 +165,11 @@ public class DatanodeInfo extends Datano
     this.remaining = remaining; 
   }
 
+  /** Sets block pool used space */
+  public void setBlockPoolUsed(long bpUsed) { 
+    this.blockPoolUsed = bpUsed; 
+  }
+
   /** Sets time when this information was accurate. */
   public void setLastUpdate(long lastUpdate) { 
     this.lastUpdate = lastUpdate; 
@@ -342,6 +351,7 @@ public class DatanodeInfo extends Datano
     out.writeLong(capacity);
     out.writeLong(dfsUsed);
     out.writeLong(remaining);
+    out.writeLong(blockPoolUsed);
     out.writeLong(lastUpdate);
     out.writeInt(xceiverCount);
     Text.writeString(out, location);
@@ -359,6 +369,7 @@ public class DatanodeInfo extends Datano
     this.capacity = in.readLong();
     this.dfsUsed = in.readLong();
     this.remaining = in.readLong();
+    this.blockPoolUsed = in.readLong();
     this.lastUpdate = in.readLong();
     this.xceiverCount = in.readInt();
     this.location = Text.readString(in);

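The two percentage accessors above now delegate to DFSUtil. Judging from the inline code they replace, the helpers behave roughly as in the sketch below; the bodies are reconstructed from the removed logic, not copied from DFSUtil.

    // Reconstructed from the removed inline logic; the actual DFSUtil bodies may differ.
    public static float getPercentUsed(long used, long capacity) {
      return capacity <= 0 ? 100 : (used * 100.0f) / capacity;
    }
    public static float getPercentRemaining(long remaining, long capacity) {
      return capacity <= 0 ? 0 : (remaining * 100.0f) / capacity;
    }
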
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Fri Apr 29 18:16:32 2011
@@ -88,7 +88,7 @@ public interface FSConstants {
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -34;
+  public static final int LAYOUT_VERSION = -35;
   // Current version: 
-  // -31, -32 and -33 are reserved for 0.20.203, 0.20.204 and 0.22.
+  // -35: Adding support for block pools and multiple namenodes
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Fri Apr 29 18:16:32 2011
@@ -42,7 +42,7 @@ public class LocatedBlock implements Wri
        });
   }
 
-  private Block b;
+  private ExtendedBlock b;
   private long offset;  // offset of the first byte of the block in the file
   private DatanodeInfo[] locs;
   // corrupt flag is true if all of the replicas of a block are corrupt.
@@ -51,27 +51,23 @@ public class LocatedBlock implements Wri
   private boolean corrupt;
   private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
 
-  /**
-   */
   public LocatedBlock() {
-    this(new Block(), new DatanodeInfo[0], 0L, false);
+    this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
   }
 
-  /**
-   */
-  public LocatedBlock(Block b, DatanodeInfo[] locs) {
+  public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) {
+    this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown
+  }
+
+  public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
     this(b, locs, -1, false); // startOffset is unknown
   }
 
-  /**
-   */
-  public LocatedBlock(Block b, DatanodeInfo[] locs, long startOffset) {
+  public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset) {
     this(b, locs, startOffset, false);
   }
 
-  /**
-   */
-  public LocatedBlock(Block b, DatanodeInfo[] locs, long startOffset, 
+  public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset, 
                       boolean corrupt) {
     this.b = b;
     this.offset = startOffset;
@@ -93,7 +89,7 @@ public class LocatedBlock implements Wri
 
   /**
    */
-  public Block getBlock() {
+  public ExtendedBlock getBlock() {
     return b;
   }
 
@@ -141,7 +137,7 @@ public class LocatedBlock implements Wri
     blockToken.readFields(in);
     this.corrupt = in.readBoolean();
     offset = in.readLong();
-    this.b = new Block();
+    this.b = new ExtendedBlock();
     b.readFields(in);
     int count = in.readInt();
     this.locs = new DatanodeInfo[count];

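With the constructors above, callers can hand out pool-qualified block locations either by passing an ExtendedBlock directly or by pairing a block pool ID with a plain Block. A small sketch, with made-up IDs and sizes:

    // Sketch only: block ID, length, generation stamp and pool ID are placeholders.
    Block local = new Block(4711L, 134217728L, 1001L);
    DatanodeInfo[] locs = new DatanodeInfo[0];
    LocatedBlock lb = new LocatedBlock("BP-example-pool", local, locs);
    ExtendedBlock eb = lb.getBlock();   // the pool ID now travels with the block
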
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Fri Apr 29 18:16:32 2011
@@ -37,19 +37,21 @@ public class BlockTokenIdentifier extend
   private long expiryDate;
   private int keyId;
   private String userId;
+  private String blockPoolId;
   private long blockId;
   private EnumSet<AccessMode> modes;
 
   private byte [] cache;
   
   public BlockTokenIdentifier() {
-    this(null, 0, EnumSet.noneOf(AccessMode.class));
+    this(null, null, 0, EnumSet.noneOf(AccessMode.class));
   }
 
-  public BlockTokenIdentifier(String userId, long blockId,
+  public BlockTokenIdentifier(String userId, String bpid, long blockId,
       EnumSet<AccessMode> modes) {
     this.cache = null;
     this.userId = userId;
+    this.blockPoolId = bpid;
     this.blockId = blockId;
     this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
   }
@@ -62,7 +64,8 @@ public class BlockTokenIdentifier extend
   @Override
   public UserGroupInformation getUser() {
     if (userId == null || "".equals(userId)) {
-      return UserGroupInformation.createRemoteUser(Long.toString(blockId));
+      String user = blockPoolId + ":" + Long.toString(blockId);
+      return UserGroupInformation.createRemoteUser(user);
     }
     return UserGroupInformation.createRemoteUser(userId);
   }
@@ -89,6 +92,10 @@ public class BlockTokenIdentifier extend
     return userId;
   }
 
+  public String getBlockPoolId() {
+    return blockPoolId;
+  }
+
   public long getBlockId() {
     return blockId;
   }
@@ -101,6 +108,7 @@ public class BlockTokenIdentifier extend
   public String toString() {
     return "block_token_identifier (expiryDate=" + this.getExpiryDate()
         + ", keyId=" + this.getKeyId() + ", userId=" + this.getUserId()
+        + ", blockPoolId=" + this.getBlockPoolId()
         + ", blockId=" + this.getBlockId() + ", access modes="
         + this.getAccessModes() + ")";
   }
@@ -117,7 +125,9 @@ public class BlockTokenIdentifier extend
     if (obj instanceof BlockTokenIdentifier) {
       BlockTokenIdentifier that = (BlockTokenIdentifier) obj;
       return this.expiryDate == that.expiryDate && this.keyId == that.keyId
-          && isEqual(this.userId, that.userId) && this.blockId == that.blockId
+          && isEqual(this.userId, that.userId) 
+          && isEqual(this.blockPoolId, that.blockPoolId)
+          && this.blockId == that.blockId
           && isEqual(this.modes, that.modes);
     }
     return false;
@@ -126,7 +136,8 @@ public class BlockTokenIdentifier extend
   /** {@inheritDoc} */
   public int hashCode() {
     return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
-        ^ (userId == null ? 0 : userId.hashCode());
+        ^ (userId == null ? 0 : userId.hashCode())
+        ^ (blockPoolId == null ? 0 : blockPoolId.hashCode());
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -134,6 +145,7 @@ public class BlockTokenIdentifier extend
     expiryDate = WritableUtils.readVLong(in);
     keyId = WritableUtils.readVInt(in);
     userId = WritableUtils.readString(in);
+    blockPoolId = WritableUtils.readString(in);
     blockId = WritableUtils.readVLong(in);
     int length = WritableUtils.readVInt(in);
     for (int i = 0; i < length; i++) {
@@ -145,6 +157,7 @@ public class BlockTokenIdentifier extend
     WritableUtils.writeVLong(out, expiryDate);
     WritableUtils.writeVInt(out, keyId);
     WritableUtils.writeString(out, userId);
+    WritableUtils.writeString(out, blockPoolId);
     WritableUtils.writeVLong(out, blockId);
     WritableUtils.writeVInt(out, modes.size());
     for (AccessMode aMode : modes) {

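A short sketch of the widened BlockTokenIdentifier constructor; the user, pool and block IDs are placeholders, and AccessMode is assumed to be the enum nested in BlockTokenSecretManager, as used elsewhere in this patch.

    // Sketch only: all values are placeholders.
    BlockTokenIdentifier id = new BlockTokenIdentifier(
        "exampleUser",
        "BP-example-pool",
        4711L,
        EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
    // Two identifiers that differ only in blockPoolId are no longer equal,
    // and the pool ID is round-tripped by write()/readFields().
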
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Fri Apr 29 18:16:32 2011
@@ -31,7 +31,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
@@ -174,7 +174,7 @@ public class BlockTokenSecretManager ext
   }
 
   /** Generate an block token for current user */
-  public Token<BlockTokenIdentifier> generateToken(Block block,
+  public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
       EnumSet<AccessMode> modes) throws IOException {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     String userID = (ugi == null ? null : ugi.getShortUserName());
@@ -182,10 +182,10 @@ public class BlockTokenSecretManager ext
   }
 
   /** Generate a block token for a specified user */
-  public Token<BlockTokenIdentifier> generateToken(String userId, Block block,
-      EnumSet<AccessMode> modes) throws IOException {
+  public Token<BlockTokenIdentifier> generateToken(String userId,
+      ExtendedBlock block, EnumSet<AccessMode> modes) throws IOException {
     BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
-        .getBlockId(), modes);
+        .getBlockPoolId(), block.getBlockId(), modes);
     return new Token<BlockTokenIdentifier>(id, this);
   }
 
@@ -194,8 +194,8 @@ public class BlockTokenSecretManager ext
    * method doesn't check if token password is correct. It should be used only
    * when token password has already been verified (e.g., in the RPC layer).
    */
-  public void checkAccess(BlockTokenIdentifier id, String userId, Block block,
-      AccessMode mode) throws InvalidToken {
+  public void checkAccess(BlockTokenIdentifier id, String userId,
+      ExtendedBlock block, AccessMode mode) throws InvalidToken {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Checking access for user=" + userId + ", block=" + block
           + ", access mode=" + mode + " using " + id.toString());
@@ -204,6 +204,10 @@ public class BlockTokenSecretManager ext
       throw new InvalidToken("Block token with " + id.toString()
           + " doesn't belong to user " + userId);
     }
+    if (!id.getBlockPoolId().equals(block.getBlockPoolId())) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't apply to block " + block);
+    }
     if (id.getBlockId() != block.getBlockId()) {
       throw new InvalidToken("Block token with " + id.toString()
           + " doesn't apply to block " + block);
@@ -220,7 +224,7 @@ public class BlockTokenSecretManager ext
 
   /** Check if access should be allowed. userID is not checked if null */
   public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
-      Block block, AccessMode mode) throws InvalidToken {
+      ExtendedBlock block, AccessMode mode) throws InvalidToken {
     BlockTokenIdentifier id = new BlockTokenIdentifier();
     try {
       id.readFields(new DataInputStream(new ByteArrayInputStream(token

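Correspondingly, token generation and verification are now keyed by ExtendedBlock. A minimal sketch, assuming a configured BlockTokenSecretManager instance sm and an ExtendedBlock eb:

    // Sketch only: "sm" and "eb" are assumed to exist in the surrounding code.
    Token<BlockTokenIdentifier> token =
        sm.generateToken(eb, EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
    // Throws InvalidToken if the pool ID, block ID, user or mode do not match;
    // per the javadoc above, userId is not checked when null.
    sm.checkAccess(token, null, eb, BlockTokenSecretManager.AccessMode.READ);
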
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Apr 29 18:16:32 2011
@@ -24,16 +24,14 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
-import java.util.EnumSet;
 import java.util.Formatter;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -46,45 +44,31 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -187,27 +171,19 @@ import org.apache.hadoop.util.ToolRunner
  */
 
 @InterfaceAudience.Private
-public class Balancer implements Tool {
-  private static final Log LOG = 
-    LogFactory.getLog(Balancer.class.getName());
+public class Balancer {
+  static final Log LOG = LogFactory.getLog(Balancer.class);
   final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
+  private static long WIN_WIDTH = 5400*1000L; // 1.5 hour
 
   /** The maximum number of concurrent blocks moves for 
    * balancing purpose at a datanode
    */
   public static final int MAX_NUM_CONCURRENT_MOVES = 5;
   
-  private Configuration conf;
-
-  private double threshold = 10D;
-  private NamenodeProtocol namenode;
-  private ClientProtocol client;
-  private FileSystem fs;
-  private boolean isBlockTokenEnabled;
-  private boolean shouldRun;
-  private long keyUpdaterInterval;
-  private BlockTokenSecretManager blockTokenSecretManager;
-  private Daemon keyupdaterthread = null; // AccessKeyUpdater thread
+  private final NameNodeConnector nnc;
+  private final BalancingPolicy policy;
+  private final double threshold;
   private final static Random rnd = new Random();
   
   // all data node lists
@@ -233,8 +209,6 @@ public class Balancer implements Tool {
   
   private NetworkTopology cluster = new NetworkTopology();
   
-  private double avgUtilization = 0.0D;
-  
   final static private int MOVER_THREAD_POOL_SIZE = 1000;
   final private ExecutorService moverExecutor = 
     Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
@@ -242,6 +216,7 @@ public class Balancer implements Tool {
   final private ExecutorService dispatcherExecutor =
     Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE);
   
+
   /* This class keeps track of a scheduled block move */
   private class PendingBlockMove {
     private BalancerBlock block;
@@ -369,14 +344,9 @@ public class Balancer implements Tool {
     
     /* Send a block replace request to the output stream*/
     private void sendRequest(DataOutputStream out) throws IOException {
-      Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
-      if (isBlockTokenEnabled) {
-        accessToken = blockTokenSecretManager.generateToken(null, block
-            .getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
-            BlockTokenSecretManager.AccessMode.COPY));
-      }
-      DataTransferProtocol.Sender.opReplaceBlock(out,
-          block.getBlock(), source.getStorageID(), 
+      final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
+      final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+      DataTransferProtocol.Sender.opReplaceBlock(out, eb, source.getStorageID(), 
           proxySource.getDatanode(), accessToken);
     }
     
@@ -487,30 +457,33 @@ public class Balancer implements Tool {
     }
   }
   
-  /* Return the utilization of a datanode */
-  static private double getUtilization(DatanodeInfo datanode) {
-    return ((double)datanode.getDfsUsed())/datanode.getCapacity()*100;
-  }
   
   /* A class that keeps track of a datanode in Balancer */
   private static class BalancerDatanode {
     final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
-    protected DatanodeInfo datanode;
-    private double utilization;
-    protected long maxSizeToMove;
+    final DatanodeInfo datanode;
+    final double utilization;
+    final long maxSize2Move;
     protected long scheduledSize = 0L;
     //  blocks being moved but not confirmed yet
     private List<PendingBlockMove> pendingBlocks = 
       new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES); 
     
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "[" + getName()
+          + ", utilization=" + utilization + "]";
+    }
+
     /* Constructor 
      * Depending on avgutil & threshold, calculate maximum bytes to move 
      */
-    private BalancerDatanode(
-        DatanodeInfo node, double avgUtil, double threshold) {
+    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) {
       datanode = node;
-      utilization = Balancer.getUtilization(node);
-        
+      utilization = policy.getUtilization(node);
+      final double avgUtil = policy.getAvgUtilization();
+      long maxSizeToMove;
+
       if (utilization >= avgUtil+threshold
           || utilization <= avgUtil-threshold) { 
         maxSizeToMove = (long)(threshold*datanode.getCapacity()/100);
@@ -521,7 +494,7 @@ public class Balancer implements Tool {
       if (utilization < avgUtil ) {
         maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
       }
-      maxSizeToMove = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+      this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
     }
     
     /** Get the datanode */
@@ -541,12 +514,12 @@ public class Balancer implements Tool {
     
     /** Decide if still need to move more bytes */
     protected boolean isMoveQuotaFull() {
-      return scheduledSize<maxSizeToMove;
+      return scheduledSize<maxSize2Move;
     }
 
     /** Return the total number of bytes that need to be moved */
     protected long availableSizeToMove() {
-      return maxSizeToMove-scheduledSize;
+      return maxSize2Move-scheduledSize;
     }
     
     /* increment scheduled size */
@@ -604,8 +577,8 @@ public class Balancer implements Tool {
             = new ArrayList<BalancerBlock>();
     
     /* constructor */
-    private Source(DatanodeInfo node, double avgUtil, double threshold) {
-      super(node, avgUtil, threshold);
+    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) {
+      super(node, policy, threshold);
     }
     
     /** Add a node task */
@@ -626,7 +599,7 @@ public class Balancer implements Tool {
      * Return the total size of the received blocks in the number of bytes.
      */
     private long getBlockList() throws IOException {
-      BlockWithLocations[] newBlocks = namenode.getBlocks(datanode, 
+      BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode, 
         Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
       long bytesReceived = 0;
       for (BlockWithLocations blk : newBlocks) {
@@ -780,160 +753,25 @@ public class Balancer implements Tool {
   /* Check that this Balancer is compatible with the Block Placement Policy
    * used by the Namenode.
    */
-  private void checkReplicationPolicyCompatibility(Configuration conf) throws UnsupportedActionException {
+  private static void checkReplicationPolicyCompatibility(Configuration conf
+      ) throws UnsupportedActionException {
     if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != 
         BlockPlacementPolicyDefault.class) {
       throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
     }
   }
-  
-  /** Default constructor */
-  Balancer() throws UnsupportedActionException {
-  }
-  
-  /** Construct a balancer from the given configuration */
-  Balancer(Configuration conf) throws UnsupportedActionException {
-    checkReplicationPolicyCompatibility(conf);
-    setConf(conf);
-  } 
-
-  /** Construct a balancer from the given configuration and threshold */
-  Balancer(Configuration conf, double threshold) throws UnsupportedActionException {
-    checkReplicationPolicyCompatibility(conf);
-    setConf(conf);
-    this.threshold = threshold;
-  }
 
   /**
-   * Run a balancer
-   * @param args
-   */
-  public static void main(String[] args) {
-    try {
-      System.exit( ToolRunner.run(null, new Balancer(), args) );
-    } catch (Throwable e) {
-      LOG.error(StringUtils.stringifyException(e));
-      System.exit(-1);
-    }
-
-  }
-
-  private static void printUsage() {
-    System.out.println("Usage: java Balancer");
-    System.out.println("          [-threshold <threshold>]\t" 
-        +"percentage of disk capacity");
-  }
-
-  /* parse argument to get the threshold */
-  private double parseArgs(String[] args) {
-    double threshold=0;
-    int argsLen = (args == null) ? 0 : args.length;
-    if (argsLen==0) {
-      threshold = 10;
-    } else {
-      if (argsLen != 2 || !"-threshold".equalsIgnoreCase(args[0])) {
-        printUsage();
-        throw new IllegalArgumentException(Arrays.toString(args));
-      } else {
-        try {
-          threshold = Double.parseDouble(args[1]);
-          if (threshold < 0 || threshold >100) {
-            throw new NumberFormatException();
-          }
-          LOG.info( "Using a threshold of " + threshold );
-        } catch(NumberFormatException e) {
-          System.err.println(
-              "Expect a double parameter in the range of [0, 100]: "+ args[1]);
-          printUsage();
-          throw e;
-        }
-      }
-    }
-    return threshold;
-  }
-  
-  /* Initialize balancer. It sets the value of the threshold, and 
+   * Construct a balancer.
+   * Initialize balancer. It sets the value of the threshold, and 
    * builds the communication proxies to
    * namenode as a client and a secondary namenode and retry proxies
    * when connection fails.
    */
-  private void init(double threshold) throws IOException {
-    this.threshold = threshold;
-    this.namenode = createNamenode(conf);
-    this.client = DFSClient.createNamenode(conf);
-    this.fs = FileSystem.get(conf);
-    ExportedBlockKeys keys = namenode.getBlockKeys();
-    this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
-    if (isBlockTokenEnabled) {
-      long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
-      long blockTokenLifetime = keys.getTokenLifetime();
-      LOG.info("Block token params received from NN: keyUpdateInterval="
-          + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
-          + blockTokenLifetime / (60 * 1000) + " min(s)");
-      this.blockTokenSecretManager = new BlockTokenSecretManager(false,
-          blockKeyUpdateInterval, blockTokenLifetime);
-      this.blockTokenSecretManager.setKeys(keys);
-      /*
-       * Balancer should sync its block keys with NN more frequently than NN
-       * updates its block keys
-       */
-      this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
-      LOG.info("Balancer will update its block keys every "
-          + keyUpdaterInterval / (60 * 1000) + " minute(s)");
-      this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
-      this.shouldRun = true;
-      this.keyupdaterthread.start();
-    }
-  }
-  
-  /**
-   * Periodically updates access keys.
-   */
-  class BlockKeyUpdater implements Runnable {
-
-    public void run() {
-      while (shouldRun) {
-        try {
-          blockTokenSecretManager.setKeys(namenode.getBlockKeys());
-        } catch (Exception e) {
-          LOG.error(StringUtils.stringifyException(e));
-        }
-        try {
-          Thread.sleep(keyUpdaterInterval);
-        } catch (InterruptedException ie) {
-        }
-      }
-    }
-  }
-  
-  /* Build a NamenodeProtocol connection to the namenode and
-   * set up the retry policy */ 
-  private static NamenodeProtocol createNamenode(Configuration conf)
-    throws IOException {
-    InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true);
-    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
-        5, 200, TimeUnit.MILLISECONDS);
-    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
-      new HashMap<Class<? extends Exception>, RetryPolicy>();
-    RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        timeoutPolicy, exceptionToPolicyMap);
-    Map<String,RetryPolicy> methodNameToPolicyMap =
-        new HashMap<String, RetryPolicy>();
-    methodNameToPolicyMap.put("getBlocks", methodPolicy);
-    methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
-
-    UserGroupInformation ugi;
-    ugi = UserGroupInformation.getCurrentUser();
-
-    return (NamenodeProtocol) RetryProxy.create(
-        NamenodeProtocol.class,
-        RPC.getProxy(NamenodeProtocol.class,
-            NamenodeProtocol.versionID,
-            nameNodeAddr,
-            ugi,
-            conf,
-            NetUtils.getDefaultSocketFactory(conf)),
-        methodNameToPolicyMap);
+  Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
+    this.threshold = p.threshold;
+    this.policy = p.policy;
+    this.nnc = theblockpool;
   }
   
   /* Shuffle datanode array */
@@ -946,13 +784,6 @@ public class Balancer implements Tool {
     }
   }
   
-  /* get all live datanodes of a cluster and their disk usage
-   * decide the number of bytes need to be moved
-   */
-  private long initNodes() throws IOException {
-    return initNodes(client.getDatanodeReport(DatanodeReportType.LIVE));
-  }
-  
   /* Given a data node set, build a network topology and decide
    * over-utilized datanodes, above average utilized datanodes, 
    * below average utilized datanodes, and underutilized datanodes. 
@@ -968,15 +799,13 @@ public class Balancer implements Tool {
    */
   private long initNodes(DatanodeInfo[] datanodes) {
     // compute average utilization
-    long totalCapacity=0L, totalUsedSpace=0L;
     for (DatanodeInfo datanode : datanodes) {
       if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
         continue; // ignore decommissioning or decommissioned nodes
       }
-      totalCapacity += datanode.getCapacity();
-      totalUsedSpace += datanode.getDfsUsed();
+      policy.accumulateSpaces(datanode);
     }
-    this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
+    policy.initAvgUtilization();
 
     /*create network topology and all data node lists: 
      * overloaded, above-average, below-average, and underloaded
@@ -991,19 +820,20 @@ public class Balancer implements Tool {
       }
       cluster.add(datanode);
       BalancerDatanode datanodeS;
-      if (getUtilization(datanode) > avgUtilization) {
-        datanodeS = new Source(datanode, avgUtilization, threshold);
+      final double avg = policy.getAvgUtilization();
+      if (policy.getUtilization(datanode) > avg) {
+        datanodeS = new Source(datanode, policy, threshold);
         if (isAboveAvgUtilized(datanodeS)) {
           this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
         } else {
           assert(isOverUtilized(datanodeS)) :
             datanodeS.getName()+ "is not an overUtilized node";
           this.overUtilizedDatanodes.add((Source)datanodeS);
-          overLoadedBytes += (long)((datanodeS.utilization-avgUtilization
+          overLoadedBytes += (long)((datanodeS.utilization-avg
               -threshold)*datanodeS.datanode.getCapacity()/100.0);
         }
       } else {
-        datanodeS = new BalancerDatanode(datanode, avgUtilization, threshold);
+        datanodeS = new BalancerDatanode(datanode, policy, threshold);
         if ( isBelowOrEqualAvgUtilized(datanodeS)) {
           this.belowAvgUtilizedDatanodes.add(datanodeS);
         } else {
@@ -1011,7 +841,7 @@ public class Balancer implements Tool {
               + datanodeS.getName() + ")=" + isUnderUtilized(datanodeS)
               + ", utilization=" + datanodeS.utilization; 
           this.underUtilizedDatanodes.add(datanodeS);
-          underLoadedBytes += (long)((avgUtilization-threshold-
+          underLoadedBytes += (long)((avg-threshold-
               datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
         }
       }
@@ -1019,7 +849,7 @@ public class Balancer implements Tool {
     }
 
     //logging
-    logImbalancedNodes();
+    logNodes();
     
     assert (this.datanodes.size() == 
       overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
@@ -1031,25 +861,20 @@ public class Balancer implements Tool {
   }
 
   /* log the over utilized & under utilized nodes */
-  private void logImbalancedNodes() {
-    StringBuilder msg = new StringBuilder();
-    msg.append(overUtilizedDatanodes.size());
-    msg.append(" over utilized nodes:");
-    for (Source node : overUtilizedDatanodes) {
-      msg.append( " " );
-      msg.append( node.getName() );
-    }
-    LOG.info(msg);
-    msg = new StringBuilder();
-    msg.append(underUtilizedDatanodes.size());
-    msg.append(" under utilized nodes: ");
-    for (BalancerDatanode node : underUtilizedDatanodes) {
-      msg.append( " " );
-      msg.append( node.getName() );
-    }
-    LOG.info(msg);
+  private void logNodes() {
+    logNodes("over-utilized", overUtilizedDatanodes);
+    if (LOG.isTraceEnabled()) {
+      logNodes("above-average", aboveAvgUtilizedDatanodes);
+      logNodes("below-average", belowAvgUtilizedDatanodes);
+    }
+    logNodes("underutilized", underUtilizedDatanodes);
   }
-  
+
+  private static <T extends BalancerDatanode> void logNodes(
+      String name, Collection<T> nodes) {
+    LOG.info(nodes.size() + " " + name + ": " + nodes);
+  }
+
   /* Decide all <source, target> pairs and
    * the number of bytes to move from a source to a target
    * Maximum bytes to be moved per node is
@@ -1313,7 +1138,6 @@ public class Balancer implements Tool {
    */ 
   private static class MovedBlocks {
     private long lastCleanupTime = System.currentTimeMillis();
-    private static long winWidth = 5400*1000L; // 1.5 hour
     final private static int CUR_WIN = 0;
     final private static int OLD_WIN = 1;
     final private static int NUM_WINS = 2;
@@ -1326,13 +1150,6 @@ public class Balancer implements Tool {
       movedBlocks.add(new HashMap<Block,BalancerBlock>());
     }
 
-    /* set the win width */
-    private void setWinWidth(Configuration conf) {
-      winWidth = conf.getLong(
-          DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 
-          DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
-    }
-    
     /* add a block thus marking a block to be moved */
     synchronized private void add(BalancerBlock block) {
       movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
@@ -1353,7 +1170,7 @@ public class Balancer implements Tool {
     synchronized private void cleanup() {
       long curTime = System.currentTimeMillis();
       // check if old win is older than winWidth
-      if (lastCleanupTime + winWidth <= curTime) {
+      if (lastCleanupTime + WIN_WIDTH <= curTime) {
         // purge the old window
         movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
         movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>());
@@ -1419,7 +1236,7 @@ public class Balancer implements Tool {
     this.datanodes.clear();
     this.sources.clear();
     this.targets.clear();  
-    this.avgUtilization = 0.0D;
+    this.policy.reset();
     cleanGlobalBlockList();
     this.movedBlocks.cleanup();
   }
@@ -1439,182 +1256,172 @@ public class Balancer implements Tool {
   
   /* Return true if the given datanode is overUtilized */
   private boolean isOverUtilized(BalancerDatanode datanode) {
-    return datanode.utilization > (avgUtilization+threshold);
+    return datanode.utilization > (policy.getAvgUtilization()+threshold);
   }
   
   /* Return true if the given datanode is above average utilized
    * but not overUtilized */
   private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
-    return (datanode.utilization <= (avgUtilization+threshold))
-        && (datanode.utilization > avgUtilization);
+    final double avg = policy.getAvgUtilization();
+    return (datanode.utilization <= (avg+threshold))
+        && (datanode.utilization > avg);
   }
   
   /* Return true if the given datanode is underUtilized */
   private boolean isUnderUtilized(BalancerDatanode datanode) {
-    return datanode.utilization < (avgUtilization-threshold);
+    return datanode.utilization < (policy.getAvgUtilization()-threshold);
   }
 
   /* Return true if the given datanode is below average utilized 
    * but not underUtilized */
   private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) {
-        return (datanode.utilization >= (avgUtilization-threshold))
-                 && (datanode.utilization <= avgUtilization);
+    final double avg = policy.getAvgUtilization();
+    return (datanode.utilization >= (avg-threshold))
+             && (datanode.utilization <= avg);
   }
 
   // Exit status
-  final public static int SUCCESS = 1;
-  final public static int ALREADY_RUNNING = -1;
-  final public static int NO_MOVE_BLOCK = -2;
-  final public static int NO_MOVE_PROGRESS = -3;
-  final public static int IO_EXCEPTION = -4;
-  final public static int ILLEGAL_ARGS = -5;
-  /** main method of Balancer
-   * @param args arguments to a Balancer
-   * @throws Exception exception that occured during datanode balancing
-   */
-  public int run(String[] args) throws Exception {
-    long startTime = Util.now();
-    OutputStream out = null;
+  enum ReturnStatus {
+    SUCCESS(1),
+    IN_PROGRESS(0),
+    ALREADY_RUNNING(-1),
+    NO_MOVE_BLOCK(-2),
+    NO_MOVE_PROGRESS(-3),
+    IO_EXCEPTION(-4),
+    ILLEGAL_ARGS(-5),
+    INTERRUPTED(-6);
+
+    final int code;
+
+    ReturnStatus(int code) {
+      this.code = code;
+    }
+  }
+
+  /** Run an iteration for all datanodes. */
+  private ReturnStatus run(int iteration, Formatter formatter) {
     try {
-      // initialize a balancer
-      init(parseArgs(args));
+      /* get all live datanodes of a cluster and their disk usage
+       * decide the number of bytes need to be moved
+       */
+      final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE));
+      if (bytesLeftToMove == 0) {
+        System.out.println("The cluster is balanced. Exiting...");
+        return ReturnStatus.SUCCESS;
+      } else {
+        LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
+            + " to make the cluster balanced." );
+      }
       
-      /* Check if there is another balancer running.
-       * Exit if there is another one running.
+      /* Decide all the nodes that will participate in the block move and
+       * the number of bytes that need to be moved from one node to another
+       * in this iteration. Maximum bytes to be moved per node is
+       * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
        */
-      out = checkAndMarkRunningBalancer(); 
-      if (out == null) {
-        System.out.println("Another balancer is running. Exiting...");
-        return ALREADY_RUNNING;
+      final long bytesToMove = chooseNodes();
+      if (bytesToMove == 0) {
+        System.out.println("No block can be moved. Exiting...");
+        return ReturnStatus.NO_MOVE_BLOCK;
+      } else {
+        LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
+            " in this iteration");
       }
 
-      Formatter formatter = new Formatter(System.out);
-      System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
-      int iterations = 0;
-      while (true ) {
-        /* get all live datanodes of a cluster and their disk usage
-         * decide the number of bytes need to be moved
-         */
-        long bytesLeftToMove = initNodes();
-        if (bytesLeftToMove == 0) {
-          System.out.println("The cluster is balanced. Exiting...");
-          return SUCCESS;
-        } else {
-          LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
-              +" bytes to make the cluster balanced." );
-        }
-        
-        /* Decide all the nodes that will participate in the block move and
-         * the number of bytes that need to be moved from one node to another
-         * in this iteration. Maximum bytes to be moved per node is
-         * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
-         */
-        long bytesToMove = chooseNodes();
-        if (bytesToMove == 0) {
-          System.out.println("No block can be moved. Exiting...");
-          return NO_MOVE_BLOCK;
-        } else {
-          LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
-              "bytes in this iteration");
-        }
-   
-        formatter.format("%-24s %10d  %19s  %18s  %17s\n", 
-            DateFormat.getDateTimeInstance().format(new Date()),
-            iterations,
-            StringUtils.byteDesc(bytesMoved.get()),
-            StringUtils.byteDesc(bytesLeftToMove),
-            StringUtils.byteDesc(bytesToMove)
-            );
-        
-        /* For each pair of <source, target>, start a thread that repeatedly 
-         * decide a block to be moved and its proxy source, 
-         * then initiates the move until all bytes are moved or no more block
-         * available to move.
-         * Exit no byte has been moved for 5 consecutive iterations.
-         */
-        if (dispatchBlockMoves() > 0) {
-          notChangedIterations = 0;
-        } else {
-          notChangedIterations++;
-          if (notChangedIterations >= 5) {
-            System.out.println(
-                "No block has been moved for 5 iterations. Exiting...");
-            return NO_MOVE_PROGRESS;
-          }
-        }
-
-        // clean all lists
-        resetData();
-        
-        try {
-          Thread.sleep(2000*conf.getLong(
-              DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
-              DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT));
-        } catch (InterruptedException ignored) {
+      formatter.format("%-24s %10d  %19s  %18s  %17s\n", 
+          DateFormat.getDateTimeInstance().format(new Date()),
+          iteration,
+          StringUtils.byteDesc(bytesMoved.get()),
+          StringUtils.byteDesc(bytesLeftToMove),
+          StringUtils.byteDesc(bytesToMove)
+          );
+      
+      /* For each pair of <source, target>, start a thread that repeatedly 
+       * decides a block to be moved and its proxy source, 
+       * then initiates the move until all bytes are moved or no more blocks
+       * are available to move.
+       * Exit if no byte has been moved for 5 consecutive iterations.
+       */
+      if (dispatchBlockMoves() > 0) {
+        notChangedIterations = 0;
+      } else {
+        notChangedIterations++;
+        if (notChangedIterations >= 5) {
+          System.out.println(
+              "No block has been moved for 5 iterations. Exiting...");
+          return ReturnStatus.NO_MOVE_PROGRESS;
         }
-        
-        iterations++;
       }
-    } catch (IllegalArgumentException ae) {
-      return ILLEGAL_ARGS;
+
+      // clean all lists
+      resetData();
+      return ReturnStatus.IN_PROGRESS;
+    } catch (IllegalArgumentException e) {
+      System.out.println(e + ".  Exiting ...");
+      return ReturnStatus.ILLEGAL_ARGS;
     } catch (IOException e) {
-      System.out.println("Received an IO exception: " + e.getMessage() +
-          " . Exiting...");
-      return IO_EXCEPTION;
+      System.out.println(e + ".  Exiting ...");
+      return ReturnStatus.IO_EXCEPTION;
+    } catch (InterruptedException e) {
+      System.out.println(e + ".  Exiting ...");
+      return ReturnStatus.INTERRUPTED;
     } finally {
       // shutdown thread pools
       dispatcherExecutor.shutdownNow();
       moverExecutor.shutdownNow();
-
-      shouldRun = false;
-      try {
-        if (keyupdaterthread != null) keyupdaterthread.interrupt();
-      } catch (Exception e) {
-        LOG.warn("Exception shutting down access key updater thread", e);
-      }
-      // close the output file
-      IOUtils.closeStream(out); 
-      if (fs != null) {
-        try {
-          fs.delete(BALANCER_ID_PATH, true);
-        } catch(IOException ignored) {
-        }
-      }
-      System.out.println("Balancing took " + 
-          time2Str(Util.now()-startTime));
     }
   }
 
-  private Path BALANCER_ID_PATH = new Path("/system/balancer.id");
-  /* The idea for making sure that there is no more than one balancer
-   * running in an HDFS is to create a file in the HDFS, writes the IP address
-   * of the machine on which the balancer is running to the file, but did not
-   * close the file until the balancer exits. 
-   * This prevents the second balancer from running because it can not
-   * creates the file while the first one is running.
-   * 
-   * This method checks if there is any running balancer and 
-   * if no, mark yes if no.
-   * Note that this is an atomic operation.
-   * 
-   * Return null if there is a running balancer; otherwise the output stream
-   * to the newly created file.
+  /**
+   * Balance all namenodes.
+   * For each iteration,
+   * for each namenode,
+   * execute a {@link Balancer} to work through all datanodes once.  
    */
-  private OutputStream checkAndMarkRunningBalancer() throws IOException {
+  static int run(List<InetSocketAddress> namenodes, final Parameters p,
+      Configuration conf) throws IOException, InterruptedException {
+    final long sleeptime = 2000*conf.getLong(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+    LOG.info("namenodes = " + namenodes);
+    LOG.info("p         = " + p);
+    
+    final Formatter formatter = new Formatter(System.out);
+    System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
+    
+    final List<NameNodeConnector> connectors
+        = new ArrayList<NameNodeConnector>(namenodes.size());
     try {
-      DataOutputStream out = fs.create(BALANCER_ID_PATH);
-      out. writeBytes(InetAddress.getLocalHost().getHostName());
-      out.flush();
-      return out;
-    } catch(RemoteException e) {
-      if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
-        return null;
-      } else {
-        throw e;
+      for(InetSocketAddress isa : namenodes) {
+        connectors.add(new NameNodeConnector(isa, conf));
+      }
+    
+      boolean done = false;
+      for(int iteration = 0; !done; iteration++) {
+        done = true;
+        Collections.shuffle(connectors);
+        for(NameNodeConnector nnc : connectors) {
+          final Balancer b = new Balancer(nnc, p, conf);
+          final ReturnStatus r = b.run(iteration, formatter);
+          if (r == ReturnStatus.IN_PROGRESS) {
+            done = false;
+          } else if (r != ReturnStatus.SUCCESS) {
+            //must be an error status, return.
+            return r.code;
+          }
+        }
+
+        if (!done) {
+          Thread.sleep(sleeptime);
+        }
+      }
+    } finally {
+      for(NameNodeConnector nnc : connectors) {
+        nnc.close();
       }
     }
+    return ReturnStatus.SUCCESS.code;
   }
-  
+
   /* Given elapsedTime in ms, return a printable string */
   private static String time2Str(long elapsedTime) {
     String unit;
@@ -1635,15 +1442,116 @@ public class Balancer implements Tool {
     return time+" "+unit;
   }
 
-  /** return this balancer's configuration */
-  public Configuration getConf() {
-    return conf;
+  static class Parameters {
+    static final Parameters DEFALUT = new Parameters(
+        BalancingPolicy.Node.INSTANCE, 10.0);
+
+    final BalancingPolicy policy;
+    final double threshold;
+
+    Parameters(BalancingPolicy policy, double threshold) {
+      this.policy = policy;
+      this.threshold = threshold;
+    }
+
+    @Override
+    public String toString() {
+      return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
+          + "[" + policy + ", threshold=" + threshold + "]";
+    }
   }
 
-  /** set this balancer's configuration */
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-    movedBlocks.setWinWidth(conf);
+  static class Cli extends Configured implements Tool {
+    /** Parse arguments and then run Balancer */
+    @Override
+    public int run(String[] args) {
+      final long startTime = Util.now();
+      final Configuration conf = getConf();
+      WIN_WIDTH = conf.getLong(
+          DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 
+          DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
+
+      try {
+        checkReplicationPolicyCompatibility(conf);
+
+        final List<InetSocketAddress> namenodes = DFSUtil.getNNServiceRpcAddresses(conf);
+        return Balancer.run(namenodes, parse(args), conf);
+      } catch (IOException e) {
+        System.out.println(e + ".  Exiting ...");
+        return ReturnStatus.IO_EXCEPTION.code;
+      } catch (InterruptedException e) {
+        System.out.println(e + ".  Exiting ...");
+        return ReturnStatus.INTERRUPTED.code;
+      } finally {
+        System.out.println("Balancing took " + time2Str(Util.now()-startTime));
+      }
+    }
+
+    /** parse command line arguments */
+    static Parameters parse(String[] args) {
+      BalancingPolicy policy = Parameters.DEFALUT.policy;
+      double threshold = Parameters.DEFALUT.threshold;
+
+      if (args != null) {
+        try {
+          for(int i = 0; i < args.length; i++) {
+            if ("-threshold".equalsIgnoreCase(args[i])) {
+              i++;
+              try {
+                threshold = Double.parseDouble(args[i]);
+                if (threshold < 0 || threshold > 100) {
+                  throw new NumberFormatException(
+                      "Number out of range: threshold = " + threshold);
+                }
+                LOG.info( "Using a threshold of " + threshold );
+              } catch(NumberFormatException e) {
+                System.err.println(
+                    "Expecting a number in the range of [0.0, 100.0]: "
+                    + args[i]);
+                throw e;
+              }
+            } else if ("-policy".equalsIgnoreCase(args[i])) {
+              i++;
+              try {
+                policy = BalancingPolicy.parse(args[i]);
+              } catch(IllegalArgumentException e) {
+                System.err.println("Illegal policy name: " + args[i]);
+                throw e;
+              }
+            } else {
+              throw new IllegalArgumentException("args = "
+                  + Arrays.toString(args));
+            }
+          }
+        } catch(RuntimeException e) {
+          printUsage();
+          throw e;
+        }
+      }
+      
+      return new Parameters(policy, threshold);
+    }
+
+    private static void printUsage() {
+      System.out.println("Usage: java " + Balancer.class.getSimpleName());
+      System.out.println("    [-policy <policy>]\tthe balancing policy: "
+          + BalancingPolicy.Node.INSTANCE.getName() + " or " 
+          + BalancingPolicy.Pool.INSTANCE.getName());
+      System.out.println(
+          "    [-threshold <threshold>]\tPercentage of disk capacity");
+    }
   }
 
+  /**
+   * Run a balancer
+   * @param args Command line arguments
+   */
+  public static void main(String[] args) {
+    try {
+      System.exit(ToolRunner.run(null, new Cli(), args));
+    } catch (Throwable e) {
+      LOG.error(StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
+  }
 }
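
For illustration, a minimal, hypothetical driver for the reworked entry points
might look like the sketch below.  It assumes a class placed in the
org.apache.hadoop.hdfs.server.balancer package (Balancer.run, Cli.parse and
Parameters are package-private) and a Configuration that already carries the
cluster's namenode settings; the class name and the threshold value are made
up for illustration.  The normal entry point remains the Cli tool run through
ToolRunner, as in main() above.

    package org.apache.hadoop.hdfs.server.balancer;

    import java.net.InetSocketAddress;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtil;

    // Hypothetical helper, not part of this patch.
    public class BalancerDriver {
      public static void main(String[] args) throws Exception {
        // Expects hdfs-site.xml / core-site.xml on the classpath.
        Configuration conf = new Configuration();
        // One service RPC address per namenode in the (possibly federated) cluster.
        List<InetSocketAddress> namenodes = DFSUtil.getNNServiceRpcAddresses(conf);
        // Reuse the CLI parser; equivalent to passing "-threshold 10" with the
        // default balancing policy.
        Balancer.Parameters p = Balancer.Cli.parse(new String[] {"-threshold", "10"});
        // Balancer.run() loops over all namenodes on each iteration and returns
        // ReturnStatus.SUCCESS.code once every namenode is balanced, or the code
        // of the first error status encountered.
        System.exit(Balancer.run(namenodes, p, conf));
      }
    }

Note that Cli.run() also seeds WIN_WIDTH from DFS_BALANCER_MOVEDWINWIDTH_KEY
before calling Balancer.run(); a direct caller such as this sketch would rely
on the compiled-in default instead.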

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Fri Apr 29 18:16:32 2011
@@ -41,6 +41,8 @@ public interface HdfsConstants {
   /** Startup options */
   static public enum StartupOption{
     FORMAT  ("-format"),
+    CLUSTERID ("-clusterid"),
+    GENCLUSTERID ("-genclusterid"),
     REGULAR ("-regular"),
     BACKUP  ("-backup"),
     CHECKPOINT("-checkpoint"),
@@ -50,6 +52,10 @@ public interface HdfsConstants {
     IMPORT  ("-importCheckpoint");
     
     private String name = null;
+    
+    // Used only with format and upgrade options
+    private String clusterId = null;
+    
     private StartupOption(String arg) {this.name = arg;}
     public String getName() {return name;}
     public NamenodeRole toNodeRole() {
@@ -62,7 +68,14 @@ public interface HdfsConstants {
         return NamenodeRole.ACTIVE;
       }
     }
-
+    
+    public void setClusterId(String cid) {
+      clusterId = cid;
+    }
+    
+    public String getClusterId() {
+      return clusterId;
+    }
   }
 
   // Timeouts for communicating with DataNode for streaming writes/reads
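
The new clusterId field turns each StartupOption constant into a small carrier
for the id supplied (or generated) at format/upgrade time.  A minimal,
hypothetical usage sketch, assuming only HdfsConstants.StartupOption on the
classpath and a made-up id value:

    import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;

    public class ClusterIdExample {
      public static void main(String[] args) {
        // The enum constant itself carries the id from the argument parser to
        // the code that later formats the storage directories.
        StartupOption opt = StartupOption.FORMAT;
        opt.setClusterId("CID-example-1234");   // hypothetical example value
        System.out.println(opt.getName() + " clusterId=" + opt.getClusterId());
      }
    }

Since enum constants are singletons, the id is effectively per-JVM state shared
between whoever parses the command line and whoever reads it back, which is why
the comment notes it is used only with the format and upgrade options.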

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Fri Apr 29 18:16:32 2011
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.Random;
 import java.util.TreeSet;
 
+import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.jsp.JspWriter;
 
@@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.BlockReade
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -66,6 +68,7 @@ public class JspHelper {
   public static final String CURRENT_CONF = "current.conf";
   final static public String WEB_UGI_PROPERTY_NAME = "dfs.web.ugi";
   public static final String DELEGATION_PARAMETER_NAME = "delegation";
+  public static final String NAMENODE_ADDRESS = "nnaddr";
   static final String SET_DELEGATION = "&" + DELEGATION_PARAMETER_NAME +
                                               "=";
   private static final Log LOG = LogFactory.getLog(JspHelper.class);
@@ -181,7 +184,7 @@ public class JspHelper {
     return chosenNode;
   }
 
-  public static void streamBlockInAscii(InetSocketAddress addr, 
+  public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
       long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
       JspWriter out, Configuration conf) throws IOException {
@@ -193,9 +196,9 @@ public class JspHelper {
       long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);     
       
       // Use the block name for file name. 
-      String file = BlockReader.getFileName(addr, blockId);
+      String file = BlockReader.getFileName(addr, poolId, blockId);
       BlockReader blockReader = BlockReader.newBlockReader(s, file,
-        new Block(blockId, 0, genStamp), blockToken,
+        new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
         offsetIntoBlock, amtToRead, conf.getInt("io.file.buffer.size", 4096));
         
     byte[] buf = new byte[(int)amtToRead];
@@ -359,14 +362,16 @@ public class JspHelper {
 
   public static void printPathWithLinks(String dir, JspWriter out, 
                                         int namenodeInfoPort,
-                                        String tokenString
+                                        String tokenString,
+                                        String nnAddress
                                         ) throws IOException {
     try {
       String[] parts = dir.split(Path.SEPARATOR);
       StringBuilder tempPath = new StringBuilder(dir.length());
       out.print("<a href=\"browseDirectory.jsp" + "?dir="+ Path.SEPARATOR
           + "&namenodeInfoPort=" + namenodeInfoPort
-          + getDelegationTokenUrlParam(tokenString) + "\">" + Path.SEPARATOR
+          + getDelegationTokenUrlParam(tokenString) 
+          + getUrlParam(NAMENODE_ADDRESS, nnAddress) + "\">" + Path.SEPARATOR
           + "</a>");
       tempPath.append(Path.SEPARATOR);
       for (int i = 0; i < parts.length-1; i++) {
@@ -374,7 +379,8 @@ public class JspHelper {
           tempPath.append(parts[i]);
           out.print("<a href=\"browseDirectory.jsp" + "?dir="
               + tempPath.toString() + "&namenodeInfoPort=" + namenodeInfoPort
-              + getDelegationTokenUrlParam(tokenString));
+              + getDelegationTokenUrlParam(tokenString)
+              + getUrlParam(NAMENODE_ADDRESS, nnAddress));
           out.print("\">" + parts[i] + "</a>" + Path.SEPARATOR);
           tempPath.append(Path.SEPARATOR);
         }
@@ -391,7 +397,8 @@ public class JspHelper {
   public static void printGotoForm(JspWriter out,
                                    int namenodeInfoPort,
                                    String tokenString,
-                                   String file) throws IOException {
+                                   String file,
+                                   String nnAddress) throws IOException {
     out.print("<form action=\"browseDirectory.jsp\" method=\"get\" name=\"goto\">");
     out.print("Goto : ");
     out.print("<input name=\"dir\" type=\"text\" width=\"50\" id\"dir\" value=\""+ file+"\">");
@@ -402,6 +409,8 @@ public class JspHelper {
       out.print("<input name=\"" + DELEGATION_PARAMETER_NAME
           + "\" type=\"hidden\" value=\"" + tokenString + "\">");
     }
+    out.print("<input name=\""+ NAMENODE_ADDRESS +"\" type=\"hidden\" "
+        + "value=\"" + nnAddress  + "\">");
     out.print("</form>");
   }
   
@@ -475,16 +484,43 @@ public class JspHelper {
     return UserGroupInformation.createRemoteUser(strings[0]);
   }
 
+  private static String getNNServiceAddress(ServletContext context,
+      HttpServletRequest request) {
+    String namenodeAddressInUrl = request.getParameter(NAMENODE_ADDRESS);
+    InetSocketAddress namenodeAddress = null;
+    if (namenodeAddressInUrl != null) {
+      namenodeAddress = DFSUtil.getSocketAddress(namenodeAddressInUrl);
+    } else if (context != null) {
+      namenodeAddress = (InetSocketAddress) context
+          .getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
+    }
+    if (namenodeAddress != null) {
+      return (namenodeAddress.getAddress().getHostAddress() + ":" 
+          + namenodeAddress.getPort());
+    }
+    return null;
+  }
+
+  /**
+   * See
+   * {@link JspHelper#getUGI(ServletContext, HttpServletRequest, Configuration)}
+   * where ServletContext is passed as null.
+   */
+  public static UserGroupInformation getUGI(HttpServletRequest request,
+      Configuration conf) throws IOException {
+    return getUGI(null, request, conf);
+  }
+  
   /**
    * Get {@link UserGroupInformation} and possibly the delegation token out of
    * the request.
+   * @param context the ServletContext that is serving this request.
    * @param request the http request
    * @return a new user from the request
    * @throws AccessControlException if the request has no token
    */
-  public static UserGroupInformation getUGI(HttpServletRequest request,
-                                            Configuration conf
-                                           ) throws IOException {
+  public static UserGroupInformation getUGI(ServletContext context,
+      HttpServletRequest request, Configuration conf) throws IOException {
     UserGroupInformation ugi = null;
     if(UserGroupInformation.isSecurityEnabled()) {
       String user = request.getRemoteUser();
@@ -493,12 +529,12 @@ public class JspHelper {
         Token<DelegationTokenIdentifier> token = 
           new Token<DelegationTokenIdentifier>();
         token.decodeFromUrlString(tokenString);
-        InetSocketAddress serviceAddr = NameNode.getAddress(conf);
-        LOG.info("Setting service in token: "
-            + new Text(serviceAddr.getAddress().getHostAddress() + ":"
-                + serviceAddr.getPort()));
-        token.setService(new Text(serviceAddr.getAddress().getHostAddress()
-            + ":" + serviceAddr.getPort()));
+        String serviceAddress = getNNServiceAddress(context, request);
+        if (serviceAddress != null) {
+          LOG.info("Setting service in token: "
+              + new Text(serviceAddress));
+          token.setService(new Text(serviceAddress));
+        }
         ByteArrayInputStream buf = new ByteArrayInputStream(token
             .getIdentifier());
         DataInputStream in = new DataInputStream(buf);
@@ -549,5 +585,13 @@ public class JspHelper {
     }
   }
 
-
+  /**
+   * Returns the url parameter for the given name and value.
+   * @param name parameter name
+   * @param val parameter value
+   * @return url parameter
+   */
+  public static String getUrlParam(String name, String val) {
+    return val == null ? "" : "&" + name + "=" + val;
+  }
 }
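
The small getUrlParam helper is what lets the JSPs append the new nnaddr
parameter (and others) unconditionally: it collapses to an empty string when
the value is null.  A minimal sketch of that behaviour, assuming only JspHelper
on the classpath; the address literal is made up:

    import org.apache.hadoop.hdfs.server.common.JspHelper;

    public class UrlParamExample {
      public static void main(String[] args) {
        // Non-null value: rendered as an extra query parameter.
        System.out.println(
            JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, "127.0.0.1:8020"));
        // prints: &nnaddr=127.0.0.1:8020

        // Null value: empty string, so callers can always append the result.
        System.out.println(
            "[" + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, null) + "]");
        // prints: []
      }
    }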