You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2009/05/13 19:02:58 UTC

svn commit: r774433 [3/3] - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/security/ src/hdfs/ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/balancer/ src/hdfs/org/apache/hadoop/...

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java Wed May 13 17:02:29 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
@@ -105,7 +106,7 @@
   }
 
   public static void streamBlockInAscii(InetSocketAddress addr, long blockId, 
-                                 long genStamp, long blockSize, 
+                                 AccessToken accessToken, long genStamp, long blockSize, 
                                  long offsetIntoBlock, long chunkSizeToView, JspWriter out) 
     throws IOException {
     if (chunkSizeToView == 0) return;
@@ -118,7 +119,7 @@
       // Use the block name for file name. 
       DFSClient.BlockReader blockReader = 
         DFSClient.BlockReader.newBlockReader(s, addr.toString() + ":" + blockId,
-                                             blockId, genStamp ,offsetIntoBlock, 
+                                             blockId, accessToken, genStamp ,offsetIntoBlock, 
                                              amtToRead, 
                                              conf.getInt("io.file.buffer.size",
                                                          4096));

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed May 13 17:02:29 2009
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -53,6 +54,8 @@
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.AccessKey;
+import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -434,6 +437,11 @@
     return namesystem.getBlocks(datanode, size); 
   }
 
+  /** {@inheritDoc} */
+  public ExportedAccessKeys getAccessKeys() throws IOException {
+    return namesystem.getAccessKeys();
+  }
+
   @Override // NamenodeProtocol
   public void errorReport(NamenodeRegistration registration,
                           int errorCode, 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed May 13 17:02:29 2009
@@ -430,6 +430,7 @@
           DFSClient.BlockReader.newBlockReader(s, targetAddr.toString() + ":" + 
                                                block.getBlockId(), 
                                                block.getBlockId(), 
+                                               lblock.getAccessToken(),
                                                block.getGenerationStamp(), 
                                                0, -1,
                                                conf.getInt("io.file.buffer.size", 4096));

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Wed May 13 17:02:29 2009
@@ -35,10 +35,10 @@
  **********************************************************************/
 public interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 19: SendHeartbeat returns an array of DatanodeCommand objects
-   *     in stead of a DatanodeCommand object.
+   * 20: SendHeartbeat may return KeyUpdateCommand
+   *     Register returns access keys inside DatanodeRegistration object
    */
-  public static final long versionID = 19L;
+  public static final long versionID = 20L;
   
   // error code
   final static int NOTIFY = 0;
@@ -56,6 +56,7 @@
   final static int DNA_REGISTER = 4;   // re-register
   final static int DNA_FINALIZE = 5;   // finalize previous upgrade
   final static int DNA_RECOVERBLOCK = 6;  // request a block recovery
+  final static int DNA_ACCESSKEYUPDATE = 7;  // update access key
 
   /** 
    * Register Datanode.

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Wed May 13 17:02:29 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.security.ExportedAccessKeys;
 
 /** 
  * DatanodeRegistration class contains all information the name-node needs
@@ -46,6 +47,7 @@
   }
 
   public StorageInfo storageInfo;
+  public ExportedAccessKeys exportedKeys;
 
   /**
    * Default constructor.
@@ -60,6 +62,7 @@
   public DatanodeRegistration(String nodeName) {
     super(nodeName);
     this.storageInfo = new StorageInfo();
+    this.exportedKeys = new ExportedAccessKeys();
   }
   
   public void setInfoPort(int infoPort) {
@@ -115,6 +118,7 @@
     out.writeShort(ipcPort);
 
     storageInfo.write(out);
+    exportedKeys.write(out);
   }
 
   /** {@inheritDoc} */
@@ -125,5 +129,6 @@
     this.ipcPort = in.readShort() & 0x0000ffff;
 
     storageInfo.readFields(in);
+    exportedKeys.readFields(in);
   }
 }

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java?rev=774433&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java Wed May 13 17:02:29 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.security.ExportedAccessKeys;
+
+public class KeyUpdateCommand extends DatanodeCommand {
+  private ExportedAccessKeys keys;
+
+  KeyUpdateCommand() {
+    this(new ExportedAccessKeys());
+  }
+
+  public KeyUpdateCommand(ExportedAccessKeys keys) {
+    super(DatanodeProtocol.DNA_ACCESSKEYUPDATE);
+    this.keys = keys;
+  }
+
+  public ExportedAccessKeys getExportedKeys() {
+    return this.keys;
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  static { // register a ctor
+    WritableFactories.setFactory(KeyUpdateCommand.class, new WritableFactory() {
+      public Writable newInstance() {
+        return new KeyUpdateCommand();
+      }
+    });
+  }
+
+  /**
+   */
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    keys.write(out);
+  }
+
+  /**
+   */
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    keys.readFields(in);
+  }
+}

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java Wed May 13 17:02:29 2009
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.ExportedAccessKeys;
 
 /*****************************************************************************
  * Protocol that a secondary NameNode uses to communicate with the NameNode.
@@ -34,12 +35,10 @@
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
    * 
-   * 3: Backup node support: versionRequest(), errorReport(), register(),
-   *      startCheckpoint(), endCheckpoint(), journalSize(), journal().
-   *    SecondaryNameNode methods deprecated:
-   *      getEditLogSize(), rollEditLog(), rollFSImage().
+   * 4: new method added: getAccessKeys()
+   *      
    */
