You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/04/29 20:16:38 UTC
svn commit: r1097905 [2/14] - in /hadoop/hdfs/trunk: ./ bin/
src/c++/libhdfs/ src/contrib/hdfsproxy/
src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/ja...
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Apr 29 18:16:32 2011
@@ -67,9 +67,9 @@ public interface ClientProtocol extends
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 66: Add getAdditionalDatanode(..)
+ * 67: Add block pool ID to Block
*/
- public static final long versionID = 66L;
+ public static final long versionID = 67L;
///////////////////////////////////////
// File contents
@@ -126,7 +126,7 @@ public interface ClientProtocol extends
* <p>
* Blocks have a maximum size. Clients that intend to create
* multi-block files must also use
- * {@link #addBlock(String, String, Block, DatanodeInfo[])}
+ * {@link #addBlock(String, String, ExtendedBlock, DatanodeInfo[])}
*
* @param src path of the file being created.
* @param masked masked permission.
@@ -259,7 +259,7 @@ public interface ClientProtocol extends
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
* @throws IOException If an I/O error occurred
*/
- public void abandonBlock(Block b, String src, String holder)
+ public void abandonBlock(ExtendedBlock b, String src, String holder)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException;
@@ -293,7 +293,7 @@ public interface ClientProtocol extends
* @throws IOException If an I/O error occurred
*/
public LocatedBlock addBlock(String src, String clientName,
- @Nullable Block previous, @Nullable DatanodeInfo[] excludeNodes)
+ @Nullable ExtendedBlock previous, @Nullable DatanodeInfo[] excludeNodes)
throws AccessControlException, FileNotFoundException,
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
IOException;
@@ -316,7 +316,7 @@ public interface ClientProtocol extends
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
* @throws IOException If an I/O error occurred
*/
- public LocatedBlock getAdditionalDatanode(final String src, final Block blk,
+ public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
final int numAdditionalNodes, final String clientName
) throws AccessControlException, FileNotFoundException,
@@ -329,7 +329,7 @@ public interface ClientProtocol extends
* The function returns whether the file has been closed successfully.
* If the function returns false, the caller should try again.
*
- * close() also commits the last block of the file by reporting
+ * close() also commits the last block of file by reporting
* to the name-node the actual generation stamp and the length
* of the block that the client has transmitted to data-nodes.
*
@@ -344,7 +344,7 @@ public interface ClientProtocol extends
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
* @throws IOException If an I/O error occurred
*/
- public boolean complete(String src, String clientName, Block last)
+ public boolean complete(String src, String clientName, ExtendedBlock last)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException;
@@ -554,6 +554,8 @@ public interface ClientProtocol extends
* <li> [3] contains number of under replicated blocks in the system.</li>
* <li> [4] contains number of blocks with a corrupt replica. </li>
* <li> [5] contains number of blocks without any good replicas left. </li>
+ * <li> [5] contains number of blocks without any good replicas left. </li>
+ * <li> [6] contains the total used space of the block pool. </li>
* </ul>
* Use public constants like {@link #GET_STATS_CAPACITY_IDX} in place of
* actual numbers to index into the array.
@@ -854,8 +856,8 @@ public interface ClientProtocol extends
* @return a located block with a new generation stamp and an access token
* @throws IOException if any error occurs
*/
- public LocatedBlock updateBlockForPipeline(Block block, String clientName)
- throws IOException;
+ public LocatedBlock updateBlockForPipeline(ExtendedBlock block,
+ String clientName) throws IOException;
/**
* Update a pipeline for a block under construction
@@ -866,8 +868,8 @@ public interface ClientProtocol extends
* @param newNodes datanodes in the pipeline
* @throws IOException if any error occurs
*/
- public void updatePipeline(String clientName, Block oldBlock,
- Block newBlock, DatanodeID[] newNodes)
+ public void updatePipeline(String clientName, ExtendedBlock oldBlock,
+ ExtendedBlock newBlock, DatanodeID[] newNodes)
throws IOException;
/**
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Fri Apr 29 18:16:32 2011
@@ -51,10 +51,11 @@ public interface DataTransferProtocol {
* when protocol changes. It is not very obvious.
*/
/*
- * Version 22:
- * Add a new feature to replace datanode on failure.
+ * Version 23:
+ * Changed the protocol methods to use ExtendedBlock instead
+ * of Block.
*/
- public static final int DATA_TRANSFER_VERSION = 22;
+ public static final int DATA_TRANSFER_VERSION = 23;
/** Operation */
public enum Op {
@@ -238,7 +239,7 @@ public interface DataTransferProtocol {
}
/** Send OP_READ_BLOCK */
- public static void opReadBlock(DataOutputStream out, Block blk,
+ public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
long blockOffset, long blockLen, String clientName,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
@@ -253,7 +254,7 @@ public interface DataTransferProtocol {
}
/** Send OP_WRITE_BLOCK */
- public static void opWriteBlock(DataOutputStream out, Block blk,
+ public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
int pipelineSize, BlockConstructionStage stage, long newGs,
long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
@@ -277,7 +278,7 @@ public interface DataTransferProtocol {
}
/** Send {@link Op#TRANSFER_BLOCK} */
- public static void opTransferBlock(DataOutputStream out, Block blk,
+ public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
String client, DatanodeInfo[] targets,
Token<BlockTokenIdentifier> blockToken) throws IOException {
op(out, Op.TRANSFER_BLOCK);
@@ -291,7 +292,7 @@ public interface DataTransferProtocol {
/** Send OP_REPLACE_BLOCK */
public static void opReplaceBlock(DataOutputStream out,
- Block blk, String storageId, DatanodeInfo src,
+ ExtendedBlock blk, String storageId, DatanodeInfo src,
Token<BlockTokenIdentifier> blockToken) throws IOException {
op(out, Op.REPLACE_BLOCK);
@@ -303,7 +304,7 @@ public interface DataTransferProtocol {
}
/** Send OP_COPY_BLOCK */
- public static void opCopyBlock(DataOutputStream out, Block blk,
+ public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
op(out, Op.COPY_BLOCK);
@@ -314,7 +315,7 @@ public interface DataTransferProtocol {
}
/** Send OP_BLOCK_CHECKSUM */
- public static void opBlockChecksum(DataOutputStream out, Block blk,
+ public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
op(out, Op.BLOCK_CHECKSUM);
@@ -377,7 +378,7 @@ public interface DataTransferProtocol {
/** Receive OP_READ_BLOCK */
private void opReadBlock(DataInputStream in) throws IOException {
- final Block blk = new Block();
+ final ExtendedBlock blk = new ExtendedBlock();
blk.readId(in);
final long offset = in.readLong();
final long length = in.readLong();
@@ -390,13 +391,13 @@ public interface DataTransferProtocol {
/**
* Abstract OP_READ_BLOCK method. Read a block.
*/
- protected abstract void opReadBlock(DataInputStream in, Block blk,
+ protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
long offset, long length, String client,
Token<BlockTokenIdentifier> blockToken) throws IOException;
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
- final Block blk = new Block();
+ final ExtendedBlock blk = new ExtendedBlock();
blk.readId(in);
final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
final BlockConstructionStage stage =
@@ -418,7 +419,7 @@ public interface DataTransferProtocol {
* Abstract OP_WRITE_BLOCK method.
* Write a block.
*/
- protected abstract void opWriteBlock(DataInputStream in, Block blk,
+ protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
int pipelineSize, BlockConstructionStage stage, long newGs,
long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
@@ -426,7 +427,7 @@ public interface DataTransferProtocol {
/** Receive {@link Op#TRANSFER_BLOCK} */
private void opTransferBlock(DataInputStream in) throws IOException {
- final Block blk = new Block();
+ final ExtendedBlock blk = new ExtendedBlock();
blk.readId(in);
final String client = Text.readString(in);
final DatanodeInfo targets[] = readDatanodeInfos(in);
@@ -440,14 +441,14 @@ public interface DataTransferProtocol {
* For {@link BlockConstructionStage#TRANSFER_RBW}
* or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
*/
- protected abstract void opTransferBlock(DataInputStream in, Block blk,
+ protected abstract void opTransferBlock(DataInputStream in, ExtendedBlock blk,
String client, DatanodeInfo[] targets,
Token<BlockTokenIdentifier> blockToken)
throws IOException;
/** Receive OP_REPLACE_BLOCK */
private void opReplaceBlock(DataInputStream in) throws IOException {
- final Block blk = new Block();
+ final ExtendedBlock blk = new ExtendedBlock();
blk.readId(in);
final String sourceId = Text.readString(in); // read del hint
final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
@@ -461,12 +462,12 @@ public interface DataTransferProtocol {
* It is used for balancing purpose; send to a destination
*/
protected abstract void opReplaceBlock(DataInputStream in,
- Block blk, String sourceId, DatanodeInfo src,
+ ExtendedBlock blk, String sourceId, DatanodeInfo src,
Token<BlockTokenIdentifier> blockToken) throws IOException;
/** Receive OP_COPY_BLOCK */
private void opCopyBlock(DataInputStream in) throws IOException {
- final Block blk = new Block();
+ final ExtendedBlock blk = new ExtendedBlock();
blk.readId(in);
final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
@@ -477,13 +478,13 @@ public interface DataTransferProtocol {
* Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
* a proxy source.
*/
- protected abstract void opCopyBlock(DataInputStream in, Block blk,
+ protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken)
throws IOException;
/** Receive OP_BLOCK_CHECKSUM */
private void opBlockChecksum(DataInputStream in) throws IOException {
- final Block blk = new Block();
+ final ExtendedBlock blk = new ExtendedBlock();
blk.readId(in);
final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
@@ -495,7 +496,7 @@ public interface DataTransferProtocol {
* Get the checksum of a block
*/
protected abstract void opBlockChecksum(DataInputStream in,
- Block blk, Token<BlockTokenIdentifier> blockToken)
+ ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
throws IOException;
/** Read an array of {@link DatanodeInfo} */
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Fri Apr 29 18:16:32 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.io.WritableComparable;
@@ -76,6 +77,18 @@ public class DatanodeID implements Writa
this.ipcPort = ipcPort;
}
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setInfoPort(int infoPort) {
+ this.infoPort = infoPort;
+ }
+
+ public void setIpcPort(int ipcPort) {
+ this.ipcPort = ipcPort;
+ }
+
/**
* @return hostname:portNumber.
*/
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Fri Apr 29 18:16:32 2011
@@ -24,6 +24,7 @@ import java.util.Date;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
@@ -48,6 +49,7 @@ public class DatanodeInfo extends Datano
protected long capacity;
protected long dfsUsed;
protected long remaining;
+ protected long blockPoolUsed;
protected long lastUpdate;
protected int xceiverCount;
protected String location = NetworkTopology.DEFAULT_RACK;
@@ -89,6 +91,7 @@ public class DatanodeInfo extends Datano
this.capacity = from.getCapacity();
this.dfsUsed = from.getDfsUsed();
this.remaining = from.getRemaining();
+ this.blockPoolUsed = from.getBlockPoolUsed();
this.lastUpdate = from.getLastUpdate();
this.xceiverCount = from.getXceiverCount();
this.location = from.getNetworkLocation();
@@ -101,6 +104,7 @@ public class DatanodeInfo extends Datano
this.capacity = 0L;
this.dfsUsed = 0L;
this.remaining = 0L;
+ this.blockPoolUsed = 0L;
this.lastUpdate = 0L;
this.xceiverCount = 0;
this.adminState = null;
@@ -118,6 +122,9 @@ public class DatanodeInfo extends Datano
/** The used space by the data node. */
public long getDfsUsed() { return dfsUsed; }
+ /** The used space by the block pool on data node. */
+ public long getBlockPoolUsed() { return blockPoolUsed; }
+
/** The non-DFS used space by the data node. */
public long getNonDfsUsed() {
long nonDFSUsed = capacity - dfsUsed - remaining;
@@ -126,23 +133,20 @@ public class DatanodeInfo extends Datano
/** The used space by the data node as percentage of present capacity */
public float getDfsUsedPercent() {
- if (capacity <= 0) {
- return 100;
- }
-
- return ((float)dfsUsed * 100.0f)/(float)capacity;
+ return DFSUtil.getPercentUsed(dfsUsed, capacity);
}
/** The raw free space. */
public long getRemaining() { return remaining; }
+ /** Used space by the block pool as percentage of present capacity */
+ public float getBlockPoolUsedPercent() {
+ return DFSUtil.getPercentUsed(blockPoolUsed, capacity);
+ }
+
/** The remaining space as percentage of configured capacity. */
public float getRemainingPercent() {
- if (capacity <= 0) {
- return 0;
- }
-
- return ((float)remaining * 100.0f)/(float)capacity;
+ return DFSUtil.getPercentRemaining(remaining, capacity);
}
/** The time when this information was accurate. */
@@ -161,6 +165,11 @@ public class DatanodeInfo extends Datano
this.remaining = remaining;
}
+ /** Sets block pool used space */
+ public void setBlockPoolUsed(long bpUsed) {
+ this.blockPoolUsed = bpUsed;
+ }
+
/** Sets time when this information was accurate. */
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
@@ -342,6 +351,7 @@ public class DatanodeInfo extends Datano
out.writeLong(capacity);
out.writeLong(dfsUsed);
out.writeLong(remaining);
+ out.writeLong(blockPoolUsed);
out.writeLong(lastUpdate);
out.writeInt(xceiverCount);
Text.writeString(out, location);
@@ -359,6 +369,7 @@ public class DatanodeInfo extends Datano
this.capacity = in.readLong();
this.dfsUsed = in.readLong();
this.remaining = in.readLong();
+ this.blockPoolUsed = in.readLong();
this.lastUpdate = in.readLong();
this.xceiverCount = in.readInt();
this.location = Text.readString(in);
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Fri Apr 29 18:16:32 2011
@@ -88,7 +88,7 @@ public interface FSConstants {
// Version is reflected in the data storage file.
// Versions are negative.
// Decrement LAYOUT_VERSION to define a new version.
- public static final int LAYOUT_VERSION = -34;
+ public static final int LAYOUT_VERSION = -35;
// Current version:
- // -31, -32 and -33 are reserved for 0.20.203, 0.20.204 and 0.22.
+ // -35: Adding support for block pools and multiple namenodes
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Fri Apr 29 18:16:32 2011
@@ -42,7 +42,7 @@ public class LocatedBlock implements Wri
});
}
- private Block b;
+ private ExtendedBlock b;
private long offset; // offset of the first byte of the block in the file
private DatanodeInfo[] locs;
// corrupt flag is true if all of the replicas of a block are corrupt.
@@ -51,27 +51,23 @@ public class LocatedBlock implements Wri
private boolean corrupt;
private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
- /**
- */
public LocatedBlock() {
- this(new Block(), new DatanodeInfo[0], 0L, false);
+ this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
}
- /**
- */
- public LocatedBlock(Block b, DatanodeInfo[] locs) {
+ public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) {
+ this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown
+ }
+
+ public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
this(b, locs, -1, false); // startOffset is unknown
}
- /**
- */
- public LocatedBlock(Block b, DatanodeInfo[] locs, long startOffset) {
+ public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset) {
this(b, locs, startOffset, false);
}
- /**
- */
- public LocatedBlock(Block b, DatanodeInfo[] locs, long startOffset,
+ public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset,
boolean corrupt) {
this.b = b;
this.offset = startOffset;
@@ -93,7 +89,7 @@ public class LocatedBlock implements Wri
/**
*/
- public Block getBlock() {
+ public ExtendedBlock getBlock() {
return b;
}
@@ -141,7 +137,7 @@ public class LocatedBlock implements Wri
blockToken.readFields(in);
this.corrupt = in.readBoolean();
offset = in.readLong();
- this.b = new Block();
+ this.b = new ExtendedBlock();
b.readFields(in);
int count = in.readInt();
this.locs = new DatanodeInfo[count];
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Fri Apr 29 18:16:32 2011
@@ -37,19 +37,21 @@ public class BlockTokenIdentifier extend
private long expiryDate;
private int keyId;
private String userId;
+ private String blockPoolId;
private long blockId;
private EnumSet<AccessMode> modes;
private byte [] cache;
public BlockTokenIdentifier() {
- this(null, 0, EnumSet.noneOf(AccessMode.class));
+ this(null, null, 0, EnumSet.noneOf(AccessMode.class));
}
- public BlockTokenIdentifier(String userId, long blockId,
+ public BlockTokenIdentifier(String userId, String bpid, long blockId,
EnumSet<AccessMode> modes) {
this.cache = null;
this.userId = userId;
+ this.blockPoolId = bpid;
this.blockId = blockId;
this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
}
@@ -62,7 +64,8 @@ public class BlockTokenIdentifier extend
@Override
public UserGroupInformation getUser() {
if (userId == null || "".equals(userId)) {
- return UserGroupInformation.createRemoteUser(Long.toString(blockId));
+ String user = blockPoolId + ":" + Long.toString(blockId);
+ return UserGroupInformation.createRemoteUser(user);
}
return UserGroupInformation.createRemoteUser(userId);
}
@@ -89,6 +92,10 @@ public class BlockTokenIdentifier extend
return userId;
}
+ public String getBlockPoolId() {
+ return blockPoolId;
+ }
+
public long getBlockId() {
return blockId;
}
@@ -101,6 +108,7 @@ public class BlockTokenIdentifier extend
public String toString() {
return "block_token_identifier (expiryDate=" + this.getExpiryDate()
+ ", keyId=" + this.getKeyId() + ", userId=" + this.getUserId()
+ + ", blockPoolId=" + this.getBlockPoolId()
+ ", blockId=" + this.getBlockId() + ", access modes="
+ this.getAccessModes() + ")";
}
@@ -117,7 +125,9 @@ public class BlockTokenIdentifier extend
if (obj instanceof BlockTokenIdentifier) {
BlockTokenIdentifier that = (BlockTokenIdentifier) obj;
return this.expiryDate == that.expiryDate && this.keyId == that.keyId
- && isEqual(this.userId, that.userId) && this.blockId == that.blockId
+ && isEqual(this.userId, that.userId)
+ && isEqual(this.blockPoolId, that.blockPoolId)
+ && this.blockId == that.blockId
&& isEqual(this.modes, that.modes);
}
return false;
@@ -126,7 +136,8 @@ public class BlockTokenIdentifier extend
/** {@inheritDoc} */
public int hashCode() {
return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
- ^ (userId == null ? 0 : userId.hashCode());
+ ^ (userId == null ? 0 : userId.hashCode())
+ ^ (blockPoolId == null ? 0 : blockPoolId.hashCode());
}
public void readFields(DataInput in) throws IOException {
@@ -134,6 +145,7 @@ public class BlockTokenIdentifier extend
expiryDate = WritableUtils.readVLong(in);
keyId = WritableUtils.readVInt(in);
userId = WritableUtils.readString(in);
+ blockPoolId = WritableUtils.readString(in);
blockId = WritableUtils.readVLong(in);
int length = WritableUtils.readVInt(in);
for (int i = 0; i < length; i++) {
@@ -145,6 +157,7 @@ public class BlockTokenIdentifier extend
WritableUtils.writeVLong(out, expiryDate);
WritableUtils.writeVInt(out, keyId);
WritableUtils.writeString(out, userId);
+ WritableUtils.writeString(out, blockPoolId);
WritableUtils.writeVLong(out, blockId);
WritableUtils.writeVInt(out, modes.size());
for (AccessMode aMode : modes) {
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Fri Apr 29 18:16:32 2011
@@ -31,7 +31,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
@@ -174,7 +174,7 @@ public class BlockTokenSecretManager ext
}
/** Generate a block token for current user */
- public Token<BlockTokenIdentifier> generateToken(Block block,
+ public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
EnumSet<AccessMode> modes) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String userID = (ugi == null ? null : ugi.getShortUserName());
@@ -182,10 +182,10 @@ public class BlockTokenSecretManager ext
}
/** Generate a block token for a specified user */
- public Token<BlockTokenIdentifier> generateToken(String userId, Block block,
- EnumSet<AccessMode> modes) throws IOException {
+ public Token<BlockTokenIdentifier> generateToken(String userId,
+ ExtendedBlock block, EnumSet<AccessMode> modes) throws IOException {
BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
- .getBlockId(), modes);
+ .getBlockPoolId(), block.getBlockId(), modes);
return new Token<BlockTokenIdentifier>(id, this);
}
@@ -194,8 +194,8 @@ public class BlockTokenSecretManager ext
* method doesn't check if token password is correct. It should be used only
* when token password has already been verified (e.g., in the RPC layer).
*/
- public void checkAccess(BlockTokenIdentifier id, String userId, Block block,
- AccessMode mode) throws InvalidToken {
+ public void checkAccess(BlockTokenIdentifier id, String userId,
+ ExtendedBlock block, AccessMode mode) throws InvalidToken {
if (LOG.isDebugEnabled()) {
LOG.debug("Checking access for user=" + userId + ", block=" + block
+ ", access mode=" + mode + " using " + id.toString());
@@ -204,6 +204,10 @@ public class BlockTokenSecretManager ext
throw new InvalidToken("Block token with " + id.toString()
+ " doesn't belong to user " + userId);
}
+ if (!id.getBlockPoolId().equals(block.getBlockPoolId())) {
+ throw new InvalidToken("Block token with " + id.toString()
+ + " doesn't apply to block " + block);
+ }
if (id.getBlockId() != block.getBlockId()) {
throw new InvalidToken("Block token with " + id.toString()
+ " doesn't apply to block " + block);
@@ -220,7 +224,7 @@ public class BlockTokenSecretManager ext
/** Check if access should be allowed. userID is not checked if null */
public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
- Block block, AccessMode mode) throws InvalidToken {
+ ExtendedBlock block, AccessMode mode) throws InvalidToken {
BlockTokenIdentifier id = new BlockTokenIdentifier();
try {
id.readFields(new DataInputStream(new ByteArrayInputStream(token
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Apr 29 18:16:32 2011
@@ -24,16 +24,14 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
-import java.util.EnumSet;
import java.util.Formatter;
import java.util.HashMap;
import java.util.HashSet;
@@ -46,45 +44,31 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -187,27 +171,19 @@ import org.apache.hadoop.util.ToolRunner
*/
@InterfaceAudience.Private
-public class Balancer implements Tool {
- private static final Log LOG =
- LogFactory.getLog(Balancer.class.getName());
+public class Balancer {
+ static final Log LOG = LogFactory.getLog(Balancer.class);
final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
+ private static long WIN_WIDTH = 5400*1000L; // 1.5 hour
/** The maximum number of concurrent blocks moves for
* balancing purpose at a datanode
*/
public static final int MAX_NUM_CONCURRENT_MOVES = 5;
- private Configuration conf;
-
- private double threshold = 10D;
- private NamenodeProtocol namenode;
- private ClientProtocol client;
- private FileSystem fs;
- private boolean isBlockTokenEnabled;
- private boolean shouldRun;
- private long keyUpdaterInterval;
- private BlockTokenSecretManager blockTokenSecretManager;
- private Daemon keyupdaterthread = null; // AccessKeyUpdater thread
+ private final NameNodeConnector nnc;
+ private final BalancingPolicy policy;
+ private final double threshold;
private final static Random rnd = new Random();
// all data node lists
@@ -233,8 +209,6 @@ public class Balancer implements Tool {
private NetworkTopology cluster = new NetworkTopology();
- private double avgUtilization = 0.0D;
-
final static private int MOVER_THREAD_POOL_SIZE = 1000;
final private ExecutorService moverExecutor =
Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
@@ -242,6 +216,7 @@ public class Balancer implements Tool {
final private ExecutorService dispatcherExecutor =
Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE);
+
/* This class keeps track of a scheduled block move */
private class PendingBlockMove {
private BalancerBlock block;
@@ -369,14 +344,9 @@ public class Balancer implements Tool {
/* Send a block replace request to the output stream*/
private void sendRequest(DataOutputStream out) throws IOException {
- Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
- if (isBlockTokenEnabled) {
- accessToken = blockTokenSecretManager.generateToken(null, block
- .getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
- BlockTokenSecretManager.AccessMode.COPY));
- }
- DataTransferProtocol.Sender.opReplaceBlock(out,
- block.getBlock(), source.getStorageID(),
+ final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
+ final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+ DataTransferProtocol.Sender.opReplaceBlock(out, eb, source.getStorageID(),
proxySource.getDatanode(), accessToken);
}
@@ -487,30 +457,33 @@ public class Balancer implements Tool {
}
}
- /* Return the utilization of a datanode */
- static private double getUtilization(DatanodeInfo datanode) {
- return ((double)datanode.getDfsUsed())/datanode.getCapacity()*100;
- }
/* A class that keeps track of a datanode in Balancer */
private static class BalancerDatanode {
final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
- protected DatanodeInfo datanode;
- private double utilization;
- protected long maxSizeToMove;
+ final DatanodeInfo datanode;
+ final double utilization;
+ final long maxSize2Move;
protected long scheduledSize = 0L;
// blocks being moved but not confirmed yet
private List<PendingBlockMove> pendingBlocks =
new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES);
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "[" + getName()
+ + ", utilization=" + utilization + "]";
+ }
+
/* Constructor
* Depending on avgutil & threshold, calculate maximum bytes to move
*/
- private BalancerDatanode(
- DatanodeInfo node, double avgUtil, double threshold) {
+ private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) {
datanode = node;
- utilization = Balancer.getUtilization(node);
-
+ utilization = policy.getUtilization(node);
+ final double avgUtil = policy.getAvgUtilization();
+ long maxSizeToMove;
+
if (utilization >= avgUtil+threshold
|| utilization <= avgUtil-threshold) {
maxSizeToMove = (long)(threshold*datanode.getCapacity()/100);
@@ -521,7 +494,7 @@ public class Balancer implements Tool {
if (utilization < avgUtil ) {
maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
}
- maxSizeToMove = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+ this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
}
/** Get the datanode */
@@ -541,12 +514,12 @@ public class Balancer implements Tool {
/** Decide if still need to move more bytes */
protected boolean isMoveQuotaFull() {
- return scheduledSize<maxSizeToMove;
+ return scheduledSize<maxSize2Move;
}
/** Return the total number of bytes that need to be moved */
protected long availableSizeToMove() {
- return maxSizeToMove-scheduledSize;
+ return maxSize2Move-scheduledSize;
}
/* increment scheduled size */
@@ -604,8 +577,8 @@ public class Balancer implements Tool {
= new ArrayList<BalancerBlock>();
/* constructor */
- private Source(DatanodeInfo node, double avgUtil, double threshold) {
- super(node, avgUtil, threshold);
+ private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) {
+ super(node, policy, threshold);
}
/** Add a node task */
@@ -626,7 +599,7 @@ public class Balancer implements Tool {
* Return the total size of the received blocks in the number of bytes.
*/
private long getBlockList() throws IOException {
- BlockWithLocations[] newBlocks = namenode.getBlocks(datanode,
+ BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode,
Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
long bytesReceived = 0;
for (BlockWithLocations blk : newBlocks) {
@@ -780,160 +753,25 @@ public class Balancer implements Tool {
/* Check that this Balancer is compatible with the Block Placement Policy
* used by the Namenode.
*/
- private void checkReplicationPolicyCompatibility(Configuration conf) throws UnsupportedActionException {
+ private static void checkReplicationPolicyCompatibility(Configuration conf
+ ) throws UnsupportedActionException {
if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() !=
BlockPlacementPolicyDefault.class) {
throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
}
}
-
- /** Default constructor */
- Balancer() throws UnsupportedActionException {
- }
-
- /** Construct a balancer from the given configuration */
- Balancer(Configuration conf) throws UnsupportedActionException {
- checkReplicationPolicyCompatibility(conf);
- setConf(conf);
- }
-
- /** Construct a balancer from the given configuration and threshold */
- Balancer(Configuration conf, double threshold) throws UnsupportedActionException {
- checkReplicationPolicyCompatibility(conf);
- setConf(conf);
- this.threshold = threshold;
- }
/**
- * Run a balancer
- * @param args
- */
- public static void main(String[] args) {
- try {
- System.exit( ToolRunner.run(null, new Balancer(), args) );
- } catch (Throwable e) {
- LOG.error(StringUtils.stringifyException(e));
- System.exit(-1);
- }
-
- }
-
- private static void printUsage() {
- System.out.println("Usage: java Balancer");
- System.out.println(" [-threshold <threshold>]\t"
- +"percentage of disk capacity");
- }
-
- /* parse argument to get the threshold */
- private double parseArgs(String[] args) {
- double threshold=0;
- int argsLen = (args == null) ? 0 : args.length;
- if (argsLen==0) {
- threshold = 10;
- } else {
- if (argsLen != 2 || !"-threshold".equalsIgnoreCase(args[0])) {
- printUsage();
- throw new IllegalArgumentException(Arrays.toString(args));
- } else {
- try {
- threshold = Double.parseDouble(args[1]);
- if (threshold < 0 || threshold >100) {
- throw new NumberFormatException();
- }
- LOG.info( "Using a threshold of " + threshold );
- } catch(NumberFormatException e) {
- System.err.println(
- "Expect a double parameter in the range of [0, 100]: "+ args[1]);
- printUsage();
- throw e;
- }
- }
- }
- return threshold;
- }
-
- /* Initialize balancer. It sets the value of the threshold, and
+ * Construct a balancer.
+ * Initialize balancer. It sets the value of the threshold, and
* builds the communication proxies to
* namenode as a client and a secondary namenode and retry proxies
* when connection fails.
*/
- private void init(double threshold) throws IOException {
- this.threshold = threshold;
- this.namenode = createNamenode(conf);
- this.client = DFSClient.createNamenode(conf);
- this.fs = FileSystem.get(conf);
- ExportedBlockKeys keys = namenode.getBlockKeys();
- this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
- if (isBlockTokenEnabled) {
- long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
- long blockTokenLifetime = keys.getTokenLifetime();
- LOG.info("Block token params received from NN: keyUpdateInterval="
- + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
- + blockTokenLifetime / (60 * 1000) + " min(s)");
- this.blockTokenSecretManager = new BlockTokenSecretManager(false,
- blockKeyUpdateInterval, blockTokenLifetime);
- this.blockTokenSecretManager.setKeys(keys);
- /*
- * Balancer should sync its block keys with NN more frequently than NN
- * updates its block keys
- */
- this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
- LOG.info("Balancer will update its block keys every "
- + keyUpdaterInterval / (60 * 1000) + " minute(s)");
- this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
- this.shouldRun = true;
- this.keyupdaterthread.start();
- }
- }
-
- /**
- * Periodically updates access keys.
- */
- class BlockKeyUpdater implements Runnable {
-
- public void run() {
- while (shouldRun) {
- try {
- blockTokenSecretManager.setKeys(namenode.getBlockKeys());
- } catch (Exception e) {
- LOG.error(StringUtils.stringifyException(e));
- }
- try {
- Thread.sleep(keyUpdaterInterval);
- } catch (InterruptedException ie) {
- }
- }
- }
- }
-
- /* Build a NamenodeProtocol connection to the namenode and
- * set up the retry policy */
- private static NamenodeProtocol createNamenode(Configuration conf)
- throws IOException {
- InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true);
- RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
- 5, 200, TimeUnit.MILLISECONDS);
- Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
- new HashMap<Class<? extends Exception>, RetryPolicy>();
- RetryPolicy methodPolicy = RetryPolicies.retryByException(
- timeoutPolicy, exceptionToPolicyMap);
- Map<String,RetryPolicy> methodNameToPolicyMap =
- new HashMap<String, RetryPolicy>();
- methodNameToPolicyMap.put("getBlocks", methodPolicy);
- methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
-
- UserGroupInformation ugi;
- ugi = UserGroupInformation.getCurrentUser();
-
- return (NamenodeProtocol) RetryProxy.create(
- NamenodeProtocol.class,
- RPC.getProxy(NamenodeProtocol.class,
- NamenodeProtocol.versionID,
- nameNodeAddr,
- ugi,
- conf,
- NetUtils.getDefaultSocketFactory(conf)),
- methodNameToPolicyMap);
+ Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
+ this.threshold = p.threshold;
+ this.policy = p.policy;
+ this.nnc = theblockpool;
}
/* Shuffle datanode array */
@@ -946,13 +784,6 @@ public class Balancer implements Tool {
}
}
- /* get all live datanodes of a cluster and their disk usage
- * decide the number of bytes need to be moved
- */
- private long initNodes() throws IOException {
- return initNodes(client.getDatanodeReport(DatanodeReportType.LIVE));
- }
-
/* Given a data node set, build a network topology and decide
* over-utilized datanodes, above average utilized datanodes,
* below average utilized datanodes, and underutilized datanodes.
@@ -968,15 +799,13 @@ public class Balancer implements Tool {
*/
private long initNodes(DatanodeInfo[] datanodes) {
// compute average utilization
- long totalCapacity=0L, totalUsedSpace=0L;
for (DatanodeInfo datanode : datanodes) {
if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
continue; // ignore decommissioning or decommissioned nodes
}
- totalCapacity += datanode.getCapacity();
- totalUsedSpace += datanode.getDfsUsed();
+ policy.accumulateSpaces(datanode);
}
- this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
+ policy.initAvgUtilization();
/*create network topology and all data node lists:
* overloaded, above-average, below-average, and underloaded
@@ -991,19 +820,20 @@ public class Balancer implements Tool {
}
cluster.add(datanode);
BalancerDatanode datanodeS;
- if (getUtilization(datanode) > avgUtilization) {
- datanodeS = new Source(datanode, avgUtilization, threshold);
+ final double avg = policy.getAvgUtilization();
+ if (policy.getUtilization(datanode) > avg) {
+ datanodeS = new Source(datanode, policy, threshold);
if (isAboveAvgUtilized(datanodeS)) {
this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
} else {
assert(isOverUtilized(datanodeS)) :
datanodeS.getName()+ "is not an overUtilized node";
this.overUtilizedDatanodes.add((Source)datanodeS);
- overLoadedBytes += (long)((datanodeS.utilization-avgUtilization
+ overLoadedBytes += (long)((datanodeS.utilization-avg
-threshold)*datanodeS.datanode.getCapacity()/100.0);
}
} else {
- datanodeS = new BalancerDatanode(datanode, avgUtilization, threshold);
+ datanodeS = new BalancerDatanode(datanode, policy, threshold);
if ( isBelowOrEqualAvgUtilized(datanodeS)) {
this.belowAvgUtilizedDatanodes.add(datanodeS);
} else {
@@ -1011,7 +841,7 @@ public class Balancer implements Tool {
+ datanodeS.getName() + ")=" + isUnderUtilized(datanodeS)
+ ", utilization=" + datanodeS.utilization;
this.underUtilizedDatanodes.add(datanodeS);
- underLoadedBytes += (long)((avgUtilization-threshold-
+ underLoadedBytes += (long)((avg-threshold-
datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
}
}
@@ -1019,7 +849,7 @@ public class Balancer implements Tool {
}
//logging
- logImbalancedNodes();
+ logNodes();
assert (this.datanodes.size() ==
overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
@@ -1031,25 +861,20 @@ public class Balancer implements Tool {
}
/* log the over utilized & under utilized nodes */
- private void logImbalancedNodes() {
- StringBuilder msg = new StringBuilder();
- msg.append(overUtilizedDatanodes.size());
- msg.append(" over utilized nodes:");
- for (Source node : overUtilizedDatanodes) {
- msg.append( " " );
- msg.append( node.getName() );
- }
- LOG.info(msg);
- msg = new StringBuilder();
- msg.append(underUtilizedDatanodes.size());
- msg.append(" under utilized nodes: ");
- for (BalancerDatanode node : underUtilizedDatanodes) {
- msg.append( " " );
- msg.append( node.getName() );
- }
- LOG.info(msg);
+ private void logNodes() {
+ logNodes("over-utilized", overUtilizedDatanodes);
+ if (LOG.isTraceEnabled()) {
+ logNodes("above-average", aboveAvgUtilizedDatanodes);
+ logNodes("below-average", belowAvgUtilizedDatanodes);
+ }
+ logNodes("underutilized", underUtilizedDatanodes);
}
-
+
+ private static <T extends BalancerDatanode> void logNodes(
+ String name, Collection<T> nodes) {
+ LOG.info(nodes.size() + " " + name + ": " + nodes);
+ }
+
/* Decide all <source, target> pairs and
* the number of bytes to move from a source to a target
* Maximum bytes to be moved per node is
@@ -1313,7 +1138,6 @@ public class Balancer implements Tool {
*/
private static class MovedBlocks {
private long lastCleanupTime = System.currentTimeMillis();
- private static long winWidth = 5400*1000L; // 1.5 hour
final private static int CUR_WIN = 0;
final private static int OLD_WIN = 1;
final private static int NUM_WINS = 2;
@@ -1326,13 +1150,6 @@ public class Balancer implements Tool {
movedBlocks.add(new HashMap<Block,BalancerBlock>());
}
- /* set the win width */
- private void setWinWidth(Configuration conf) {
- winWidth = conf.getLong(
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
- }
-
/* add a block thus marking a block to be moved */
synchronized private void add(BalancerBlock block) {
movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
@@ -1353,7 +1170,7 @@ public class Balancer implements Tool {
synchronized private void cleanup() {
long curTime = System.currentTimeMillis();
// check if old win is older than winWidth
- if (lastCleanupTime + winWidth <= curTime) {
+ if (lastCleanupTime + WIN_WIDTH <= curTime) {
// purge the old window
movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>());
@@ -1419,7 +1236,7 @@ public class Balancer implements Tool {
this.datanodes.clear();
this.sources.clear();
this.targets.clear();
- this.avgUtilization = 0.0D;
+ this.policy.reset();
cleanGlobalBlockList();
this.movedBlocks.cleanup();
}
@@ -1439,182 +1256,172 @@ public class Balancer implements Tool {
/* Return true if the given datanode is overUtilized */
private boolean isOverUtilized(BalancerDatanode datanode) {
- return datanode.utilization > (avgUtilization+threshold);
+ return datanode.utilization > (policy.getAvgUtilization()+threshold);
}
/* Return true if the given datanode is above average utilized
* but not overUtilized */
private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
- return (datanode.utilization <= (avgUtilization+threshold))
- && (datanode.utilization > avgUtilization);
+ final double avg = policy.getAvgUtilization();
+ return (datanode.utilization <= (avg+threshold))
+ && (datanode.utilization > avg);
}
/* Return true if the given datanode is underUtilized */
private boolean isUnderUtilized(BalancerDatanode datanode) {
- return datanode.utilization < (avgUtilization-threshold);
+ return datanode.utilization < (policy.getAvgUtilization()-threshold);
}
/* Return true if the given datanode is below average utilized
* but not underUtilized */
private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) {
- return (datanode.utilization >= (avgUtilization-threshold))
- && (datanode.utilization <= avgUtilization);
+ final double avg = policy.getAvgUtilization();
+ return (datanode.utilization >= (avg-threshold))
+ && (datanode.utilization <= avg);
}
// Exit status
- final public static int SUCCESS = 1;
- final public static int ALREADY_RUNNING = -1;
- final public static int NO_MOVE_BLOCK = -2;
- final public static int NO_MOVE_PROGRESS = -3;
- final public static int IO_EXCEPTION = -4;
- final public static int ILLEGAL_ARGS = -5;
- /** main method of Balancer
- * @param args arguments to a Balancer
- * @throws Exception exception that occured during datanode balancing
- */
- public int run(String[] args) throws Exception {
- long startTime = Util.now();
- OutputStream out = null;
+ enum ReturnStatus {
+ SUCCESS(1),
+ IN_PROGRESS(0),
+ ALREADY_RUNNING(-1),
+ NO_MOVE_BLOCK(-2),
+ NO_MOVE_PROGRESS(-3),
+ IO_EXCEPTION(-4),
+ ILLEGAL_ARGS(-5),
+ INTERRUPTED(-6);
+
+ final int code;
+
+ ReturnStatus(int code) {
+ this.code = code;
+ }
+ }
+
+ /** Run an iteration for all datanodes. */
+ private ReturnStatus run(int iteration, Formatter formatter) {
try {
- // initialize a balancer
- init(parseArgs(args));
+ /* get all live datanodes of a cluster and their disk usage
+ * and decide the number of bytes that need to be moved
+ */
+ final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE));
+ if (bytesLeftToMove == 0) {
+ System.out.println("The cluster is balanced. Exiting...");
+ return ReturnStatus.SUCCESS;
+ } else {
+ LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
+ + " to make the cluster balanced." );
+ }
- /* Check if there is another balancer running.
- * Exit if there is another one running.
+ /* Decide all the nodes that will participate in the block move and
+ * the number of bytes that need to be moved from one node to another
+ * in this iteration. Maximum bytes to be moved per node is
+ * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
*/
- out = checkAndMarkRunningBalancer();
- if (out == null) {
- System.out.println("Another balancer is running. Exiting...");
- return ALREADY_RUNNING;
+ final long bytesToMove = chooseNodes();
+ if (bytesToMove == 0) {
+ System.out.println("No block can be moved. Exiting...");
+ return ReturnStatus.NO_MOVE_BLOCK;
+ } else {
+ LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
+ " in this iteration");
}
- Formatter formatter = new Formatter(System.out);
- System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
- int iterations = 0;
- while (true ) {
- /* get all live datanodes of a cluster and their disk usage
- * decide the number of bytes need to be moved
- */
- long bytesLeftToMove = initNodes();
- if (bytesLeftToMove == 0) {
- System.out.println("The cluster is balanced. Exiting...");
- return SUCCESS;
- } else {
- LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
- +" bytes to make the cluster balanced." );
- }
-
- /* Decide all the nodes that will participate in the block move and
- * the number of bytes that need to be moved from one node to another
- * in this iteration. Maximum bytes to be moved per node is
- * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
- */
- long bytesToMove = chooseNodes();
- if (bytesToMove == 0) {
- System.out.println("No block can be moved. Exiting...");
- return NO_MOVE_BLOCK;
- } else {
- LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
- "bytes in this iteration");
- }
-
- formatter.format("%-24s %10d %19s %18s %17s\n",
- DateFormat.getDateTimeInstance().format(new Date()),
- iterations,
- StringUtils.byteDesc(bytesMoved.get()),
- StringUtils.byteDesc(bytesLeftToMove),
- StringUtils.byteDesc(bytesToMove)
- );
-
- /* For each pair of <source, target>, start a thread that repeatedly
- * decide a block to be moved and its proxy source,
- * then initiates the move until all bytes are moved or no more block
- * available to move.
- * Exit no byte has been moved for 5 consecutive iterations.
- */
- if (dispatchBlockMoves() > 0) {
- notChangedIterations = 0;
- } else {
- notChangedIterations++;
- if (notChangedIterations >= 5) {
- System.out.println(
- "No block has been moved for 5 iterations. Exiting...");
- return NO_MOVE_PROGRESS;
- }
- }
-
- // clean all lists
- resetData();
-
- try {
- Thread.sleep(2000*conf.getLong(
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT));
- } catch (InterruptedException ignored) {
+ formatter.format("%-24s %10d %19s %18s %17s\n",
+ DateFormat.getDateTimeInstance().format(new Date()),
+ iteration,
+ StringUtils.byteDesc(bytesMoved.get()),
+ StringUtils.byteDesc(bytesLeftToMove),
+ StringUtils.byteDesc(bytesToMove)
+ );
+
+ /* For each pair of <source, target>, start a thread that repeatedly
+ * decides a block to be moved and its proxy source,
+ * then initiates the move until all bytes are moved or no more blocks
+ * are available to move.
+ * Exit if no byte has been moved for 5 consecutive iterations.
+ */
+ if (dispatchBlockMoves() > 0) {
+ notChangedIterations = 0;
+ } else {
+ notChangedIterations++;
+ if (notChangedIterations >= 5) {
+ System.out.println(
+ "No block has been moved for 5 iterations. Exiting...");
+ return ReturnStatus.NO_MOVE_PROGRESS;
}
-
- iterations++;
}
- } catch (IllegalArgumentException ae) {
- return ILLEGAL_ARGS;
+
+ // clean all lists
+ resetData();
+ return ReturnStatus.IN_PROGRESS;
+ } catch (IllegalArgumentException e) {
+ System.out.println(e + ". Exiting ...");
+ return ReturnStatus.ILLEGAL_ARGS;
} catch (IOException e) {
- System.out.println("Received an IO exception: " + e.getMessage() +
- " . Exiting...");
- return IO_EXCEPTION;
+ System.out.println(e + ". Exiting ...");
+ return ReturnStatus.IO_EXCEPTION;
+ } catch (InterruptedException e) {
+ System.out.println(e + ". Exiting ...");
+ return ReturnStatus.INTERRUPTED;
} finally {
// shutdown thread pools
dispatcherExecutor.shutdownNow();
moverExecutor.shutdownNow();
-
- shouldRun = false;
- try {
- if (keyupdaterthread != null) keyupdaterthread.interrupt();
- } catch (Exception e) {
- LOG.warn("Exception shutting down access key updater thread", e);
- }
- // close the output file
- IOUtils.closeStream(out);
- if (fs != null) {
- try {
- fs.delete(BALANCER_ID_PATH, true);
- } catch(IOException ignored) {
- }
- }
- System.out.println("Balancing took " +
- time2Str(Util.now()-startTime));
}
}
- private Path BALANCER_ID_PATH = new Path("/system/balancer.id");
- /* The idea for making sure that there is no more than one balancer
- * running in an HDFS is to create a file in the HDFS, writes the IP address
- * of the machine on which the balancer is running to the file, but did not
- * close the file until the balancer exits.
- * This prevents the second balancer from running because it can not
- * creates the file while the first one is running.
- *
- * This method checks if there is any running balancer and
- * if no, mark yes if no.
- * Note that this is an atomic operation.
- *
- * Return null if there is a running balancer; otherwise the output stream
- * to the newly created file.
+ /**
+ * Balance all namenodes.
+ * For each iteration,
+ * for each namenode,
+ * execute a {@link Balancer} to work through all datanodes once.
*/
- private OutputStream checkAndMarkRunningBalancer() throws IOException {
+ static int run(List<InetSocketAddress> namenodes, final Parameters p,
+ Configuration conf) throws IOException, InterruptedException {
+ final long sleeptime = 2000*conf.getLong(
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+ LOG.info("namenodes = " + namenodes);
+ LOG.info("p = " + p);
+
+ final Formatter formatter = new Formatter(System.out);
+ System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
+
+ final List<NameNodeConnector> connectors
+ = new ArrayList<NameNodeConnector>(namenodes.size());
try {
- DataOutputStream out = fs.create(BALANCER_ID_PATH);
- out. writeBytes(InetAddress.getLocalHost().getHostName());
- out.flush();
- return out;
- } catch(RemoteException e) {
- if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
- return null;
- } else {
- throw e;
+ for(InetSocketAddress isa : namenodes) {
+ connectors.add(new NameNodeConnector(isa, conf));
+ }
+
+ boolean done = false;
+ for(int iteration = 0; !done; iteration++) {
+ done = true;
+ Collections.shuffle(connectors);
+ for(NameNodeConnector nnc : connectors) {
+ final Balancer b = new Balancer(nnc, p, conf);
+ final ReturnStatus r = b.run(iteration, formatter);
+ if (r == ReturnStatus.IN_PROGRESS) {
+ done = false;
+ } else if (r != ReturnStatus.SUCCESS) {
+ //must be an error status, return.
+ return r.code;
+ }
+ }
+
+ if (!done) {
+ Thread.sleep(sleeptime);
+ }
+ }
+ } finally {
+ for(NameNodeConnector nnc : connectors) {
+ nnc.close();
}
}
+ return ReturnStatus.SUCCESS.code;
}
-
+
/* Given elaspedTime in ms, return a printable string */
private static String time2Str(long elapsedTime) {
String unit;
@@ -1635,15 +1442,116 @@ public class Balancer implements Tool {
return time+" "+unit;
}
- /** return this balancer's configuration */
- public Configuration getConf() {
- return conf;
+ static class Parameters {
+ static final Parameters DEFALUT = new Parameters(
+ BalancingPolicy.Node.INSTANCE, 10.0);
+
+ final BalancingPolicy policy;
+ final double threshold;
+
+ Parameters(BalancingPolicy policy, double threshold) {
+ this.policy = policy;
+ this.threshold = threshold;
+ }
+
+ @Override
+ public String toString() {
+ return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
+ + "[" + policy + ", threshold=" + threshold + "]";
+ }
}
- /** set this balancer's configuration */
- public void setConf(Configuration conf) {
- this.conf = conf;
- movedBlocks.setWinWidth(conf);
+ static class Cli extends Configured implements Tool {
+ /** Parse arguments and then run Balancer */
+ @Override
+ public int run(String[] args) {
+ final long startTime = Util.now();
+ final Configuration conf = getConf();
+ WIN_WIDTH = conf.getLong(
+ DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
+ DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
+
+ try {
+ checkReplicationPolicyCompatibility(conf);
+
+ final List<InetSocketAddress> namenodes = DFSUtil.getNNServiceRpcAddresses(conf);
+ return Balancer.run(namenodes, parse(args), conf);
+ } catch (IOException e) {
+ System.out.println(e + ". Exiting ...");
+ return ReturnStatus.IO_EXCEPTION.code;
+ } catch (InterruptedException e) {
+ System.out.println(e + ". Exiting ...");
+ return ReturnStatus.INTERRUPTED.code;
+ } finally {
+ System.out.println("Balancing took " + time2Str(Util.now()-startTime));
+ }
+ }
+
+ /** parse command line arguments */
+ static Parameters parse(String[] args) {
+ BalancingPolicy policy = Parameters.DEFALUT.policy;
+ double threshold = Parameters.DEFALUT.threshold;
+
+ if (args != null) {
+ try {
+ for(int i = 0; i < args.length; i++) {
+ if ("-threshold".equalsIgnoreCase(args[i])) {
+ i++;
+ try {
+ threshold = Double.parseDouble(args[i]);
+ if (threshold < 0 || threshold > 100) {
+ throw new NumberFormatException(
+ "Number out of range: threshold = " + threshold);
+ }
+ LOG.info( "Using a threshold of " + threshold );
+ } catch(NumberFormatException e) {
+ System.err.println(
+ "Expecting a number in the range of [0.0, 100.0]: "
+ + args[i]);
+ throw e;
+ }
+ } else if ("-policy".equalsIgnoreCase(args[i])) {
+ i++;
+ try {
+ policy = BalancingPolicy.parse(args[i]);
+ } catch(IllegalArgumentException e) {
+ System.err.println("Illegal policy name: " + args[i]);
+ throw e;
+ }
+ } else {
+ throw new IllegalArgumentException("args = "
+ + Arrays.toString(args));
+ }
+ }
+ } catch(RuntimeException e) {
+ printUsage();
+ throw e;
+ }
+ }
+
+ return new Parameters(policy, threshold);
+ }
+
+ private static void printUsage() {
+ System.out.println("Usage: java " + Balancer.class.getSimpleName());
+ System.out.println(" [-policy <policy>]\tthe balancing policy: "
+ + BalancingPolicy.Node.INSTANCE.getName() + " or "
+ + BalancingPolicy.Pool.INSTANCE.getName());
+ System.out.println(
+ " [-threshold <threshold>]\tPercentage of disk capacity");
+ }
}
+ /**
+ * Run a balancer
+ * @param args Command line arguments
+ */
+ public static void main(String[] args) {
+ try {
+ System.exit(ToolRunner.run(null, new Cli(), args));
+ } catch (Throwable e) {
+ LOG.error(StringUtils.stringifyException(e));
+ System.exit(-1);
+ }
+ }
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Fri Apr 29 18:16:32 2011
@@ -41,6 +41,8 @@ public interface HdfsConstants {
/** Startup options */
static public enum StartupOption{
FORMAT ("-format"),
+ CLUSTERID ("-clusterid"),
+ GENCLUSTERID ("-genclusterid"),
REGULAR ("-regular"),
BACKUP ("-backup"),
CHECKPOINT("-checkpoint"),
@@ -50,6 +52,10 @@ public interface HdfsConstants {
IMPORT ("-importCheckpoint");
private String name = null;
+
+ // Used only with format and upgrade options
+ private String clusterId = null;
+
private StartupOption(String arg) {this.name = arg;}
public String getName() {return name;}
public NamenodeRole toNodeRole() {
@@ -62,7 +68,14 @@ public interface HdfsConstants {
return NamenodeRole.ACTIVE;
}
}
-
+
+ public void setClusterId(String cid) {
+ clusterId = cid;
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
}
// Timeouts for communicating with DataNode for streaming writes/reads
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Fri Apr 29 18:16:32 2011
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.Random;
import java.util.TreeSet;
+import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.jsp.JspWriter;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.BlockReade
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -66,6 +68,7 @@ public class JspHelper {
public static final String CURRENT_CONF = "current.conf";
final static public String WEB_UGI_PROPERTY_NAME = "dfs.web.ugi";
public static final String DELEGATION_PARAMETER_NAME = "delegation";
+ public static final String NAMENODE_ADDRESS = "nnaddr";
static final String SET_DELEGATION = "&" + DELEGATION_PARAMETER_NAME +
"=";
private static final Log LOG = LogFactory.getLog(JspHelper.class);
@@ -181,7 +184,7 @@ public class JspHelper {
return chosenNode;
}
- public static void streamBlockInAscii(InetSocketAddress addr,
+ public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
long blockSize, long offsetIntoBlock, long chunkSizeToView,
JspWriter out, Configuration conf) throws IOException {
@@ -193,9 +196,9 @@ public class JspHelper {
long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
// Use the block name for file name.
- String file = BlockReader.getFileName(addr, blockId);
+ String file = BlockReader.getFileName(addr, poolId, blockId);
BlockReader blockReader = BlockReader.newBlockReader(s, file,
- new Block(blockId, 0, genStamp), blockToken,
+ new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
offsetIntoBlock, amtToRead, conf.getInt("io.file.buffer.size", 4096));
byte[] buf = new byte[(int)amtToRead];
@@ -359,14 +362,16 @@ public class JspHelper {
public static void printPathWithLinks(String dir, JspWriter out,
int namenodeInfoPort,
- String tokenString
+ String tokenString,
+ String nnAddress
) throws IOException {
try {
String[] parts = dir.split(Path.SEPARATOR);
StringBuilder tempPath = new StringBuilder(dir.length());
out.print("<a href=\"browseDirectory.jsp" + "?dir="+ Path.SEPARATOR
+ "&namenodeInfoPort=" + namenodeInfoPort
- + getDelegationTokenUrlParam(tokenString) + "\">" + Path.SEPARATOR
+ + getDelegationTokenUrlParam(tokenString)
+ + getUrlParam(NAMENODE_ADDRESS, nnAddress) + "\">" + Path.SEPARATOR
+ "</a>");
tempPath.append(Path.SEPARATOR);
for (int i = 0; i < parts.length-1; i++) {
@@ -374,7 +379,8 @@ public class JspHelper {
tempPath.append(parts[i]);
out.print("<a href=\"browseDirectory.jsp" + "?dir="
+ tempPath.toString() + "&namenodeInfoPort=" + namenodeInfoPort
- + getDelegationTokenUrlParam(tokenString));
+ + getDelegationTokenUrlParam(tokenString)
+ + getUrlParam(NAMENODE_ADDRESS, nnAddress));
out.print("\">" + parts[i] + "</a>" + Path.SEPARATOR);
tempPath.append(Path.SEPARATOR);
}
@@ -391,7 +397,8 @@ public class JspHelper {
public static void printGotoForm(JspWriter out,
int namenodeInfoPort,
String tokenString,
- String file) throws IOException {
+ String file,
+ String nnAddress) throws IOException {
out.print("<form action=\"browseDirectory.jsp\" method=\"get\" name=\"goto\">");
out.print("Goto : ");
out.print("<input name=\"dir\" type=\"text\" width=\"50\" id\"dir\" value=\""+ file+"\">");
@@ -402,6 +409,8 @@ public class JspHelper {
out.print("<input name=\"" + DELEGATION_PARAMETER_NAME
+ "\" type=\"hidden\" value=\"" + tokenString + "\">");
}
+ out.print("<input name=\""+ NAMENODE_ADDRESS +"\" type=\"hidden\" "
+ + "value=\"" + nnAddress + "\">");
out.print("</form>");
}
@@ -475,16 +484,43 @@ public class JspHelper {
return UserGroupInformation.createRemoteUser(strings[0]);
}
+ /**
+ * Builds the "address:port" string identifying the namenode for a request.
+ * The address is taken from the {@link #NAMENODE_ADDRESS} request parameter
+ * when it is present; otherwise, if a context was supplied, from the
+ * servlet context attribute {@link NameNode#NAMENODE_ADDRESS_ATTRIBUTE_KEY}.
+ *
+ * @param context servlet context used as the fallback source; may be null
+ * @param request the http request, consulted first
+ * @return the host address and port joined by ':', or null when neither
+ * source yields an address
+ */
+ private static String getNNServiceAddress(ServletContext context,
+ HttpServletRequest request) {
+ String namenodeAddressInUrl = request.getParameter(NAMENODE_ADDRESS);
+ InetSocketAddress namenodeAddress = null;
+ if (namenodeAddressInUrl != null) {
+ namenodeAddress = DFSUtil.getSocketAddress(namenodeAddressInUrl);
+ } else if (context != null) {
+ namenodeAddress = (InetSocketAddress) context
+ .getAttribute(NameNode.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
+ }
+ if (namenodeAddress != null) {
+ // NOTE(review): InetSocketAddress.getAddress() is null for an unresolved
+ // address -- confirm both sources above always yield resolved addresses.
+ return (namenodeAddress.getAddress().getHostAddress() + ":"
+ + namenodeAddress.getPort());
+ }
+ return null;
+ }
+
+ /**
+ * Convenience overload of
+ * {@link JspHelper#getUGI(ServletContext, HttpServletRequest, Configuration)}
+ * that passes null for the ServletContext.
+ *
+ * @param request the http request
+ * @param conf configuration, forwarded unchanged
+ * @return the result of the three-argument overload
+ * @throws IOException propagated from the three-argument overload
+ */
+ public static UserGroupInformation getUGI(HttpServletRequest request,
+ Configuration conf) throws IOException {
+ return getUGI(null, request, conf);
+ }
+
/**
* Get {@link UserGroupInformation} and possibly the delegation token out of
* the request.
+ * @param context the ServletContext that is serving this request.
* @param request the http request
* @return a new user from the request
* @throws AccessControlException if the request has no token
*/
- public static UserGroupInformation getUGI(HttpServletRequest request,
- Configuration conf
- ) throws IOException {
+ public static UserGroupInformation getUGI(ServletContext context,
+ HttpServletRequest request, Configuration conf) throws IOException {
UserGroupInformation ugi = null;
if(UserGroupInformation.isSecurityEnabled()) {
String user = request.getRemoteUser();
@@ -493,12 +529,12 @@ public class JspHelper {
Token<DelegationTokenIdentifier> token =
new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(tokenString);
- InetSocketAddress serviceAddr = NameNode.getAddress(conf);
- LOG.info("Setting service in token: "
- + new Text(serviceAddr.getAddress().getHostAddress() + ":"
- + serviceAddr.getPort()));
- token.setService(new Text(serviceAddr.getAddress().getHostAddress()
- + ":" + serviceAddr.getPort()));
+ String serviceAddress = getNNServiceAddress(context, request);
+ if (serviceAddress != null) {
+ LOG.info("Setting service in token: "
+ + new Text(serviceAddress));
+ token.setService(new Text(serviceAddress));
+ }
ByteArrayInputStream buf = new ByteArrayInputStream(token
.getIdentifier());
DataInputStream in = new DataInputStream(buf);
@@ -549,5 +585,13 @@ public class JspHelper {
}
}
-
+ /**
+ * Builds a URL query-string fragment of the form <code>&amp;name=val</code>.
+ * The name and value are concatenated as given; no URL encoding is applied.
+ * @param name parameter name
+ * @param val parameter value; may be null
+ * @return the fragment with a leading '&amp;', or an empty string if val is null
+ */
+ public static String getUrlParam(String name, String val) {
+ return val == null ? "" : "&" + name + "=" + val;
+ }
}