Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:39:04 UTC

svn commit: r1077086 [1/2] - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/security/ hdfs/ hdfs/org/apache/hadoop/hdfs/ hdfs/org/apache/hadoop/hdfs/protocol/ hdfs/org/apache/hadoop/hdfs/server/balancer/ hdfs/org/ap...

Author: omalley
Date: Fri Mar  4 03:39:02 2011
New Revision: 1077086

URL: http://svn.apache.org/viewvc?rev=1077086&view=rev
Log:
commit 06c9b3bab34e19faa6e62c8dfaf7756fabef5c58
Author: Jitendra Nath Pandey <ji...@yahoo-inc.com>
Date:   Tue Dec 22 00:16:39 2009 -0800

    HADOOP-4359 from https://issues.apache.org/jira/secure/attachment/12428711/HADOOP-4359-0_20.2.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    HADOOP-4359. Access Token: Support for data access authorization
    +    checking on DataNodes. (Jitendra Nath Pandey)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessKey.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessToken.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessTokenHandler.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/ExportedAccessKeys.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestAccessToken.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/hdfs-default.xml
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/datanode/browseBlock.jsp
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/datanode/tail.jsp

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessKey.java?rev=1077086&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessKey.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessKey.java Fri Mar  4 03:39:02 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import javax.crypto.Mac;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Key used for generating and verifying access tokens
+ */
+public class AccessKey implements Writable {
+  private long keyID;
+  private Text key;
+  private long expiryDate;
+  private Mac mac;
+
+  public AccessKey() {
+    this(0L, new Text(), 0L);
+  }
+
+  public AccessKey(long keyID, Text key, long expiryDate) {
+    this.keyID = keyID;
+    this.key = key;
+    this.expiryDate = expiryDate;
+  }
+
+  public long getKeyID() {
+    return keyID;
+  }
+
+  public Text getKey() {
+    return key;
+  }
+
+  public long getExpiryDate() {
+    return expiryDate;
+  }
+
+  public Mac getMac() {
+    return mac;
+  }
+
+  public void setMac(Mac mac) {
+    this.mac = mac;
+  }
+
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof AccessKey) {
+      AccessKey that = (AccessKey) obj;
+      return this.keyID == that.keyID && isEqual(this.key, that.key)
+          && this.expiryDate == that.expiryDate;
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    return key == null ? 0 : key.hashCode();
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  /** Write keyID, key and expiryDate; the Mac field is not serialized.
+   */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVLong(out, keyID);
+    key.write(out);
+    WritableUtils.writeVLong(out, expiryDate);
+  }
+
+  /** Read keyID, key and expiryDate; the Mac is rebuilt lazily via initMac().
+   */
+  public void readFields(DataInput in) throws IOException {
+    keyID = WritableUtils.readVLong(in);
+    key.readFields(in);
+    expiryDate = WritableUtils.readVLong(in);
+  }
+}
\ No newline at end of file
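
For readers less familiar with Hadoop's Writable contract, here is a minimal
round-trip sketch for the class above (a hypothetical standalone snippet, not
part of the patch; note that the Mac field is transient and never serialized):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.AccessKey;

    public class AccessKeyRoundTrip {
      public static void main(String[] args) throws IOException {
        // Illustrative values only: key #42, expiring ten minutes from now.
        AccessKey original = new AccessKey(42L, new Text("secret-bytes"),
            System.currentTimeMillis() + 10 * 60 * 1000L);

        // write() emits keyID and expiryDate as VLongs and key as a Text.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // readFields() restores the same three fields into a fresh instance.
        AccessKey copy = new AccessKey();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(original.equals(copy)); // true; equals() skips the Mac
      }
    }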

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessToken.java?rev=1077086&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessToken.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessToken.java Fri Mar  4 03:39:02 2011
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+public class AccessToken implements Writable {
+  public static final AccessToken DUMMY_TOKEN = new AccessToken();
+  private Text tokenID;
+  private Text tokenAuthenticator;
+
+  public AccessToken() {
+    this(new Text(), new Text());
+  }
+
+  public AccessToken(Text tokenID, Text tokenAuthenticator) {
+    this.tokenID = tokenID;
+    this.tokenAuthenticator = tokenAuthenticator;
+  }
+
+  public Text getTokenID() {
+    return tokenID;
+  }
+
+  public Text getTokenAuthenticator() {
+    return tokenAuthenticator;
+  }
+
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof AccessToken) {
+      AccessToken that = (AccessToken) obj;
+      return isEqual(this.tokenID, that.tokenID)
+          && isEqual(this.tokenAuthenticator, that.tokenAuthenticator);
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    return tokenAuthenticator == null ? 0 : tokenAuthenticator.hashCode();
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  /** Write the token ID and its authenticator to the stream.
+   */
+  public void write(DataOutput out) throws IOException {
+    tokenID.write(out);
+    tokenAuthenticator.write(out);
+  }
+
+  /** Read the token ID and its authenticator from the stream.
+   */
+  public void readFields(DataInput in) throws IOException {
+    tokenID.readFields(in);
+    tokenAuthenticator.readFields(in);
+  }
+
+}
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessTokenHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessTokenHandler.java?rev=1077086&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessTokenHandler.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessTokenHandler.java Fri Mar  4 03:39:02 2011
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.security.GeneralSecurityException;
+import java.security.SecureRandom;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * AccessTokenHandler can be instantiated in two modes, master and slave.
+ * A master can generate new access keys and export them to slaves, while
+ * slaves can only import and use access keys received from the master.
+ * Both master and slave can generate and verify access tokens. Typically,
+ * the NN runs in master mode and DNs run in slave mode.
+ */
+public class AccessTokenHandler {
+  private static final Log LOG = LogFactory.getLog(AccessTokenHandler.class);
+  public static final String STRING_ENABLE_ACCESS_TOKEN = "dfs.access.token.enable";
+  public static final String STRING_ACCESS_KEY_UPDATE_INTERVAL = "dfs.access.key.update.interval";
+  public static final String STRING_ACCESS_TOKEN_LIFETIME = "dfs.access.token.lifetime";
+
+  private final boolean isMaster;
+  /*
+   * keyUpdateInterval is the interval at which the NN updates its access keys.
+   * It should be set long enough that all live DNs and the Balancer have
+   * synced their access keys with the NN at least once during each interval.
+   */
+  private final long keyUpdateInterval;
+  private final long tokenLifetime;
+  private long serialNo = new SecureRandom().nextLong();
+  private KeyGenerator keyGen;
+  private AccessKey currentKey;
+  private AccessKey nextKey;
+  private Map<Long, AccessKey> allKeys;
+
+  public static enum AccessMode {
+    READ, WRITE, COPY, REPLACE
+  };
+
+  /**
+   * Constructor
+   *
+   * @param isMaster true to run in master (key-generating) mode
+   * @param keyUpdateInterval interval, in milliseconds, at which the master rotates keys
+   * @param tokenLifetime lifetime, in milliseconds, of generated tokens
+   * @throws IOException if key generation fails
+   */
+  public AccessTokenHandler(boolean isMaster, long keyUpdateInterval,
+      long tokenLifetime) throws IOException {
+    this.isMaster = isMaster;
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenLifetime = tokenLifetime;
+    this.allKeys = new HashMap<Long, AccessKey>();
+    if (isMaster) {
+      try {
+        generateKeys();
+        initMac(currentKey);
+      } catch (GeneralSecurityException e) {
+        throw (IOException) new IOException(
+            "Failed to create AccessTokenHandler").initCause(e);
+      }
+    }
+  }
+
+  /** Initialize access keys */
+  private synchronized void generateKeys() throws NoSuchAlgorithmException {
+    keyGen = KeyGenerator.getInstance("HmacSHA1");
+    /*
+     * Need to set estimated expiry dates for currentKey and nextKey so that if
+     * NN crashes, DN can still expire those keys. NN will stop using the newly
+     * generated currentKey after the first keyUpdateInterval, however it may
+     * still be used by DN and Balancer to generate new tokens before they get a
+     * chance to sync their keys with NN. Since we require keyUpdateInterval to be
+     * long enough so that all live DN's and Balancer will sync their keys with
+     * NN at least once during the period, the estimated expiry date for
+     * currentKey is set to now() + 2 * keyUpdateInterval + tokenLifetime.
+     * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval
+     * more.
+     */
+    serialNo++;
+    currentKey = new AccessKey(serialNo, new Text(keyGen.generateKey()
+        .getEncoded()), System.currentTimeMillis() + 2 * keyUpdateInterval
+        + tokenLifetime);
+    serialNo++;
+    nextKey = new AccessKey(serialNo, new Text(keyGen.generateKey()
+        .getEncoded()), System.currentTimeMillis() + 3 * keyUpdateInterval
+        + tokenLifetime);
+    allKeys.put(currentKey.getKeyID(), currentKey);
+    allKeys.put(nextKey.getKeyID(), nextKey);
+  }
+
+  /** Initialize Mac function */
+  private synchronized void initMac(AccessKey key) throws IOException {
+    try {
+      Mac mac = Mac.getInstance("HmacSHA1");
+      mac.init(new SecretKeySpec(key.getKey().getBytes(), "HmacSHA1"));
+      key.setMac(mac);
+    } catch (GeneralSecurityException e) {
+      throw (IOException) new IOException(
+          "Failed to initialize Mac for access key, keyID=" + key.getKeyID())
+          .initCause(e);
+    }
+  }
+
+  /** Export access keys, only to be used in master mode */
+  public synchronized ExportedAccessKeys exportKeys() {
+    if (!isMaster)
+      return null;
+    if (LOG.isDebugEnabled())
+      LOG.debug("Exporting access keys");
+    return new ExportedAccessKeys(true, keyUpdateInterval, tokenLifetime,
+        currentKey, allKeys.values().toArray(new AccessKey[0]));
+  }
+
+  private synchronized void removeExpiredKeys() {
+    long now = System.currentTimeMillis();
+    for (Iterator<Map.Entry<Long, AccessKey>> it = allKeys.entrySet()
+        .iterator(); it.hasNext();) {
+      Map.Entry<Long, AccessKey> e = it.next();
+      if (e.getValue().getExpiryDate() < now) {
+        it.remove();
+      }
+    }
+  }
+
+  /**
+   * Set access keys, only to be used in slave mode
+   */
+  public synchronized void setKeys(ExportedAccessKeys exportedKeys)
+      throws IOException {
+    if (isMaster || exportedKeys == null)
+      return;
+    LOG.info("Setting access keys");
+    removeExpiredKeys();
+    this.currentKey = exportedKeys.getCurrentKey();
+    initMac(currentKey);
+    AccessKey[] receivedKeys = exportedKeys.getAllKeys();
+    for (int i = 0; i < receivedKeys.length; i++) {
+      if (receivedKeys[i] == null)
+        continue;
+      this.allKeys.put(receivedKeys[i].getKeyID(), receivedKeys[i]);
+    }
+  }
+
+  /**
+   * Update access keys, only to be used in master mode
+   */
+  public synchronized void updateKeys() throws IOException {
+    if (!isMaster)
+      return;
+    LOG.info("Updating access keys");
+    removeExpiredKeys();
+    // set final expiry date of retiring currentKey
+    allKeys.put(currentKey.getKeyID(), new AccessKey(currentKey.getKeyID(),
+        currentKey.getKey(), System.currentTimeMillis() + keyUpdateInterval
+            + tokenLifetime));
+    // update the estimated expiry date of new currentKey
+    currentKey = new AccessKey(nextKey.getKeyID(), nextKey.getKey(), System
+        .currentTimeMillis()
+        + 2 * keyUpdateInterval + tokenLifetime);
+    initMac(currentKey);
+    allKeys.put(currentKey.getKeyID(), currentKey);
+    // generate a new nextKey
+    serialNo++;
+    nextKey = new AccessKey(serialNo, new Text(keyGen.generateKey()
+        .getEncoded()), System.currentTimeMillis() + 3 * keyUpdateInterval
+        + tokenLifetime);
+    allKeys.put(nextKey.getKeyID(), nextKey);
+  }
+
+  /** Verify that the token's authenticator matches the specified key */
+  private synchronized Boolean verifyToken(long keyID, AccessToken token)
+      throws IOException {
+    AccessKey key = allKeys.get(keyID);
+    if (key == null) {
+      LOG.warn("Access key for keyID=" + keyID + " doesn't exist.");
+      return false;
+    }
+    if (key.getMac() == null) {
+      initMac(key);
+    }
+    Text tokenID = token.getTokenID();
+    Text authenticator = new Text(key.getMac().doFinal(tokenID.getBytes()));
+    return authenticator.equals(token.getTokenAuthenticator());
+  }
+
+  /** Generate an access token for current user */
+  public AccessToken generateToken(long blockID, EnumSet<AccessMode> modes)
+      throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+    String userID = (ugi == null ? null : ugi.getUserName());
+    return generateToken(userID, blockID, modes);
+  }
+
+  /** Generate an access token for a specified user */
+  public synchronized AccessToken generateToken(String userID, long blockID,
+      EnumSet<AccessMode> modes) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generating access token for user=" + userID + ", blockID="
+          + blockID + ", access modes=" + modes + ", keyID="
+          + currentKey.getKeyID());
+    }
+    if (modes == null || modes.isEmpty())
+      throw new IOException("access modes can't be null or empty");
+    ByteArrayOutputStream buf = new ByteArrayOutputStream(4096);
+    DataOutputStream out = new DataOutputStream(buf);
+    WritableUtils.writeVLong(out, System.currentTimeMillis() + tokenLifetime);
+    WritableUtils.writeVLong(out, currentKey.getKeyID());
+    WritableUtils.writeString(out, userID);
+    WritableUtils.writeVLong(out, blockID);
+    WritableUtils.writeVInt(out, modes.size());
+    for (AccessMode aMode : modes) {
+      WritableUtils.writeEnum(out, aMode);
+    }
+    Text tokenID = new Text(buf.toByteArray());
+    return new AccessToken(tokenID, new Text(currentKey.getMac().doFinal(
+        tokenID.getBytes())));
+  }
+
+  /** Check if access should be allowed. userID is not checked if null */
+  public Boolean checkAccess(AccessToken token, String userID, long blockID,
+      AccessMode mode) throws IOException {
+    long oExpiry = 0;
+    long oKeyID = 0;
+    String oUserID = null;
+    long oBlockID = 0;
+    EnumSet<AccessMode> oModes = EnumSet.noneOf(AccessMode.class);
+
+    try {
+      ByteArrayInputStream buf = new ByteArrayInputStream(token.getTokenID()
+          .getBytes());
+      DataInputStream in = new DataInputStream(buf);
+      oExpiry = WritableUtils.readVLong(in);
+      oKeyID = WritableUtils.readVLong(in);
+      oUserID = WritableUtils.readString(in);
+      oBlockID = WritableUtils.readVLong(in);
+      int length = WritableUtils.readVInt(in);
+      for (int i = 0; i < length; ++i) {
+        oModes.add(WritableUtils.readEnum(in, AccessMode.class));
+      }
+    } catch (IOException e) {
+      throw (IOException) new IOException(
+          "Unable to parse access token for user=" + userID + ", blockID="
+              + blockID + ", access mode=" + mode).initCause(e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Verifying access token for user=" + userID + ", blockID="
+          + blockID + ", access mode=" + mode + ", keyID=" + oKeyID);
+    }
+    return (userID == null || userID.equals(oUserID)) && oBlockID == blockID
+        && System.currentTimeMillis() < oExpiry && oModes.contains(mode)
+        && verifyToken(oKeyID, token);
+  }
+
+}
\ No newline at end of file
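
A worked example of the expiry arithmetic in generateKeys() and updateKeys()
above, assuming the 600-minute defaults this patch adds to hdfs-default.xml
(illustrative values, not patch code):

    long interval = 600L * 60 * 1000;  // keyUpdateInterval in ms
    long lifetime = 600L * 60 * 1000;  // tokenLifetime in ms
    long now = System.currentTimeMillis();
    long currentKeyExpiry  = now + 2 * interval + lifetime; // 1800 min (30 h) out
    long nextKeyExpiry     = now + 3 * interval + lifetime; // 2400 min (40 h) out
    long retiringKeyExpiry = now + 1 * interval + lifetime; // 1200 min (20 h), set by updateKeys()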

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/ExportedAccessKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/ExportedAccessKeys.java?rev=1077086&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/ExportedAccessKeys.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/ExportedAccessKeys.java Fri Mar  4 03:39:02 2011
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Object for passing access keys
+ */
+public class ExportedAccessKeys implements Writable {
+  public static final ExportedAccessKeys DUMMY_KEYS = new ExportedAccessKeys();
+  private boolean isAccessTokenEnabled;
+  private long keyUpdateInterval;
+  private long tokenLifetime;
+  private AccessKey currentKey;
+  private AccessKey[] allKeys;
+
+  public ExportedAccessKeys() {
+    this(false, 0, 0, new AccessKey(), new AccessKey[0]);
+  }
+
+  ExportedAccessKeys(boolean isAccessTokenEnabled, long keyUpdateInterval,
+      long tokenLifetime, AccessKey currentKey, AccessKey[] allKeys) {
+    this.isAccessTokenEnabled = isAccessTokenEnabled;
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenLifetime = tokenLifetime;
+    this.currentKey = currentKey;
+    this.allKeys = allKeys;
+  }
+
+  public boolean isAccessTokenEnabled() {
+    return isAccessTokenEnabled;
+  }
+
+  public long getKeyUpdateInterval() {
+    return keyUpdateInterval;
+  }
+
+  public long getTokenLifetime() {
+    return tokenLifetime;
+  }
+
+  public AccessKey getCurrentKey() {
+    return currentKey;
+  }
+
+  public AccessKey[] getAllKeys() {
+    return allKeys;
+  }
+
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof ExportedAccessKeys) {
+      ExportedAccessKeys that = (ExportedAccessKeys) obj;
+      return this.isAccessTokenEnabled == that.isAccessTokenEnabled
+          && this.keyUpdateInterval == that.keyUpdateInterval
+          && this.tokenLifetime == that.tokenLifetime
+          && isEqual(this.currentKey, that.currentKey)
+          && Arrays.equals(this.allKeys, that.allKeys);
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    return currentKey == null ? 0 : currentKey.hashCode();
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  static { // register a ctor
+    WritableFactories.setFactory(ExportedAccessKeys.class,
+        new WritableFactory() {
+          public Writable newInstance() {
+            return new ExportedAccessKeys();
+          }
+        });
+  }
+
+  /** Write all fields, including the full key set, to the stream.
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(isAccessTokenEnabled);
+    out.writeLong(keyUpdateInterval);
+    out.writeLong(tokenLifetime);
+    currentKey.write(out);
+    out.writeInt(allKeys.length);
+    for (int i = 0; i < allKeys.length; i++) {
+      allKeys[i].write(out);
+    }
+  }
+
+  /** Read all fields, including the full key set, from the stream.
+   */
+  public void readFields(DataInput in) throws IOException {
+    isAccessTokenEnabled = in.readBoolean();
+    keyUpdateInterval = in.readLong();
+    tokenLifetime = in.readLong();
+    currentKey.readFields(in);
+    this.allKeys = new AccessKey[in.readInt()];
+    for (int i = 0; i < allKeys.length; i++) {
+      allKeys[i] = new AccessKey();
+      allKeys[i].readFields(in);
+    }
+  }
+
+}
\ No newline at end of file
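
Tying the classes above together, a hedged sketch of the master/slave key flow
(a hypothetical test-style snippet: the user name, block ID and intervals are
made up, and in the patch itself the keys travel via DatanodeRegistration and
KeyUpdateCommand rather than a direct method call):

    import java.util.EnumSet;

    import org.apache.hadoop.security.AccessToken;
    import org.apache.hadoop.security.AccessTokenHandler;
    import org.apache.hadoop.security.AccessTokenHandler.AccessMode;
    import org.apache.hadoop.security.ExportedAccessKeys;

    public class AccessTokenFlowSketch {
      public static void main(String[] args) throws Exception {
        long interval = 10 * 60 * 1000L; // illustrative: 10-minute key updates
        long lifetime = 10 * 60 * 1000L; // illustrative: 10-minute token lifetime

        // The master (NN role) generates its keys in the constructor;
        // the slave (DN role) starts with no keys of its own.
        AccessTokenHandler master = new AccessTokenHandler(true, interval, lifetime);
        AccessTokenHandler slave = new AccessTokenHandler(false, interval, lifetime);

        // Key distribution: master exports, slave imports.
        ExportedAccessKeys keys = master.exportKeys();
        slave.setKeys(keys);

        // The NN issues a READ token for a block; the DN verifies it.
        AccessToken t = master.generateToken("alice", 1001L,
            EnumSet.of(AccessMode.READ));
        System.out.println(slave.checkAccess(t, "alice", 1001L, AccessMode.READ));  // true
        System.out.println(slave.checkAccess(t, "alice", 1001L, AccessMode.WRITE)); // false: mode not granted
      }
    }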

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java?rev=1077086&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java Fri Mar  4 03:39:02 2011
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.IOException;
+
+/**
+ * Access token verification failed.
+ */
+public class InvalidAccessTokenException extends IOException {
+  private static final long serialVersionUID = 168L;
+
+  public InvalidAccessTokenException() {
+    super();
+  }
+
+  public InvalidAccessTokenException(String msg) {
+    super(msg);
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/hdfs-default.xml?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/hdfs-default.xml Fri Mar  4 03:39:02 2011
@@ -190,6 +190,29 @@ creations/deletions), or "all".</descrip
 </property>
 
 <property>
+  <name>dfs.access.token.enable</name>
+  <value>false</value>
+  <description>
+    If "true", access tokens are used as capabilities for accessing datanodes.
+    If "false", no access tokens are checked on accessing datanodes.
+  </description>
+</property>
+
+<property>
+  <name>dfs.access.key.update.interval</name>
+  <value>600</value>
+  <description>
+    Interval in minutes at which namenode updates its access keys.
+  </description>
+</property>
+
+<property>
+  <name>dfs.access.token.lifetime</name>
+  <value>600</value>
+  <description>The lifetime of access tokens in minutes.</description>
+</property>
+
+<property>
   <name>dfs.data.dir</name>
   <value>${hadoop.tmp.dir}/dfs/data</value>
   <description>Determines where on the local filesystem an DFS data node
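
As a usage note (not part of the patch), the three settings above can also be
set programmatically through the constants AccessTokenHandler defines; a
minimal sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.AccessTokenHandler;

    Configuration conf = new Configuration();
    // Enable access-token checking on DataNodes (defaults to false).
    conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
    // Both values are in minutes, matching hdfs-default.xml above.
    conf.setLong(AccessTokenHandler.STRING_ACCESS_KEY_UPDATE_INTERVAL, 600);
    conf.setLong(AccessTokenHandler.STRING_ACCESS_TOKEN_LIFETIME, 600);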

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar  4 03:39:02 2011
@@ -34,7 +34,9 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.security.InvalidAccessTokenException;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.*;
 
@@ -638,14 +640,21 @@ public class DFSClient implements FSCons
       ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
       ) throws IOException {
     //get all block locations
-    final List<LocatedBlock> locatedblocks
+    List<LocatedBlock> locatedblocks
         = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE).getLocatedBlocks();
     final DataOutputBuffer md5out = new DataOutputBuffer();
     int bytesPerCRC = 0;
     long crcPerBlock = 0;
+    boolean refetchBlocks = false;
+    int lastRetriedIndex = -1;
 
     //get block checksum for each block
     for(int i = 0; i < locatedblocks.size(); i++) {
+      if (refetchBlocks) {  // refetch to get fresh tokens
+        locatedblocks = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE)
+            .getLocatedBlocks();
+        refetchBlocks = false;
+      }
       LocatedBlock lb = locatedblocks.get(i);
       final Block block = lb.getBlock();
       final DatanodeInfo[] datanodes = lb.getLocations();
@@ -677,12 +686,28 @@ public class DFSClient implements FSCons
           out.write(DataTransferProtocol.OP_BLOCK_CHECKSUM);
           out.writeLong(block.getBlockId());
           out.writeLong(block.getGenerationStamp());
+          lb.getAccessToken().write(out);
           out.flush();
          
           final short reply = in.readShort();
           if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
-            throw new IOException("Bad response " + reply + " for block "
-                + block + " from datanode " + datanodes[j].getName());
+            if (reply == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN
+                && i > lastRetriedIndex) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+                    + "for file " + src + " for block " + block
+                    + " from datanode " + datanodes[j].getName()
+                    + ". Will retry the block once.");
+              }
+              lastRetriedIndex = i;
+              done = true; // actually it's not done; but we'll retry
+              i--; // repeat at i-th block
+              refetchBlocks = true;
+              break;
+            } else {
+              throw new IOException("Bad response " + reply + " for block "
+                  + block + " from datanode " + datanodes[j].getName());
+            }
           }
 
           //read byte-per-checksum
@@ -1325,24 +1350,26 @@ public class DFSClient implements FSCons
       checksumSize = this.checksum.getChecksumSize();
     }
 
-    public static BlockReader newBlockReader(Socket sock, String file, long blockId, 
+    public static BlockReader newBlockReader(Socket sock, String file, long blockId, AccessToken accessToken, 
         long genStamp, long startOffset, long len, int bufferSize) throws IOException {
-      return newBlockReader(sock, file, blockId, genStamp, startOffset, len, bufferSize,
+      return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
           true);
     }
 
     /** Java Doc required */
     public static BlockReader newBlockReader( Socket sock, String file, long blockId, 
+                                       AccessToken accessToken,
                                        long genStamp,
                                        long startOffset, long len,
                                        int bufferSize, boolean verifyChecksum)
                                        throws IOException {
-      return newBlockReader(sock, file, blockId, genStamp, startOffset,
+      return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset,
                             len, bufferSize, verifyChecksum, "");
     }
 
     public static BlockReader newBlockReader( Socket sock, String file,
                                        long blockId, 
+                                       AccessToken accessToken,
                                        long genStamp,
                                        long startOffset, long len,
                                        int bufferSize, boolean verifyChecksum,
@@ -1360,6 +1387,7 @@ public class DFSClient implements FSCons
       out.writeLong( startOffset );
       out.writeLong( len );
       Text.writeString(out, clientName);
+      accessToken.write(out);
       out.flush();
       
       //
@@ -1370,10 +1398,16 @@ public class DFSClient implements FSCons
           new BufferedInputStream(NetUtils.getInputStream(sock), 
                                   bufferSize));
       
-      if ( in.readShort() != DataTransferProtocol.OP_STATUS_SUCCESS ) {
-        throw new IOException("Got error in response to OP_READ_BLOCK " +
-                              "for file " + file + 
-                              " for block " + blockId);
+      short status = in.readShort();
+      if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
+        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+          throw new InvalidAccessTokenException(
+              "Got access token error in response to OP_READ_BLOCK "
+                  + "for file " + file + " for block " + blockId);
+        } else {
+          throw new IOException("Got error in response to OP_READ_BLOCK "
+              + "for file " + file + " for block " + blockId);
+        }
       }
       DataChecksum checksum = DataChecksum.newDataChecksum( in );
       //Warning when we get CHECKSUM_NULL?
@@ -1520,7 +1554,7 @@ public class DFSClient implements FSCons
      * @return located block
      * @throws IOException
      */
-    private LocatedBlock getBlockAt(long offset) throws IOException {
+    private synchronized LocatedBlock getBlockAt(long offset) throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
       // search cached blocks first
       int targetBlockIdx = locatedBlocks.findBlock(offset);
@@ -1540,6 +1574,32 @@ public class DFSClient implements FSCons
       return blk;
     }
 
+    /** Fetch a block from namenode and cache it */
+    private synchronized void fetchAndCacheBlockAt(long offset) throws IOException {
+      int targetBlockIdx = locatedBlocks.findBlock(offset);
+      if (targetBlockIdx < 0) { // block is not cached
+        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+      }
+      // fetch blocks
+      LocatedBlocks newBlocks;
+      newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
+      if (newBlocks == null) {
+        throw new IOException("Could not find target position " + offset);
+      }
+      locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+    }
+
+    /** Fetch a block without caching */
+    private LocatedBlock fetchBlockAt(long offset) throws IOException {
+      LocatedBlocks newBlocks;
+      newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
+      if (newBlocks == null) {
+        throw new IOException("Could not find target position " + offset);
+      }
+      int index = newBlocks.findBlock(offset);
+      return newBlocks.get(index);
+    }
+    
     /**
      * Get blocks in the specified range.
      * Fetch them from the namenode if not cached.
@@ -1601,17 +1661,18 @@ public class DFSClient implements FSCons
       }
 
       //
-      // Compute desired block
-      //
-      LocatedBlock targetBlock = getBlockAt(target);
-      assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
-      long offsetIntoBlock = target - targetBlock.getStartOffset();
-
-      //
       // Connect to best DataNode for desired Block, with potential offset
       //
       DatanodeInfo chosenNode = null;
-      while (s == null) {
+      int refetchToken = 1; // only need to get a new access token once
+      while (true) {
+        //
+        // Compute desired block
+        //
+        LocatedBlock targetBlock = getBlockAt(target);
+        assert (target==this.pos) : "Wrong position " + pos + " expect " + target;
+        long offsetIntoBlock = target - targetBlock.getStartOffset();
+
         DNAddrPair retval = chooseDataNode(targetBlock);
         chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
@@ -1621,17 +1682,33 @@ public class DFSClient implements FSCons
           NetUtils.connect(s, targetAddr, socketTimeout);
           s.setSoTimeout(socketTimeout);
           Block blk = targetBlock.getBlock();
+          AccessToken accessToken = targetBlock.getAccessToken();
           
           blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
+              accessToken, 
               blk.getGenerationStamp(),
               offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
               buffersize, verifyChecksum, clientName);
           return chosenNode;
         } catch (IOException ex) {
-          // Put chosen node into dead list, continue
           LOG.debug("Failed to connect to " + targetAddr + ":" 
                     + StringUtils.stringifyException(ex));
-          addToDeadNodes(chosenNode);
+          if (ex instanceof InvalidAccessTokenException && refetchToken-- > 0) {
+            /*
+             * Get a new access token and retry. The retry is needed in two
+             * cases: 1) both the NN and DN restarted while the DFSClient was
+             * holding a cached access token; 2) the NN failed to update its
+             * access key at the pre-set interval (by a wide margin) and then
+             * restarted. In the second case the DN re-registers with the NN
+             * and receives a new access key, but it deletes the old key from
+             * memory since that key is considered expired based on its
+             * estimated expiration date.
+             */
+            fetchAndCacheBlockAt(target);
+          } else {
+            // Put chosen node into dead list, continue
+            addToDeadNodes(chosenNode);
+          }
           if (s != null) {
             try {
               s.close();
@@ -1641,7 +1718,6 @@ public class DFSClient implements FSCons
           s = null;
         }
       }
-      return chosenNode;
     }
 
     /**
@@ -1811,6 +1887,7 @@ public class DFSClient implements FSCons
       Socket dn = null;
       int numAttempts = block.getLocations().length;
       IOException ioe = null;
+      int refetchToken = 1; // only need to get a new access token once
       
       while (dn == null && numAttempts-- > 0 ) {
         DNAddrPair retval = chooseDataNode(block);
@@ -1822,11 +1899,13 @@ public class DFSClient implements FSCons
           dn = socketFactory.createSocket();
           NetUtils.connect(dn, targetAddr, socketTimeout);
           dn.setSoTimeout(socketTimeout);
+          AccessToken accessToken = block.getAccessToken();
               
           int len = (int) (end - start + 1);
               
           reader = BlockReader.newBlockReader(dn, src, 
                                               block.getBlock().getBlockId(),
+                                              accessToken,
                                               block.getBlock().getGenerationStamp(),
                                               start, len, buffersize, 
                                               verifyChecksum, clientName);
@@ -1844,10 +1923,20 @@ public class DFSClient implements FSCons
           reportChecksumFailure(src, block.getBlock(), chosenNode);
         } catch (IOException e) {
           ioe = e;
-          LOG.warn("Failed to connect to " + targetAddr + 
-                   " for file " + src + 
-                   " for block " + block.getBlock().getBlockId() + ":"  +
-                   StringUtils.stringifyException(e));
+          if (e instanceof InvalidAccessTokenException && refetchToken-- > 0) {
+            LOG.info("Invalid access token when connecting to " + targetAddr
+                + " for file " + src + " for block "
+                + block.getBlock() + ":"
+                + StringUtils.stringifyException(e)
+                + ", get a new access token and retry...");
+            block = fetchBlockAt(block.getStartOffset());
+            numAttempts = block.getLocations().length;
+            continue;
+          } else {
+            LOG.warn("Failed to connect to " + targetAddr + " for file " + src
+                + " for block " + block.getBlock() + ":"
+                + StringUtils.stringifyException(e));
+          }
         } finally {
           IOUtils.closeStream(reader);
           IOUtils.closeSocket(dn);
@@ -2081,6 +2170,7 @@ public class DFSClient implements FSCons
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;
     private Block block;
+    private AccessToken accessToken;
     final private long blockSize;
     private DataChecksum checksum;
     private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
@@ -2600,6 +2690,7 @@ public class DFSClient implements FSCons
         //
         if (newBlock != null) {
           block = newBlock.getBlock();
+          accessToken = newBlock.getAccessToken();
           nodes = newBlock.getLocations();
         }
 
@@ -2785,6 +2876,7 @@ public class DFSClient implements FSCons
         long startTime = System.currentTimeMillis();
         lb = locateFollowingBlock(startTime);
         block = lb.getBlock();
+        accessToken = lb.getAccessToken();
         nodes = lb.getLocations();
   
         //
@@ -2861,6 +2953,7 @@ public class DFSClient implements FSCons
         for (int i = 1; i < nodes.length; i++) {
           nodes[i].write(out);
         }
+        accessToken.write(out);
         checksum.writeHeader( out );
         out.flush();
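
The token handling added to this file's read paths follows a retry-once shape
(the checksum path uses refetchBlocks/lastRetriedIndex, the block reads use
refetchToken). A condensed, hypothetical sketch of that shape; readData() is a
placeholder for the real connect-and-read logic, while fetchBlockAt() is the
helper added earlier in this patch:

    // Hypothetical condensation of DFSClient's retry-once pattern.
    byte[] readWithTokenRetry(LocatedBlock block) throws IOException {
      int refetchToken = 1; // refetch the access token at most once
      while (true) {
        try {
          return readData(block); // contacts a DN using block.getAccessToken()
        } catch (InvalidAccessTokenException e) {
          if (refetchToken-- > 0) {
            block = fetchBlockAt(block.getStartOffset()); // fresh token from NN
            continue;
          }
          throw e;
        }
      }
    }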
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Mar  4 03:39:02 2011
@@ -40,9 +40,9 @@ public interface ClientProtocol extends 
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 41: saveNamespace introduced.
+   * 42: All LocatedBlock objects contain access tokens
    */
-  public static final long versionID = 41L;
+  public static final long versionID = 42L;
   
   ///////////////////////////////////////
   // File contents

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Fri Mar  4 03:39:02 2011
@@ -31,15 +31,11 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 14:
-   *    OP_REPLACE_BLOCK is sent from the Balancer server to the destination,
-   *    including the block id, source, and proxy.
-   *    OP_COPY_BLOCK is sent from the destination to the proxy, which contains
-   *    only the block id.
-   *    A reply to OP_COPY_BLOCK sends the block content.
-   *    A reply to OP_REPLACE_BLOCK includes an operation status.
+   * Version 15:
+   *    Added a new status OP_STATUS_ERROR_ACCESS_TOKEN
+   *    Access token is now required on all DN operations
    */
-  public static final int DATA_TRANSFER_VERSION = 14;
+  public static final int DATA_TRANSFER_VERSION = 15;
 
   // Processed at datanode stream-handler
   public static final byte OP_WRITE_BLOCK = (byte) 80;
@@ -54,7 +50,8 @@ public interface DataTransferProtocol {
   public static final int OP_STATUS_ERROR_CHECKSUM = 2;  
   public static final int OP_STATUS_ERROR_INVALID = 3;  
   public static final int OP_STATUS_ERROR_EXISTS = 4;  
-  public static final int OP_STATUS_CHECKSUM_OK = 5;  
+  public static final int OP_STATUS_ERROR_ACCESS_TOKEN = 5;
+  public static final int OP_STATUS_CHECKSUM_OK = 6;
 
 
 
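A condensed sketch (hypothetical helper, not patch code) of how a client maps
the new status constant to the new exception, mirroring the DFSClient changes
above:

    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
    import org.apache.hadoop.security.InvalidAccessTokenException;

    static void checkReply(DataInputStream in, String op) throws IOException {
      short status = in.readShort();
      if (status == DataTransferProtocol.OP_STATUS_SUCCESS) {
        return;
      }
      if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
        // Callers catch this and refetch LocatedBlocks to get fresh tokens.
        throw new InvalidAccessTokenException("access token error during " + op);
      }
      throw new IOException("bad response " + status + " during " + op);
    }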

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Fri Mar  4 03:39:02 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.security.AccessToken;
 
 import java.io.*;
 
@@ -43,6 +44,7 @@ public class LocatedBlock implements Wri
   // else false. If block has few corrupt replicas, they are filtered and 
   // their locations are not part of this object
   private boolean corrupt;
+  private AccessToken accessToken = new AccessToken();
 
   /**
    */
@@ -76,6 +78,14 @@ public class LocatedBlock implements Wri
     }
   }
 
+  public AccessToken getAccessToken() {
+    return accessToken;
+  }
+
+  public void setAccessToken(AccessToken token) {
+    this.accessToken = token;
+  }
+
   /**
    */
   public Block getBlock() {
@@ -112,6 +122,7 @@ public class LocatedBlock implements Wri
   // Writable
   ///////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
+    accessToken.write(out);
     out.writeBoolean(corrupt);
     out.writeLong(offset);
     b.write(out);
@@ -122,6 +133,7 @@ public class LocatedBlock implements Wri
   }
 
   public void readFields(DataInput in) throws IOException {
+    accessToken.readFields(in);
     this.corrupt = in.readBoolean();
     offset = in.readLong();
     this.b = new Block();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Mar  4 03:39:02 2011
@@ -33,6 +33,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -71,8 +72,12 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -190,6 +195,11 @@ public class Balancer implements Tool {
   private NamenodeProtocol namenode;
   private ClientProtocol client;
   private FileSystem fs;
+  private boolean isAccessTokenEnabled;
+  private boolean shouldRun;
+  private long keyUpdaterInterval;
+  private AccessTokenHandler accessTokenHandler;
+  private Daemon keyupdaterthread = null; // AccessKeyUpdater thread
   private final static Random rnd = new Random();
   
   // all data node lists
@@ -359,6 +369,13 @@ public class Balancer implements Tool {
       out.writeLong(block.getBlock().getGenerationStamp());
       Text.writeString(out, source.getStorageID());
       proxySource.write(out);
+      AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+      if (isAccessTokenEnabled) {
+        accessToken = accessTokenHandler.generateToken(null, block.getBlock()
+            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.REPLACE,
+            AccessTokenHandler.AccessMode.COPY));
+      }
+      accessToken.write(out);
       out.flush();
     }
     
@@ -366,6 +383,8 @@ public class Balancer implements Tool {
     private void receiveResponse(DataInputStream in) throws IOException {
       short status = in.readShort();
       if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
+        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN)
+          throw new IOException("block move failed due to access token error");
         throw new IOException("block move is failed");
       }
     }
@@ -841,6 +860,48 @@ public class Balancer implements Tool {
     this.namenode = createNamenode(conf);
     this.client = DFSClient.createNamenode(conf);
     this.fs = FileSystem.get(conf);
+    ExportedAccessKeys keys = namenode.getAccessKeys();
+    this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
+    if (isAccessTokenEnabled) {
+      long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
+      long accessTokenLifetime = keys.getTokenLifetime();
+      LOG.info("Access token params received from NN: keyUpdateInterval="
+          + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+          + accessTokenLifetime / (60 * 1000) + " min(s)");
+      this.accessTokenHandler = new AccessTokenHandler(false,
+          accessKeyUpdateInterval, accessTokenLifetime);
+      this.accessTokenHandler.setKeys(keys);
+      /*
+       * Balancer should sync its access keys with NN more frequently than NN
+       * updates its access keys
+       */
+      this.keyUpdaterInterval = accessKeyUpdateInterval / 4;
+      LOG.info("Balancer will update its access keys every "
+          + keyUpdaterInterval / (60 * 1000) + " minute(s)");
+      this.keyupdaterthread = new Daemon(new AccessKeyUpdater());
+      this.shouldRun = true;
+      this.keyupdaterthread.start();
+    }
+  }
+  
+  /**
+   * Periodically updates access keys.
+   */
+  class AccessKeyUpdater implements Runnable {
+
+    public void run() {
+      while (shouldRun) {
+        try {
+          accessTokenHandler.setKeys(namenode.getAccessKeys());
+        } catch (Exception e) {
+          LOG.error(StringUtils.stringifyException(e));
+        }
+        try {
+          Thread.sleep(keyUpdaterInterval);
+        } catch (InterruptedException ie) {
+        }
+      }
+    }
   }
   
   /* Build a NamenodeProtocol connection to the namenode and
@@ -857,6 +918,7 @@ public class Balancer implements Tool {
     Map<String,RetryPolicy> methodNameToPolicyMap =
         new HashMap<String, RetryPolicy>();
     methodNameToPolicyMap.put("getBlocks", methodPolicy);
+    methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
 
     UserGroupInformation ugi;
     try {
@@ -1502,6 +1564,12 @@ public class Balancer implements Tool {
       dispatcherExecutor.shutdownNow();
       moverExecutor.shutdownNow();
 
+      shouldRun = false;
+      try {
+        if (keyupdaterthread != null) keyupdaterthread.interrupt();
+      } catch (Exception e) {
+        LOG.warn("Exception shutting down access key updater thread", e);
+      }
       // close the output file
       IOUtils.closeStream(out); 
       if (fs != null) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Mar  4 03:39:02 2011
@@ -34,6 +34,7 @@ import java.security.SecureRandom;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -71,6 +72,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
@@ -82,6 +84,9 @@ import org.apache.hadoop.ipc.RemoteExcep
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.authorize.ConfiguredPolicy;
 import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -189,6 +194,9 @@ public class DataNode extends Configured
   int socketWriteTimeout = 0;  
   boolean transferToAllowed = true;
   int writePacketSize = 0;
+  boolean isAccessTokenEnabled;
+  AccessTokenHandler accessTokenHandler;
+  boolean isAccessTokenInitialized = false;
   
   public DataBlockScanner blockScanner = null;
   public Daemon blockScannerThread = null;
@@ -556,6 +564,27 @@ public class DataNode extends Configured
           + ". Expecting " + storage.getStorageID());
     }
     
+    if (!isAccessTokenInitialized) {
+      /* first time registering with NN */
+      ExportedAccessKeys keys = dnRegistration.exportedKeys;
+      this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
+      if (isAccessTokenEnabled) {
+        long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
+        long accessTokenLifetime = keys.getTokenLifetime();
+        LOG.info("Access token params received from NN: keyUpdateInterval="
+            + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+            + accessTokenLifetime / (60 * 1000) + " min(s)");
+        this.accessTokenHandler = new AccessTokenHandler(false,
+            accessKeyUpdateInterval, accessTokenLifetime);
+      }
+      isAccessTokenInitialized = true;
+    }
+
+    if (isAccessTokenEnabled) {
+      accessTokenHandler.setKeys(dnRegistration.exportedKeys);
+      dnRegistration.exportedKeys = ExportedAccessKeys.DUMMY_KEYS;
+    }
+
     // random short delay - helps scatter the BR from all DNs
     scheduleBlockReport(initialBlockReportDelay);
   }
@@ -920,6 +949,12 @@ public class DataNode extends Configured
     case DatanodeProtocol.DNA_RECOVERBLOCK:
       recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
       break;
+    case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
+      LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
+      if (isAccessTokenEnabled) {
+        accessTokenHandler.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
+      }
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
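
Taken together, the two hunks above give the datanode a two-phase key lifecycle: bootstrap from the registration reply, then follow NN key rotations via heartbeat commands. A condensed sketch assuming the classes added by this patch; the wrapper class and method shapes are illustrative:

    // Sketch of the datanode-side access key lifecycle.
    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
    import org.apache.hadoop.security.AccessTokenHandler;
    import org.apache.hadoop.security.ExportedAccessKeys;

    class DatanodeKeySketch {
      private boolean tokensEnabled;
      private AccessTokenHandler handler;

      // Phase 1, first registration: adopt the NN's settings and keys.
      void onFirstRegistration(ExportedAccessKeys keys) throws IOException {
        tokensEnabled = keys.isAccessTokenEnabled();
        if (tokensEnabled) {
          handler = new AccessTokenHandler(false,  // false: not the key master
              keys.getKeyUpdateInterval(), keys.getTokenLifetime());
          handler.setKeys(keys);
        }
      }

      // Phase 2, steady state: NN pushes rotated keys as DNA_ACCESSKEYUPDATE.
      void onKeyUpdateCommand(KeyUpdateCommand cmd) throws IOException {
        if (tokensEnabled) {
          handler.setKeys(cmd.getExportedKeys());
        }
      }
    }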
@@ -1176,6 +1211,12 @@ public class DataNode extends Configured
         for (int i = 1; i < targets.length; i++) {
           targets[i].write(out);
         }
+        AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+        if (isAccessTokenEnabled) {
+          accessToken = accessTokenHandler.generateToken(null, b.getBlockId(),
+              EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
+        }
+        accessToken.write(out);
         // send data & checksum
         blockSender.sendBlock(out, baseStream, null);
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Mar  4 03:39:02 2011
@@ -38,6 +38,8 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.AccessTokenHandler;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
@@ -153,12 +155,26 @@ class DataXceiver implements Runnable, F
     long startOffset = in.readLong();
     long length = in.readLong();
     String clientName = Text.readString(in);
-    // send the block
+    AccessToken accessToken = new AccessToken();
+    accessToken.readFields(in);
     OutputStream baseStream = NetUtils.getOutputStream(s, 
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
     
+    if (datanode.isAccessTokenEnabled
+        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
+            AccessTokenHandler.AccessMode.READ)) {
+      try {
+        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+        out.flush();
+        throw new IOException("Access token verification failed, on client "
+            + "request for reading block " + block);
+      } finally {
+        IOUtils.closeStream(out);
+      }
+    }
+    // send the block
     BlockSender blockSender = null;
     final String clientTraceFmt =
       clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
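
The hunk above is the template for every datanode op in this patch: read the token right after the fixed header fields, verify it before doing any work, and signal OP_STATUS_ERROR_ACCESS_TOKEN so the client can tell a token problem from a generic failure. A minimal sketch of that verify-or-reject step; names outside the patch's own classes are assumptions:

    // Sketch: the common verify-or-reject step for an incoming READ op.
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.security.AccessToken;
    import org.apache.hadoop.security.AccessTokenHandler;

    class TokenCheckSketch {
      static void verifyRead(boolean tokensEnabled, AccessTokenHandler handler,
          AccessToken token, long blockId, DataOutputStream out)
          throws IOException {
        if (tokensEnabled
            && !handler.checkAccess(token, null, blockId,
                AccessTokenHandler.AccessMode.READ)) {
          try {
            // Report the precise failure before hanging up, so the client
            // can fetch a fresh token from the namenode and retry.
            out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
            out.flush();
            throw new IOException(
                "Access token verification failed for block " + blockId);
          } finally {
            IOUtils.closeStream(out);
          }
        }
      }
    }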
@@ -246,10 +262,28 @@ class DataXceiver implements Runnable, F
       tmp.readFields(in);
       targets[i] = tmp;
     }
+    AccessToken accessToken = new AccessToken();
+    accessToken.readFields(in);
+    DataOutputStream replyOut = null;   // stream to prev target
+    replyOut = new DataOutputStream(
+                   NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+    if (datanode.isAccessTokenEnabled
+        && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
+            .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
+      try {
+        if (client.length() != 0) {
+          Text.writeString(replyOut, datanode.dnRegistration.getName());
+          replyOut.flush();
+        }
+        throw new IOException("Access token verification failed, on client "
+            + "request for writing block " + block);
+      } finally {
+        IOUtils.closeStream(replyOut);
+      }
+    }
 
     DataOutputStream mirrorOut = null;  // stream to next target
     DataInputStream mirrorIn = null;    // reply from next target
-    DataOutputStream replyOut = null;   // stream to prev target
     Socket mirrorSock = null;           // socket to next target
     BlockReceiver blockReceiver = null; // responsible for data handling
     String mirrorNode = null;           // the name:port of next target
@@ -261,10 +295,6 @@ class DataXceiver implements Runnable, F
           s.getLocalSocketAddress().toString(),
           isRecovery, client, srcDataNode, datanode);
 
-      // get a connection back to the previous target
-      replyOut = new DataOutputStream(
-                     NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
-
       //
       // Open network conn to backup machine, if 
       // appropriate
@@ -304,6 +334,7 @@ class DataXceiver implements Runnable, F
           for ( int i = 1; i < targets.length; i++ ) {
             targets[i].write( mirrorOut );
           }
+          accessToken.write(mirrorOut);
 
           blockReceiver.writeChecksumHeader(mirrorOut);
           mirrorOut.flush();
@@ -429,8 +460,24 @@ class DataXceiver implements Runnable, F
    */
   void getBlockChecksum(DataInputStream in) throws IOException {
     final Block block = new Block(in.readLong(), 0 , in.readLong());
+    AccessToken accessToken = new AccessToken();
+    accessToken.readFields(in);
+    DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
+        datanode.socketWriteTimeout));
+    if (datanode.isAccessTokenEnabled
+        && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
+            .getBlockId(), AccessTokenHandler.AccessMode.READ)) {
+      try {
+        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+        out.flush();
+        throw new IOException(
+            "Access token verification failed on getBlockChecksum() "
+                + "for block " + block);
+      } finally {
+        IOUtils.closeStream(out);
+      }
+    }
 
-    DataOutputStream out = null;
     final MetaDataInputStream metadataIn = datanode.data.getMetaDataInputStream(block);
     final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
         metadataIn, BUFFER_SIZE));
@@ -452,8 +499,6 @@ class DataXceiver implements Runnable, F
       }
 
       //write reply
-      out = new DataOutputStream(
-          NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
       out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
       out.writeInt(bytesPerCRC);
       out.writeLong(crcPerBlock);
@@ -476,10 +521,24 @@ class DataXceiver implements Runnable, F
     // Read in the header
     long blockId = in.readLong(); // read block id
     Block block = new Block(blockId, 0, in.readLong());
+    AccessToken accessToken = new AccessToken();
+    accessToken.readFields(in);
+    if (datanode.isAccessTokenEnabled
+        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
+            AccessTokenHandler.AccessMode.COPY)) {
+      LOG.warn("Invalid access token in request from "
+          + s.getRemoteSocketAddress() + " for copying block " + block);
+      sendResponse(s,
+          (short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
+          datanode.socketWriteTimeout);
+      return;
+    }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       LOG.info("Not able to copy block " + blockId + " to " 
           + s.getRemoteSocketAddress() + " because the thread quota is exceeded.");
+      sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR, 
+          datanode.socketWriteTimeout);
       return;
     }
 
@@ -498,6 +557,8 @@ class DataXceiver implements Runnable, F
       reply = new DataOutputStream(new BufferedOutputStream(
           baseStream, SMALL_BUFFER_SIZE));
 
+      // send status first
+      reply.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
       // send block content to the target
       long read = blockSender.sendBlock(reply, baseStream, 
                                         dataXceiverServer.balanceThrottler);
@@ -538,6 +599,17 @@ class DataXceiver implements Runnable, F
     String sourceID = Text.readString(in); // read del hint
     DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
     proxySource.readFields(in);
+    AccessToken accessToken = new AccessToken();
+    accessToken.readFields(in);
+    if (datanode.isAccessTokenEnabled
+        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
+            AccessTokenHandler.AccessMode.REPLACE)) {
+      LOG.warn("Invalid access token in request from "
+          + s.getRemoteSocketAddress() + " for replacing block " + block);
+      sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
+          datanode.socketWriteTimeout);
+      return;
+    }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       LOG.warn("Not able to receive block " + blockId + " from " 
@@ -571,11 +643,22 @@ class DataXceiver implements Runnable, F
       proxyOut.writeByte(DataTransferProtocol.OP_COPY_BLOCK); // op code
       proxyOut.writeLong(block.getBlockId()); // block id
       proxyOut.writeLong(block.getGenerationStamp()); // generation stamp
+      accessToken.write(proxyOut);
       proxyOut.flush();
 
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(
           NetUtils.getInputStream(proxySock), BUFFER_SIZE));
+      short status = proxyReply.readShort();
+      if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
+        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+          throw new IOException("Copy block " + block + " from "
+              + proxySock.getRemoteSocketAddress()
+              + " failed due to access token error");
+        }
+        throw new IOException("Copy block " + block + " from "
+            + proxySock.getRemoteSocketAddress() + " failed");
+      }
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),

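Note the protocol change buried in the copy/replace hunks: the proxy (copy source) now writes a status word before any block bytes, and the replacing datanode reads it, so a token rejection surfaces as a clean error instead of a garbled stream. Sketched end to end; the stream plumbing is illustrative:

    // Sketch of the status-first proxy handshake in replaceBlock().
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
    import org.apache.hadoop.security.AccessToken;

    class ProxyHandshakeSketch {
      static void requestCopy(DataOutputStream proxyOut, long blockId,
          long genStamp, AccessToken token) throws IOException {
        proxyOut.writeByte(DataTransferProtocol.OP_COPY_BLOCK);
        proxyOut.writeLong(blockId);
        proxyOut.writeLong(genStamp);
        token.write(proxyOut);   // token travels with the request header
        proxyOut.flush();
      }

      static void awaitStatus(DataInputStream proxyReply) throws IOException {
        short status = proxyReply.readShort();   // status precedes block data
        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
          throw new IOException("copy failed due to access token error");
        }
        if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
          throw new IOException("copy failed");
        }
      }
    }
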
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Fri Mar  4 03:39:02 2011
@@ -90,6 +90,7 @@ public class DatanodeDescriptor extends 
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
+  protected boolean needKeyUpdate = false;
 
   /** A queue of blocks to be replicated by this datanode */
   private BlockQueue replicateBlocks = new BlockQueue();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Mar  4 03:39:02 2011
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.PermissionChecker;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.fs.ContentSummary;
@@ -130,6 +133,10 @@ public class FSNamesystem implements FSC
   private FSNamesystemMetrics myFSMetrics;
   private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
   private int totalLoad = 0;
+  boolean isAccessTokenEnabled;
+  AccessTokenHandler accessTokenHandler;
+  private long accessKeyUpdateInterval;
+  private long accessTokenLifetime;
 
   volatile long pendingReplicationBlocksCount = 0L;
   volatile long corruptReplicaBlocksCount = 0L;
@@ -329,6 +336,10 @@ public class FSNamesystem implements FSC
 
     this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
                                            conf.get("dfs.hosts.exclude",""));
+    if (isAccessTokenEnabled) {
+      accessTokenHandler = new AccessTokenHandler(true,
+          accessKeyUpdateInterval, accessTokenLifetime);
+    }
     this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
         conf.getInt("dfs.namenode.decommission.interval", 30),
         conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
@@ -438,6 +449,18 @@ public class FSNamesystem implements FSC
                                          20*(int)(heartbeatInterval/1000));
     this.accessTimePrecision = conf.getLong("dfs.access.time.precision", 0);
     this.supportAppends = conf.getBoolean("dfs.support.append", false);
+    this.isAccessTokenEnabled = conf.getBoolean(
+        AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
+    if (isAccessTokenEnabled) {
+      this.accessKeyUpdateInterval = conf.getLong(
+          AccessTokenHandler.STRING_ACCESS_KEY_UPDATE_INTERVAL, 600) * 60 * 1000L; // 10 hrs
+      this.accessTokenLifetime = conf.getLong(
+          AccessTokenHandler.STRING_ACCESS_TOKEN_LIFETIME, 600) * 60 * 1000L; // 10 hrs
+    }
+    LOG.info("isAccessTokenEnabled=" + isAccessTokenEnabled
+        + " accessKeyUpdateInterval=" + accessKeyUpdateInterval / (60 * 1000)
+        + " min(s), accessTokenLifetime=" + accessTokenLifetime / (60 * 1000)
+        + " min(s)");
   }
 
   /**
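
All three knobs flow through the AccessTokenHandler constants this patch introduces, with both intervals configured in minutes and converted to milliseconds internally (600 minutes, i.e. 10 hours, by default). A small sketch of the same reads outside FSNamesystem, assuming only those constants:

    // Sketch: reading the access token configuration knobs.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.AccessTokenHandler;

    class TokenConfSketch {
      static void dump(Configuration conf) {
        boolean enabled = conf.getBoolean(
            AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
        long keyUpdateMs = conf.getLong(
            AccessTokenHandler.STRING_ACCESS_KEY_UPDATE_INTERVAL, 600) * 60 * 1000L;
        long tokenLifeMs = conf.getLong(
            AccessTokenHandler.STRING_ACCESS_TOKEN_LIFETIME, 600) * 60 * 1000L;
        System.out.println("enabled=" + enabled
            + " keyUpdateInterval=" + keyUpdateMs / (60 * 1000) + " min(s)"
            + " tokenLifetime=" + tokenLifeMs / (60 * 1000) + " min(s)");
      }
    }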
@@ -654,6 +677,16 @@ public class FSNamesystem implements FSC
   }
   
   /**
+   * Get access keys
+   * 
+   * @return current access keys
+   */
+  ExportedAccessKeys getAccessKeys() {
+    return isAccessTokenEnabled ? accessTokenHandler.exportKeys()
+        : ExportedAccessKeys.DUMMY_KEYS;
+  }
+
+  /**
    * Get all valid locations of the block & add the block to results
    * return the length of the added block; 0 if the block is not added
    */
@@ -844,8 +877,13 @@ public class FSNamesystem implements FSC
             machineSet[numNodes++] = dn;
         }
       }
-      results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,
-                  blockCorrupt));
+      LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
+          blockCorrupt);
+      if (isAccessTokenEnabled) {
+        b.setAccessToken(accessTokenHandler.generateToken(b.getBlock()
+            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+      }
+      results.add(b); 
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff 
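
From here on, every LocatedBlock the namenode hands out carries a token scoped to one block ID and one access mode: READ above, WRITE in the append and addBlock hunks that follow. A sketch of the attach step, using the setter this patch adds to LocatedBlock; the helper class is illustrative:

    // Sketch: scoping a token to a single block and mode before returning
    // the LocatedBlock to the client.
    import java.io.IOException;
    import java.util.EnumSet;

    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.security.AccessTokenHandler;

    class TokenAttachSketch {
      static void attach(boolean tokensEnabled, AccessTokenHandler handler,
          LocatedBlock b, AccessTokenHandler.AccessMode mode) throws IOException {
        if (tokensEnabled) {
          b.setAccessToken(handler.generateToken(
              b.getBlock().getBlockId(), EnumSet.of(mode)));
        }
      }
    }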
@@ -1193,6 +1231,10 @@ public class FSNamesystem implements FSC
 
           lb = new LocatedBlock(last, targets, 
                                 fileLength-storedBlock.getNumBytes());
+          if (isAccessTokenEnabled) {
+            lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
+                .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+          }
 
           // Remove block from replication queue.
           updateNeededReplications(last, 0, 0);
@@ -1302,7 +1344,12 @@ public class FSNamesystem implements FSC
     }
         
     // Create next block
-    return new LocatedBlock(newBlock, targets, fileLength);
+    LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
+    if (isAccessTokenEnabled) {
+      b.setAccessToken(accessTokenHandler.generateToken(b.getBlock()
+          .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+    }
+    return b;
   }
 
   /**
@@ -2055,6 +2102,7 @@ public class FSNamesystem implements FSC
                                       nodeReg.getInfoPort(),
                                       nodeReg.getIpcPort());
     nodeReg.updateRegInfo(dnReg);
+    nodeReg.exportedKeys = getAccessKeys();
       
     NameNode.stateChangeLog.info(
                                  "BLOCK* NameSystem.registerDatanode: "
@@ -2254,7 +2302,7 @@ public class FSNamesystem implements FSC
           return new DatanodeCommand[] {cmd};
         }
       
-        ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(2);
+        ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
         //check pending replication
         cmd = nodeinfo.getReplicationCommand(
               maxReplicationStreams - xmitsInProgress);
@@ -2266,6 +2314,11 @@ public class FSNamesystem implements FSC
         if (cmd != null) {
           cmds.add(cmd);
         }
+        // check access key update
+        if (isAccessTokenEnabled && nodeinfo.needKeyUpdate) {
+          cmds.add(new KeyUpdateCommand(accessTokenHandler.exportKeys()));
+          nodeinfo.needKeyUpdate = false;
+        }
         if (!cmds.isEmpty()) {
           return cmds.toArray(new DatanodeCommand[cmds.size()]);
         }
@@ -2297,21 +2350,44 @@ public class FSNamesystem implements FSC
       totalLoad -= node.getXceiverCount();
     }
   }
+
+  /**
+   * Update access keys.
+   */
+  void updateAccessKey() throws IOException {
+    this.accessTokenHandler.updateKeys();
+    synchronized (heartbeats) {
+      for (DatanodeDescriptor nodeInfo : heartbeats) {
+        nodeInfo.needKeyUpdate = true;
+      }
+    }
+  }
+
   /**
-   * Periodically calls heartbeatCheck().
+   * Periodically calls heartbeatCheck() and updateAccessKey().
    */
   class HeartbeatMonitor implements Runnable {
+    private long lastHeartbeatCheck;
+    private long lastAccessKeyUpdate;
     /**
      */
     public void run() {
       while (fsRunning) {
         try {
-          heartbeatCheck();
+          long now = now();
+          if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
+            heartbeatCheck();
+            lastHeartbeatCheck = now;
+          }
+          if (isAccessTokenEnabled && (lastAccessKeyUpdate + accessKeyUpdateInterval < now)) {
+            updateAccessKey();
+            lastAccessKeyUpdate = now;
+          }
         } catch (Exception e) {
           FSNamesystem.LOG.error(StringUtils.stringifyException(e));
         }
         try {
-          Thread.sleep(heartbeatRecheckInterval);
+          Thread.sleep(5000);  // 5 seconds
         } catch (InterruptedException ie) {
         }
       }
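
The monitor now multiplexes two duties on one short fixed tick, running each only when its own interval has lapsed; this keeps key rotation on schedule without tying it to the heartbeat recheck period. A toy rendering of that gated-poll pattern (all intervals below are placeholders except the 5-second tick):

    // Toy sketch of the gated-poll loop used by HeartbeatMonitor.
    public class GatedPollSketch {
      public static void main(String[] args) throws InterruptedException {
        final long tickMs = 5000;                   // fixed wake-up, as in the hunk
        final long recheckMs = 5 * 60 * 1000;       // heartbeat recheck (placeholder)
        final long keyUpdateMs = 600 * 60 * 1000L;  // key rotation (10-hour default)
        long lastRecheck = 0, lastKeyUpdate = 0;
        while (true) {
          long now = System.currentTimeMillis();
          if (lastRecheck + recheckMs < now) {
            // heartbeatCheck() would run here
            lastRecheck = now;
          }
          if (lastKeyUpdate + keyUpdateMs < now) {
            // updateAccessKey() would run here
            lastKeyUpdate = now;
          }
          Thread.sleep(tickMs);
        }
      }
    }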

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java Fri Mar  4 03:39:02 2011
@@ -116,7 +116,7 @@ public class JspHelper {
     return chosenNode;
   }
   public void streamBlockInAscii(InetSocketAddress addr, long blockId, 
-                                 long genStamp, long blockSize, 
+                                 AccessToken accessToken, long genStamp, long blockSize, 
                                  long offsetIntoBlock, long chunkSizeToView, JspWriter out) 
     throws IOException {
     if (chunkSizeToView == 0) return;
@@ -129,7 +129,7 @@ public class JspHelper {
       // Use the block name for file name. 
       DFSClient.BlockReader blockReader = 
         DFSClient.BlockReader.newBlockReader(s, addr.toString() + ":" + blockId,
-                                             blockId, genStamp ,offsetIntoBlock, 
+                                             blockId, accessToken, genStamp, offsetIntoBlock,
                                              amtToRead, 
                                              conf.getInt("io.file.buffer.size",
                                                          4096));

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Mar  4 03:39:02 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
@@ -47,6 +48,8 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.AccessKey;
+import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -729,6 +732,11 @@ public class NameNode implements ClientP
     }
   }
 
+  /** {@inheritDoc} */
+  public ExportedAccessKeys getAccessKeys() throws IOException {
+    return namesystem.getAccessKeys();
+  }
+
   /**
    */
   public void errorReport(DatanodeRegistration nodeReg,

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Fri Mar  4 03:39:02 2011
@@ -425,6 +425,7 @@ public class NamenodeFsck {
           DFSClient.BlockReader.newBlockReader(s, targetAddr.toString() + ":" + 
                                                block.getBlockId(), 
                                                block.getBlockId(), 
+                                               lblock.getAccessToken(),
                                                block.getGenerationStamp(), 
                                                0, -1,
                                                conf.getInt("io.file.buffer.size", 4096));

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Fri Mar  4 03:39:02 2011
@@ -35,10 +35,10 @@ import org.apache.hadoop.ipc.VersionedPr
  **********************************************************************/
 public interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 19: SendHeartbeat returns an array of DatanodeCommand objects
-   *     in stead of a DatanodeCommand object.
+   * 20: SendHeartbeat may return KeyUpdateCommand
+   *     Register returns access keys inside DatanodeRegistration object
    */
-  public static final long versionID = 19L;
+  public static final long versionID = 20L;
   
   // error code
   final static int NOTIFY = 0;
@@ -57,6 +57,7 @@ public interface DatanodeProtocol extend
   final static int DNA_REGISTER = 4;   // re-register
   final static int DNA_FINALIZE = 5;   // finalize previous upgrade
   final static int DNA_RECOVERBLOCK = 6;  // request a block recovery
+  final static int DNA_ACCESSKEYUPDATE = 7;  // update access key
 
   /** 
    * Register Datanode.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Fri Mar  4 03:39:02 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.security.ExportedAccessKeys;
 
 /** 
 * DatanodeRegistration class contains all information the Namenode needs
@@ -46,6 +47,7 @@ public class DatanodeRegistration extend
   }
 
   public StorageInfo storageInfo;
+  public ExportedAccessKeys exportedKeys;
 
   /**
    * Default constructor.
@@ -60,6 +62,7 @@ public class DatanodeRegistration extend
   public DatanodeRegistration(String nodeName) {
     super(nodeName);
     this.storageInfo = new StorageInfo();
+    this.exportedKeys = new ExportedAccessKeys();
   }
   
   public void setInfoPort(int infoPort) {
@@ -112,6 +115,7 @@ public class DatanodeRegistration extend
     out.writeInt(storageInfo.getLayoutVersion());
     out.writeInt(storageInfo.getNamespaceID());
     out.writeLong(storageInfo.getCTime());
+    exportedKeys.write(out);
   }
 
   /** {@inheritDoc} */
@@ -124,5 +128,6 @@ public class DatanodeRegistration extend
     storageInfo.layoutVersion = in.readInt();
     storageInfo.namespaceID = in.readInt();
     storageInfo.cTime = in.readLong();
+    exportedKeys.readFields(in);
   }
 }
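
Since the registration wire format grew a field on both write and read (hence the protocol bump to 20L in DatanodeProtocol), a quick round-trip check is a useful sanity test. A sketch in the spirit of the TestAccessToken unit test added by this patch; the harness below is illustrative, not taken from it:

    // Sketch: round-tripping the extended DatanodeRegistration format.
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

    public class RegistrationRoundTrip {
      public static void main(String[] args) throws IOException {
        DatanodeRegistration reg = new DatanodeRegistration("127.0.0.1:50010");
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        reg.write(new DataOutputStream(buf));   // now includes exportedKeys

        DatanodeRegistration copy = new DatanodeRegistration("");
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
        // A version-19 peer would misparse the extra field, which is why
        // DatanodeProtocol.versionID moved from 19L to 20L.
        System.out.println("round trip ok: " + copy.getName());
      }
    }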