You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2009/05/13 19:02:58 UTC
svn commit: r774433 [3/3] - in /hadoop/core/trunk: ./
src/core/org/apache/hadoop/security/ src/hdfs/
src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/
src/hdfs/org/apache/hadoop/hdfs/server/balancer/
src/hdfs/org/apache/hadoop/...
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java Wed May 13 17:02:29 2009
@@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessToken;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
@@ -105,7 +106,7 @@
}
public static void streamBlockInAscii(InetSocketAddress addr, long blockId,
- long genStamp, long blockSize,
+ AccessToken accessToken, long genStamp, long blockSize,
long offsetIntoBlock, long chunkSizeToView, JspWriter out)
throws IOException {
if (chunkSizeToView == 0) return;
@@ -118,7 +119,7 @@
// Use the block name for file name.
DFSClient.BlockReader blockReader =
DFSClient.BlockReader.newBlockReader(s, addr.toString() + ":" + blockId,
- blockId, genStamp ,offsetIntoBlock,
+ blockId, accessToken, genStamp ,offsetIntoBlock,
amtToRead,
conf.getInt("io.file.buffer.size",
4096));
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed May 13 17:02:29 2009
@@ -38,6 +38,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -53,6 +54,8 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.AccessKey;
+import org.apache.hadoop.security.ExportedAccessKeys;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -434,6 +437,11 @@
return namesystem.getBlocks(datanode, size);
}
+ /** {@inheritDoc} */
+ public ExportedAccessKeys getAccessKeys() throws IOException {
+ return namesystem.getAccessKeys();
+ }
+
@Override // NamenodeProtocol
public void errorReport(NamenodeRegistration registration,
int errorCode,
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed May 13 17:02:29 2009
@@ -430,6 +430,7 @@
DFSClient.BlockReader.newBlockReader(s, targetAddr.toString() + ":" +
block.getBlockId(),
block.getBlockId(),
+ lblock.getAccessToken(),
block.getGenerationStamp(),
0, -1,
conf.getInt("io.file.buffer.size", 4096));
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Wed May 13 17:02:29 2009
@@ -35,10 +35,10 @@
**********************************************************************/
public interface DatanodeProtocol extends VersionedProtocol {
/**
- * 19: SendHeartbeat returns an array of DatanodeCommand objects
- * in stead of a DatanodeCommand object.
+ * 20: SendHeartbeat may return KeyUpdateCommand
+ * Register returns access keys inside DatanodeRegistration object
*/
- public static final long versionID = 19L;
+ public static final long versionID = 20L;
// error code
final static int NOTIFY = 0;
@@ -56,6 +56,7 @@
final static int DNA_REGISTER = 4; // re-register
final static int DNA_FINALIZE = 5; // finalize previous upgrade
final static int DNA_RECOVERBLOCK = 6; // request a block recovery
+ final static int DNA_ACCESSKEYUPDATE = 7; // update access key
/**
* Register Datanode.
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Wed May 13 17:02:29 2009
@@ -29,6 +29,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.security.ExportedAccessKeys;
/**
* DatanodeRegistration class contains all information the name-node needs
@@ -46,6 +47,7 @@
}
public StorageInfo storageInfo;
+ public ExportedAccessKeys exportedKeys;
/**
* Default constructor.
@@ -60,6 +62,7 @@
public DatanodeRegistration(String nodeName) {
super(nodeName);
this.storageInfo = new StorageInfo();
+ this.exportedKeys = new ExportedAccessKeys();
}
public void setInfoPort(int infoPort) {
@@ -115,6 +118,7 @@
out.writeShort(ipcPort);
storageInfo.write(out);
+ exportedKeys.write(out);
}
/** {@inheritDoc} */
@@ -125,5 +129,6 @@
this.ipcPort = in.readShort() & 0x0000ffff;
storageInfo.readFields(in);
+ exportedKeys.readFields(in);
}
}
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java?rev=774433&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java Wed May 13 17:02:29 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.security.ExportedAccessKeys;
+
+public class KeyUpdateCommand extends DatanodeCommand {
+ private ExportedAccessKeys keys;
+
+ KeyUpdateCommand() {
+ this(new ExportedAccessKeys());
+ }
+
+ public KeyUpdateCommand(ExportedAccessKeys keys) {
+ super(DatanodeProtocol.DNA_ACCESSKEYUPDATE);
+ this.keys = keys;
+ }
+
+ public ExportedAccessKeys getExportedKeys() {
+ return this.keys;
+ }
+
+ // ///////////////////////////////////////////////
+ // Writable
+ // ///////////////////////////////////////////////
+ static { // register a ctor
+ WritableFactories.setFactory(KeyUpdateCommand.class, new WritableFactory() {
+ public Writable newInstance() {
+ return new KeyUpdateCommand();
+ }
+ });
+ }
+
+ /**
+ */
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ keys.write(out);
+ }
+
+ /**
+ */
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ keys.readFields(in);
+ }
+}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java Wed May 13 17:02:29 2009
@@ -23,6 +23,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.ExportedAccessKeys;
/*****************************************************************************
* Protocol that a secondary NameNode uses to communicate with the NameNode.
@@ -34,12 +35,10 @@
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
*
- * 3: Backup node support: versionRequest(), errorReport(), register(),
- * startCheckpoint(), endCheckpoint(), journalSize(), journal().
- * SecondaryNameNode methods deprecated:
- * getEditLogSize(), rollEditLog(), rollFSImage().
+ * 4: new method added: getAccessKeys()
+ *
*/
- public static final long versionID = 3L;
+ public static final long versionID = 4L;
// Error codes passed by errorReport().
final static int NOTIFY = 0;
@@ -70,6 +69,14 @@
throws IOException;
/**
+ * Get the current access keys
+ *
+ * @return ExportedAccessKeys containing current access keys
+ * @throws IOException
+ */
+ public ExportedAccessKeys getAccessKeys() throws IOException;
+
+ /**
* Get the size of the current edit log (in bytes).
* @return The number of bytes in the current edit log.
* @throws IOException
Added: hadoop/core/trunk/src/test/core/org/apache/hadoop/security/TestAccessToken.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/core/org/apache/hadoop/security/TestAccessToken.java?rev=774433&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/core/org/apache/hadoop/security/TestAccessToken.java (added)
+++ hadoop/core/trunk/src/test/core/org/apache/hadoop/security/TestAccessToken.java Wed May 13 17:02:29 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.util.EnumSet;
+
+import org.apache.hadoop.io.TestWritable;
+
+import junit.framework.TestCase;
+
+/** Unit tests for access tokens */
+public class TestAccessToken extends TestCase {
+ long accessKeyUpdateInterval = 10 * 60 * 1000; // 10 mins
+ long accessTokenLifetime = 2 * 60 * 1000; // 2 mins
+ long blockID1 = 0L;
+ long blockID2 = 10L;
+ long blockID3 = -108L;
+
+ /** test Writable */
+ public void testWritable() throws Exception {
+ TestWritable.testWritable(ExportedAccessKeys.DUMMY_KEYS);
+ AccessTokenHandler handler = new AccessTokenHandler(true,
+ accessKeyUpdateInterval, accessTokenLifetime);
+ ExportedAccessKeys keys = handler.exportKeys();
+ TestWritable.testWritable(keys);
+ TestWritable.testWritable(AccessToken.DUMMY_TOKEN);
+ AccessToken token = handler.generateToken(blockID3, EnumSet
+ .allOf(AccessTokenHandler.AccessMode.class));
+ TestWritable.testWritable(token);
+ }
+
+ private void tokenGenerationAndVerification(AccessTokenHandler master,
+ AccessTokenHandler slave) throws Exception {
+ // single-mode tokens
+ for (AccessTokenHandler.AccessMode mode : AccessTokenHandler.AccessMode
+ .values()) {
+ // generated by master
+ AccessToken token1 = master.generateToken(blockID1, EnumSet.of(mode));
+ assertTrue(master.checkAccess(token1, null, blockID1, mode));
+ assertTrue(slave.checkAccess(token1, null, blockID1, mode));
+ // generated by slave
+ AccessToken token2 = slave.generateToken(blockID2, EnumSet.of(mode));
+ assertTrue(master.checkAccess(token2, null, blockID2, mode));
+ assertTrue(slave.checkAccess(token2, null, blockID2, mode));
+ }
+ // multi-mode tokens
+ AccessToken mtoken = master.generateToken(blockID3, EnumSet
+ .allOf(AccessTokenHandler.AccessMode.class));
+ for (AccessTokenHandler.AccessMode mode : AccessTokenHandler.AccessMode
+ .values()) {
+ assertTrue(master.checkAccess(mtoken, null, blockID3, mode));
+ assertTrue(slave.checkAccess(mtoken, null, blockID3, mode));
+ }
+ }
+
+ /** test access key and token handling */
+ public void testAccessTokenHandler() throws Exception {
+ AccessTokenHandler masterHandler = new AccessTokenHandler(true,
+ accessKeyUpdateInterval, accessTokenLifetime);
+ AccessTokenHandler slaveHandler = new AccessTokenHandler(false,
+ accessKeyUpdateInterval, accessTokenLifetime);
+ ExportedAccessKeys keys = masterHandler.exportKeys();
+ slaveHandler.setKeys(keys);
+ tokenGenerationAndVerification(masterHandler, slaveHandler);
+ // key updating
+ masterHandler.updateKeys();
+ tokenGenerationAndVerification(masterHandler, slaveHandler);
+ keys = masterHandler.exportKeys();
+ slaveHandler.setKeys(keys);
+ tokenGenerationAndVerification(masterHandler, slaveHandler);
+ }
+
+}
Modified: hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Wed May 13 17:02:29 2009
@@ -44,6 +44,7 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessToken;
import org.apache.hadoop.util.DataChecksum;
/**
@@ -183,6 +184,7 @@
Text.writeString(sendOut, "cl");// clientID
sendOut.writeBoolean(false); // no src node info
sendOut.writeInt(0); // number of downstream targets
+ AccessToken.DUMMY_TOKEN.write(sendOut);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
// bad bytes per checksum
@@ -218,6 +220,7 @@
Text.writeString(sendOut, "cl");// clientID
sendOut.writeBoolean(false); // no src node info
sendOut.writeInt(0);
+ AccessToken.DUMMY_TOKEN.write(sendOut);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt((int)512);
sendOut.writeInt(4); // size of packet
@@ -245,6 +248,7 @@
Text.writeString(sendOut, "cl");// clientID
sendOut.writeBoolean(false); // no src node info
sendOut.writeInt(0);
+ AccessToken.DUMMY_TOKEN.write(sendOut);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt((int)512); // checksum size
sendOut.writeInt(8); // size of packet
@@ -274,6 +278,7 @@
sendOut.writeLong(fileLen);
recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
Text.writeString(sendOut, "cl");
+ AccessToken.DUMMY_TOKEN.write(sendOut);
sendRecvData("Wrong block ID " + newBlockId + " for read", false);
// negative block start offset
@@ -285,6 +290,7 @@
sendOut.writeLong(-1L);
sendOut.writeLong(fileLen);
Text.writeString(sendOut, "cl");
+ AccessToken.DUMMY_TOKEN.write(sendOut);
sendRecvData("Negative start-offset for read for block " +
firstBlock.getBlockId(), false);
@@ -297,6 +303,7 @@
sendOut.writeLong(fileLen);
sendOut.writeLong(fileLen);
Text.writeString(sendOut, "cl");
+ AccessToken.DUMMY_TOKEN.write(sendOut);
sendRecvData("Wrong start-offset for reading block " +
firstBlock.getBlockId(), false);
@@ -311,6 +318,7 @@
sendOut.writeLong(0);
sendOut.writeLong(-1-random.nextInt(oneMil));
Text.writeString(sendOut, "cl");
+ AccessToken.DUMMY_TOKEN.write(sendOut);
sendRecvData("Negative length for reading block " +
firstBlock.getBlockId(), false);
@@ -325,6 +333,7 @@
sendOut.writeLong(0);
sendOut.writeLong(fileLen + 1);
Text.writeString(sendOut, "cl");
+ AccessToken.DUMMY_TOKEN.write(sendOut);
sendRecvData("Wrong length for reading block " +
firstBlock.getBlockId(), false);
@@ -337,6 +346,7 @@
sendOut.writeLong(0);
sendOut.writeLong(fileLen);
Text.writeString(sendOut, "cl");
+ AccessToken.DUMMY_TOKEN.write(sendOut);
readFile(fileSys, file, fileLen);
}
}
Modified: hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Wed May 13 17:02:29 2009
@@ -34,13 +34,13 @@
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessTokenHandler;
import junit.framework.TestCase;
/**
* This class tests if a balancer schedules tasks correctly.
*/
public class TestBalancer extends TestCase {
- private static final Configuration CONF = new Configuration();
final private static long CAPACITY = 500L;
final private static String RACK0 = "/rack0";
final private static String RACK1 = "/rack1";
@@ -56,14 +56,18 @@
private Random r = new Random();
static {
- CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
- CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
- CONF.setLong("dfs.heartbeat.interval", 1L);
- CONF.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
- CONF.setLong("dfs.balancer.movedWinWidth", 2000L);
Balancer.setBlockMoveWaitTime(1000L) ;
}
+ private void initConf(Configuration conf) {
+ conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
+ conf.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ conf.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
+ conf.setLong("dfs.heartbeat.interval", 1L);
+ conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+ conf.setLong("dfs.balancer.movedWinWidth", 2000L);
+ }
+
/* create a file with a length of <code>fileLen</code> */
private void createFile(long fileLen, short replicationFactor)
throws IOException {
@@ -77,11 +81,11 @@
/* fill up a cluster with <code>numNodes</code> datanodes
* whose used space to be <code>size</code>
*/
- private Block[] generateBlocks(long size, short numNodes) throws IOException {
- cluster = new MiniDFSCluster( CONF, numNodes, true, null);
+ private Block[] generateBlocks(Configuration conf, long size, short numNodes) throws IOException {
+ cluster = new MiniDFSCluster( conf, numNodes, true, null);
try {
cluster.waitActive();
- client = DFSClient.createNamenode(CONF);
+ client = DFSClient.createNamenode(conf);
short replicationFactor = (short)(numNodes-1);
long fileLen = size/replicationFactor;
@@ -140,7 +144,7 @@
* then redistribute blocks according the required distribution.
* Afterwards a balancer is running to balance the cluster.
*/
- private void testUnevenDistribution(
+ private void testUnevenDistribution(Configuration conf,
long distribution[], long capacities[], String[] racks) throws Exception {
int numDatanodes = distribution.length;
if (capacities.length != numDatanodes || racks.length != numDatanodes) {
@@ -154,18 +158,18 @@
}
// fill the cluster
- Block[] blocks = generateBlocks(totalUsedSpace, (short)numDatanodes);
+ Block[] blocks = generateBlocks(conf, totalUsedSpace, (short)numDatanodes);
// redistribute blocks
Block[][] blocksDN = distributeBlocks(
blocks, (short)(numDatanodes-1), distribution);
// restart the cluster: do NOT format the cluster
- CONF.set("dfs.safemode.threshold.pct", "0.0f");
- cluster = new MiniDFSCluster(0, CONF, numDatanodes,
+ conf.set("dfs.safemode.threshold.pct", "0.0f");
+ cluster = new MiniDFSCluster(0, conf, numDatanodes,
false, true, null, racks, capacities);
cluster.waitActive();
- client = DFSClient.createNamenode(CONF);
+ client = DFSClient.createNamenode(conf);
cluster.injectBlocks(blocksDN);
@@ -173,7 +177,7 @@
for(long capacity:capacities) {
totalCapacity += capacity;
}
- runBalancer(totalUsedSpace, totalCapacity);
+ runBalancer(conf, totalUsedSpace, totalCapacity);
}
/* wait for one heartbeat */
@@ -194,15 +198,15 @@
* @param newCapacity new node's capacity
* @param new
*/
- private void test(long[] capacities, String[] racks,
+ private void test(Configuration conf, long[] capacities, String[] racks,
long newCapacity, String newRack) throws Exception {
int numOfDatanodes = capacities.length;
assertEquals(numOfDatanodes, racks.length);
- cluster = new MiniDFSCluster(0, CONF, capacities.length, true, true, null,
+ cluster = new MiniDFSCluster(0, conf, capacities.length, true, true, null,
racks, capacities);
try {
cluster.waitActive();
- client = DFSClient.createNamenode(CONF);
+ client = DFSClient.createNamenode(conf);
long totalCapacity=0L;
for(long capacity:capacities) {
@@ -212,25 +216,25 @@
long totalUsedSpace = totalCapacity*3/10;
createFile(totalUsedSpace/numOfDatanodes, (short)numOfDatanodes);
// start up an empty node with the same capacity and on the same rack
- cluster.startDataNodes(CONF, 1, true, null,
+ cluster.startDataNodes(conf, 1, true, null,
new String[]{newRack}, new long[]{newCapacity});
totalCapacity += newCapacity;
// run balancer and validate results
- runBalancer(totalUsedSpace, totalCapacity);
+ runBalancer(conf, totalUsedSpace, totalCapacity);
} finally {
cluster.shutdown();
}
}
/* Start balancer and check if the cluster is balanced after the run */
- private void runBalancer( long totalUsedSpace, long totalCapacity )
+ private void runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity )
throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity);
// start rebalancing
- balancer = new Balancer(CONF);
+ balancer = new Balancer(conf);
balancer.run(new String[0]);
waitForHeartBeat(totalUsedSpace, totalCapacity);
@@ -258,18 +262,27 @@
/** Test a cluster with even distribution,
* then a new empty node is added to the cluster*/
public void testBalancer0() throws Exception {
+ Configuration conf = new Configuration();
+ initConf(conf);
/** one-node cluster test*/
// add an empty node with half of the CAPACITY & the same rack
- test(new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
+ test(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
/** two-node cluster test */
- test(new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ test(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
CAPACITY, RACK2);
+
+ /** End-to-end testing of access token, involving NN, DN, and Balancer */
+ Configuration newConf = new Configuration(conf);
+ newConf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
+ test(newConf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
}
/** Test unevenly distributed cluster */
public void testBalancer1() throws Exception {
- testUnevenDistribution(
+ Configuration conf = new Configuration();
+ initConf(conf);
+ testUnevenDistribution(conf,
new long[] {50*CAPACITY/100, 10*CAPACITY/100},
new long[]{CAPACITY, CAPACITY},
new String[] {RACK0, RACK1});
Modified: hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Wed May 13 17:02:29 2009
@@ -47,6 +47,7 @@
import org.apache.hadoop.hdfs.server.datanode.BlockTransferThrottler;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessToken;
/**
* This class tests if block replacement request to data nodes work correctly.
*/
@@ -231,6 +232,7 @@
out.writeLong(block.getGenerationStamp());
Text.writeString(out, source.getStorageID());
sourceProxy.write(out);
+ AccessToken.DUMMY_TOKEN.write(out);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
Modified: hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Wed May 13 17:02:29 2009
@@ -31,6 +31,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.AccessToken;
import junit.framework.TestCase;
@@ -119,6 +120,7 @@
Text.writeString( out, "" );
out.writeBoolean(false); // Not sending src node information
out.writeInt(0);
+ AccessToken.DUMMY_TOKEN.write(out);
// write check header
out.writeByte( 1 );
Modified: hadoop/core/trunk/src/webapps/datanode/browseBlock.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/datanode/browseBlock.jsp?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/datanode/browseBlock.jsp (original)
+++ hadoop/core/trunk/src/webapps/datanode/browseBlock.jsp Wed May 13 17:02:29 2009
@@ -31,6 +31,8 @@
import="org.apache.hadoop.io.*"
import="org.apache.hadoop.conf.*"
import="org.apache.hadoop.net.DNS"
+ import="org.apache.hadoop.security.AccessToken"
+ import="org.apache.hadoop.security.AccessTokenHandler"
import="org.apache.hadoop.util.*"
import="java.text.DateFormat"
%>
@@ -204,6 +206,26 @@
}
blockId = Long.parseLong(blockIdStr);
+ final DFSClient dfs = new DFSClient(datanode.getNameNodeAddr(), JspHelper.conf);
+
+ AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+ if (JspHelper.conf
+ .getBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false)) {
+ List<LocatedBlock> blks = dfs.namenode.getBlockLocations(filename, 0,
+ Long.MAX_VALUE).getLocatedBlocks();
+ if (blks == null || blks.size() == 0) {
+ out.print("Can't locate file blocks");
+ dfs.close();
+ return;
+ }
+ for (int i = 0; i < blks.size(); i++) {
+ if (blks.get(i).getBlock().getBlockId() == blockId) {
+ accessToken = blks.get(i).getAccessToken();
+ break;
+ }
+ }
+ }
+
String blockGenStamp = null;
long genStamp = 0;
blockGenStamp = req.getParameter("genstamp");
@@ -251,7 +273,6 @@
out.print("<hr>");
//Determine the prev & next blocks
- final DFSClient dfs = new DFSClient(datanode.getNameNodeAddr(), JspHelper.conf);
long nextStartOffset = 0;
long nextBlockSize = 0;
String nextBlockIdStr = null;
@@ -366,7 +387,7 @@
try {
JspHelper.streamBlockInAscii(
new InetSocketAddress(req.getServerName(), datanodePort), blockId,
- genStamp, blockSize, startOffset, chunkSizeToView, out);
+ accessToken, genStamp, blockSize, startOffset, chunkSizeToView, out);
} catch (Exception e){
out.print(e);
}
Modified: hadoop/core/trunk/src/webapps/datanode/tail.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/datanode/tail.jsp?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/datanode/tail.jsp (original)
+++ hadoop/core/trunk/src/webapps/datanode/tail.jsp Wed May 13 17:02:29 2009
@@ -31,6 +31,7 @@
import="org.apache.hadoop.io.*"
import="org.apache.hadoop.conf.*"
import="org.apache.hadoop.net.DNS"
+ import="org.apache.hadoop.security.AccessToken"
import="org.apache.hadoop.util.*"
import="org.apache.hadoop.net.NetUtils"
import="java.text.DateFormat"
@@ -93,6 +94,7 @@
LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
long blockSize = lastBlk.getBlock().getNumBytes();
long blockId = lastBlk.getBlock().getBlockId();
+ AccessToken accessToken = lastBlk.getAccessToken();
long genStamp = lastBlk.getBlock().getGenerationStamp();
DatanodeInfo chosenNode;
try {
@@ -107,7 +109,7 @@
final long startOffset = blockSize >= chunkSizeToView? blockSize - chunkSizeToView: 0;
out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
- JspHelper.streamBlockInAscii(addr, blockId, genStamp, blockSize, startOffset, chunkSizeToView, out);
+ JspHelper.streamBlockInAscii(addr, blockId, accessToken, genStamp, blockSize, startOffset, chunkSizeToView, out);
out.print("</textarea>");
dfs.close();
}