-  public static final long versionID = 3L;
+  public static final long versionID = 4L;
 
   // Error codes passed by errorReport().
   final static int NOTIFY = 0;
@@ -70,6 +69,14 @@
   throws IOException;
 
   /**
+   * Get the current access keys
+   * 
+   * @return ExportedAccessKeys containing current access keys
+   * @throws IOException 
+   */
+  public ExportedAccessKeys getAccessKeys() throws IOException;
+
+  /**
    * Get the size of the current edit log (in bytes).
    * @return The number of bytes in the current edit log.
    * @throws IOException

Added: hadoop/core/trunk/src/test/core/org/apache/hadoop/security/TestAccessToken.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/core/org/apache/hadoop/security/TestAccessToken.java?rev=774433&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/core/org/apache/hadoop/security/TestAccessToken.java (added)
+++ hadoop/core/trunk/src/test/core/org/apache/hadoop/security/TestAccessToken.java Wed May 13 17:02:29 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.util.EnumSet;
+
+import org.apache.hadoop.io.TestWritable;
+
+import junit.framework.TestCase;
+
+/** Unit tests for access tokens */
+public class TestAccessToken extends TestCase {
+  long accessKeyUpdateInterval = 10 * 60 * 1000; // 10 mins
+  long accessTokenLifetime = 2 * 60 * 1000; // 2 mins
+  long blockID1 = 0L;
+  long blockID2 = 10L;
+  long blockID3 = -108L;
+
+  /** test Writable */
+  public void testWritable() throws Exception {
+    TestWritable.testWritable(ExportedAccessKeys.DUMMY_KEYS);
+    AccessTokenHandler handler = new AccessTokenHandler(true,
+        accessKeyUpdateInterval, accessTokenLifetime);
+    ExportedAccessKeys keys = handler.exportKeys();
+    TestWritable.testWritable(keys);
+    TestWritable.testWritable(AccessToken.DUMMY_TOKEN);
+    AccessToken token = handler.generateToken(blockID3, EnumSet
+        .allOf(AccessTokenHandler.AccessMode.class));
+    TestWritable.testWritable(token);
+  }
+
+  private void tokenGenerationAndVerification(AccessTokenHandler master,
+      AccessTokenHandler slave) throws Exception {
+    // single-mode tokens
+    for (AccessTokenHandler.AccessMode mode : AccessTokenHandler.AccessMode
+        .values()) {
+      // generated by master
+      AccessToken token1 = master.generateToken(blockID1, EnumSet.of(mode));
+      assertTrue(master.checkAccess(token1, null, blockID1, mode));
+      assertTrue(slave.checkAccess(token1, null, blockID1, mode));
+      // generated by slave
+      AccessToken token2 = slave.generateToken(blockID2, EnumSet.of(mode));
+      assertTrue(master.checkAccess(token2, null, blockID2, mode));
+      assertTrue(slave.checkAccess(token2, null, blockID2, mode));
+    }
+    // multi-mode tokens
+    AccessToken mtoken = master.generateToken(blockID3, EnumSet
+        .allOf(AccessTokenHandler.AccessMode.class));
+    for (AccessTokenHandler.AccessMode mode : AccessTokenHandler.AccessMode
+        .values()) {
+      assertTrue(master.checkAccess(mtoken, null, blockID3, mode));
+      assertTrue(slave.checkAccess(mtoken, null, blockID3, mode));
+    }
+  }
+
+  /** test access key and token handling */
+  public void testAccessTokenHandler() throws Exception {
+    AccessTokenHandler masterHandler = new AccessTokenHandler(true,
+        accessKeyUpdateInterval, accessTokenLifetime);
+    AccessTokenHandler slaveHandler = new AccessTokenHandler(false,
+        accessKeyUpdateInterval, accessTokenLifetime);
+    ExportedAccessKeys keys = masterHandler.exportKeys();
+    slaveHandler.setKeys(keys);
+    tokenGenerationAndVerification(masterHandler, slaveHandler);
+    // key updating
+    masterHandler.updateKeys();
+    tokenGenerationAndVerification(masterHandler, slaveHandler);
+    keys = masterHandler.exportKeys();
+    slaveHandler.setKeys(keys);
+    tokenGenerationAndVerification(masterHandler, slaveHandler);
+  }
+
+}

Modified: hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Wed May 13 17:02:29 2009
@@ -44,6 +44,7 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.util.DataChecksum;
 
 /**
@@ -183,6 +184,7 @@
     Text.writeString(sendOut, "cl");// clientID
     sendOut.writeBoolean(false); // no src node info
     sendOut.writeInt(0);           // number of downstream targets
+    AccessToken.DUMMY_TOKEN.write(sendOut);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     
     // bad bytes per checksum
@@ -218,6 +220,7 @@
     Text.writeString(sendOut, "cl");// clientID
     sendOut.writeBoolean(false); // no src node info
     sendOut.writeInt(0);
+    AccessToken.DUMMY_TOKEN.write(sendOut);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt((int)512);
     sendOut.writeInt(4);           // size of packet
@@ -245,6 +248,7 @@
     Text.writeString(sendOut, "cl");// clientID
     sendOut.writeBoolean(false); // no src node info
     sendOut.writeInt(0);
+    AccessToken.DUMMY_TOKEN.write(sendOut);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt((int)512);    // checksum size
     sendOut.writeInt(8);           // size of packet
@@ -274,6 +278,7 @@
     sendOut.writeLong(fileLen);
     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
     Text.writeString(sendOut, "cl");
+    AccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
 
     // negative block start offset
@@ -285,6 +290,7 @@
     sendOut.writeLong(-1L);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
+    AccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Negative start-offset for read for block " + 
                  firstBlock.getBlockId(), false);
 
@@ -297,6 +303,7 @@
     sendOut.writeLong(fileLen);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
+    AccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong start-offset for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -311,6 +318,7 @@
     sendOut.writeLong(0);
     sendOut.writeLong(-1-random.nextInt(oneMil));
     Text.writeString(sendOut, "cl");
+    AccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Negative length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -325,6 +333,7 @@
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen + 1);
     Text.writeString(sendOut, "cl");
+    AccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -337,6 +346,7 @@
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
+    AccessToken.DUMMY_TOKEN.write(sendOut);
     readFile(fileSys, file, fileLen);
   }
 }

Modified: hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Wed May 13 17:02:29 2009
@@ -34,13 +34,13 @@
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessTokenHandler;
 
 import junit.framework.TestCase;
 /**
  * This class tests if a balancer schedules tasks correctly.
  */
 public class TestBalancer extends TestCase {
-  private static final Configuration CONF = new Configuration();
   final private static long CAPACITY = 500L;
   final private static String RACK0 = "/rack0";
   final private static String RACK1 = "/rack1";
@@ -56,14 +56,18 @@
   private Random r = new Random();
 
   static {
-    CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
-    CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
-    CONF.setLong("dfs.heartbeat.interval", 1L);
-    CONF.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
-    CONF.setLong("dfs.balancer.movedWinWidth", 2000L);
     Balancer.setBlockMoveWaitTime(1000L) ;
   }
 
+  private void initConf(Configuration conf) {
+    conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
+    conf.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    conf.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
+    conf.setLong("dfs.heartbeat.interval", 1L);
+    conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    conf.setLong("dfs.balancer.movedWinWidth", 2000L);
+  }
+
   /* create a file with a length of <code>fileLen</code> */
   private void createFile(long fileLen, short replicationFactor)
   throws IOException {
@@ -77,11 +81,11 @@
   /* fill up a cluster with <code>numNodes</code> datanodes 
    * whose used space to be <code>size</code>
    */
-  private Block[] generateBlocks(long size, short numNodes) throws IOException {
-    cluster = new MiniDFSCluster( CONF, numNodes, true, null);
+  private Block[] generateBlocks(Configuration conf, long size, short numNodes) throws IOException {
+    cluster = new MiniDFSCluster( conf, numNodes, true, null);
     try {
       cluster.waitActive();
-      client = DFSClient.createNamenode(CONF);
+      client = DFSClient.createNamenode(conf);
 
       short replicationFactor = (short)(numNodes-1);
       long fileLen = size/replicationFactor;
@@ -140,7 +144,7 @@
    * then redistribute blocks according the required distribution.
    * Afterwards a balancer is running to balance the cluster.
    */
-  private void testUnevenDistribution(
+  private void testUnevenDistribution(Configuration conf,
       long distribution[], long capacities[], String[] racks) throws Exception {
     int numDatanodes = distribution.length;
     if (capacities.length != numDatanodes || racks.length != numDatanodes) {
@@ -154,18 +158,18 @@
     }
 
     // fill the cluster
-    Block[] blocks = generateBlocks(totalUsedSpace, (short)numDatanodes);
+    Block[] blocks = generateBlocks(conf, totalUsedSpace, (short)numDatanodes);
 
     // redistribute blocks
     Block[][] blocksDN = distributeBlocks(
         blocks, (short)(numDatanodes-1), distribution);
 
     // restart the cluster: do NOT format the cluster
-    CONF.set("dfs.safemode.threshold.pct", "0.0f"); 
-    cluster = new MiniDFSCluster(0, CONF, numDatanodes,
+    conf.set("dfs.safemode.threshold.pct", "0.0f"); 
+    cluster = new MiniDFSCluster(0, conf, numDatanodes,
         false, true, null, racks, capacities);
     cluster.waitActive();
-    client = DFSClient.createNamenode(CONF);
+    client = DFSClient.createNamenode(conf);
 
     cluster.injectBlocks(blocksDN);
 
@@ -173,7 +177,7 @@
     for(long capacity:capacities) {
       totalCapacity += capacity;
     }
-    runBalancer(totalUsedSpace, totalCapacity);
+    runBalancer(conf, totalUsedSpace, totalCapacity);
   }
 
   /* wait for one heartbeat */
@@ -194,15 +198,15 @@
    * @param newCapacity new node's capacity
    * @param new 
    */
-  private void test(long[] capacities, String[] racks, 
+  private void test(Configuration conf, long[] capacities, String[] racks, 
       long newCapacity, String newRack) throws Exception {
     int numOfDatanodes = capacities.length;
     assertEquals(numOfDatanodes, racks.length);
-    cluster = new MiniDFSCluster(0, CONF, capacities.length, true, true, null, 
+    cluster = new MiniDFSCluster(0, conf, capacities.length, true, true, null, 
         racks, capacities);
     try {
       cluster.waitActive();
-      client = DFSClient.createNamenode(CONF);
+      client = DFSClient.createNamenode(conf);
 
       long totalCapacity=0L;
       for(long capacity:capacities) {
@@ -212,25 +216,25 @@
       long totalUsedSpace = totalCapacity*3/10;
       createFile(totalUsedSpace/numOfDatanodes, (short)numOfDatanodes);
       // start up an empty node with the same capacity and on the same rack
-      cluster.startDataNodes(CONF, 1, true, null,
+      cluster.startDataNodes(conf, 1, true, null,
           new String[]{newRack}, new long[]{newCapacity});
 
       totalCapacity += newCapacity;
 
       // run balancer and validate results
-      runBalancer(totalUsedSpace, totalCapacity);
+      runBalancer(conf, totalUsedSpace, totalCapacity);
     } finally {
       cluster.shutdown();
     }
   }
 
   /* Start balancer and check if the cluster is balanced after the run */
-  private void runBalancer( long totalUsedSpace, long totalCapacity )
+  private void runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity )
   throws Exception {
     waitForHeartBeat(totalUsedSpace, totalCapacity);
 
     // start rebalancing
-    balancer = new Balancer(CONF);
+    balancer = new Balancer(conf);
     balancer.run(new String[0]);
 
     waitForHeartBeat(totalUsedSpace, totalCapacity);
@@ -258,18 +262,27 @@
   /** Test a cluster with even distribution, 
    * then a new empty node is added to the cluster*/
   public void testBalancer0() throws Exception {
+    Configuration conf = new Configuration();
+    initConf(conf);
     /** one-node cluster test*/
     // add an empty node with half of the CAPACITY & the same rack
-    test(new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
+    test(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
 
     /** two-node cluster test */
-    test(new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+    test(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
         CAPACITY, RACK2);
+    
+    /** End-to-end testing of access token, involving NN, DN, and Balancer */
+    Configuration newConf = new Configuration(conf);
+    newConf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
+    test(newConf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
   }
 
   /** Test unevenly distributed cluster */
   public void testBalancer1() throws Exception {
-    testUnevenDistribution(
+    Configuration conf = new Configuration();
+    initConf(conf);
+    testUnevenDistribution(conf,
         new long[] {50*CAPACITY/100, 10*CAPACITY/100},
         new long[]{CAPACITY, CAPACITY},
         new String[] {RACK0, RACK1});

Modified: hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Wed May 13 17:02:29 2009
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hdfs.server.datanode.BlockTransferThrottler;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessToken;
 /**
  * This class tests if block replacement request to data nodes work correctly.
  */
@@ -231,6 +232,7 @@
     out.writeLong(block.getGenerationStamp());
     Text.writeString(out, source.getStorageID());
     sourceProxy.write(out);
+    AccessToken.DUMMY_TOKEN.write(out);
     out.flush();
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());

Modified: hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Wed May 13 17:02:29 2009
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.AccessToken;
 
 import junit.framework.TestCase;
 
@@ -119,6 +120,7 @@
       Text.writeString( out, "" );
       out.writeBoolean(false); // Not sending src node information
       out.writeInt(0);
+      AccessToken.DUMMY_TOKEN.write(out);
       
       // write check header
       out.writeByte( 1 );

Modified: hadoop/core/trunk/src/webapps/datanode/browseBlock.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/datanode/browseBlock.jsp?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/datanode/browseBlock.jsp (original)
+++ hadoop/core/trunk/src/webapps/datanode/browseBlock.jsp Wed May 13 17:02:29 2009
@@ -31,6 +31,8 @@
   import="org.apache.hadoop.io.*"
   import="org.apache.hadoop.conf.*"
   import="org.apache.hadoop.net.DNS"
+  import="org.apache.hadoop.security.AccessToken"
+  import="org.apache.hadoop.security.AccessTokenHandler"
   import="org.apache.hadoop.util.*"
   import="java.text.DateFormat"
 %>
@@ -204,6 +206,26 @@
     }
     blockId = Long.parseLong(blockIdStr);
 
+    final DFSClient dfs = new DFSClient(datanode.getNameNodeAddr(), JspHelper.conf);
+    
+    AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+    if (JspHelper.conf
+        .getBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false)) {
+      List<LocatedBlock> blks = dfs.namenode.getBlockLocations(filename, 0,
+          Long.MAX_VALUE).getLocatedBlocks();
+      if (blks == null || blks.size() == 0) {
+        out.print("Can't locate file blocks");
+        dfs.close();
+        return;
+      }
+      for (int i = 0; i < blks.size(); i++) {
+        if (blks.get(i).getBlock().getBlockId() == blockId) {
+          accessToken = blks.get(i).getAccessToken();
+          break;
+        }
+      }
+    }
+    
     String blockGenStamp = null;
     long genStamp = 0;
     blockGenStamp = req.getParameter("genstamp");
@@ -251,7 +273,6 @@
     out.print("<hr>");
 
     //Determine the prev & next blocks
-    final DFSClient dfs = new DFSClient(datanode.getNameNodeAddr(), JspHelper.conf);
     long nextStartOffset = 0;
     long nextBlockSize = 0;
     String nextBlockIdStr = null;
@@ -366,7 +387,7 @@
     try {
     JspHelper.streamBlockInAscii(
             new InetSocketAddress(req.getServerName(), datanodePort), blockId, 
-            genStamp, blockSize, startOffset, chunkSizeToView, out);
+            accessToken, genStamp, blockSize, startOffset, chunkSizeToView, out);
     } catch (Exception e){
         out.print(e);
     }

Modified: hadoop/core/trunk/src/webapps/datanode/tail.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/datanode/tail.jsp?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/datanode/tail.jsp (original)
+++ hadoop/core/trunk/src/webapps/datanode/tail.jsp Wed May 13 17:02:29 2009
@@ -31,6 +31,7 @@
   import="org.apache.hadoop.io.*"
   import="org.apache.hadoop.conf.*"
   import="org.apache.hadoop.net.DNS"
+  import="org.apache.hadoop.security.AccessToken"
   import="org.apache.hadoop.util.*"
   import="org.apache.hadoop.net.NetUtils"
   import="java.text.DateFormat"
@@ -93,6 +94,7 @@
     LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
     long blockSize = lastBlk.getBlock().getNumBytes();
     long blockId = lastBlk.getBlock().getBlockId();
+    AccessToken accessToken = lastBlk.getAccessToken();
     long genStamp = lastBlk.getBlock().getGenerationStamp();
     DatanodeInfo chosenNode;
     try {
@@ -107,7 +109,7 @@
     final long startOffset = blockSize >= chunkSizeToView? blockSize - chunkSizeToView: 0;
 
     out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
-    JspHelper.streamBlockInAscii(addr, blockId, genStamp, blockSize, startOffset, chunkSizeToView, out);
+    JspHelper.streamBlockInAscii(addr, blockId, accessToken, genStamp, blockSize, startOffset, chunkSizeToView, out);
     out.print("</textarea>");
     dfs.close();
   }