You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:39:04 UTC
svn commit: r1077086 [1/2] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
core/org/apache/hadoop/security/ hdfs/ hdfs/org/apache/hadoop/hdfs/
hdfs/org/apache/hadoop/hdfs/protocol/
hdfs/org/apache/hadoop/hdfs/server/balancer/ hdfs/org/ap...
Author: omalley
Date: Fri Mar 4 03:39:02 2011
New Revision: 1077086
URL: http://svn.apache.org/viewvc?rev=1077086&view=rev
Log:
commit 06c9b3bab34e19faa6e62c8dfaf7756fabef5c58
Author: Jitendra Nath Pandey <ji...@yahoo-inc.com>
Date: Tue Dec 22 00:16:39 2009 -0800
HADOOP-4359 from https://issues.apache.org/jira/secure/attachment/12428711/HADOOP-4359-0_20.2.patch
+++ b/YAHOO-CHANGES.txt
+ HADOOP-4359. Access Token: Support for data access authorization
+ checking on DataNodes. (Jitendra Nath Pandey)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessKey.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessToken.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessTokenHandler.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/ExportedAccessKeys.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestAccessToken.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/hdfs-default.xml
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
hadoop/common/branches/branch-0.20-security-patches/src/webapps/datanode/browseBlock.jsp
hadoop/common/branches/branch-0.20-security-patches/src/webapps/datanode/tail.jsp
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessKey.java?rev=1077086&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessKey.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessKey.java Fri Mar 4 03:39:02 2011
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import javax.crypto.Mac;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Key used for generating and verifying access tokens. Only the key ID, the
+ * key material and the expiry date are serialized and compared; the
+ * {@link Mac} is a locally cached helper that is neither written nor read.
+ */
+public class AccessKey implements Writable {
+  private long keyID;
+  private Text key;
+  private long expiryDate;
+  /** Cached Mac for this key; not serialized and not part of equality. */
+  private Mac mac;
+
+  /** Creates a key with ID 0, empty key material and expiry date 0. */
+  public AccessKey() {
+    this(0L, new Text(), 0L);
+  }
+
+  /**
+   * @param keyID identifier of this key
+   * @param key key material (stored by reference, not copied)
+   * @param expiryDate expiry date of this key
+   */
+  public AccessKey(long keyID, Text key, long expiryDate) {
+    this.keyID = keyID;
+    this.key = key;
+    this.expiryDate = expiryDate;
+  }
+
+  public long getKeyID() {
+    return keyID;
+  }
+
+  public Text getKey() {
+    return key;
+  }
+
+  public long getExpiryDate() {
+    return expiryDate;
+  }
+
+  /** @return the cached Mac, or null if none has been set */
+  public Mac getMac() {
+    return mac;
+  }
+
+  public void setMac(Mac mac) {
+    this.mac = mac;
+  }
+
+  /** Null-safe equals: a null a matches only a null b, else a.equals(b). */
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof AccessKey) {
+      AccessKey that = (AccessKey) obj;
+      return this.keyID == that.keyID && isEqual(this.key, that.key)
+          && this.expiryDate == that.expiryDate;
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int hashCode() {
+    return key == null ? 0 : key.hashCode();
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  /**
+   * Writes the key ID, the key material and the expiry date. The cached Mac
+   * is not written.
+   */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVLong(out, keyID);
+    key.write(out);
+    WritableUtils.writeVLong(out, expiryDate);
+  }
+
+  /**
+   * Reads the key ID, the key material and the expiry date, in the order
+   * written by {@link #write(DataOutput)}.
+   */
+  public void readFields(DataInput in) throws IOException {
+    keyID = WritableUtils.readVLong(in);
+    key.readFields(in);
+    expiryDate = WritableUtils.readVLong(in);
+    // A previously attached Mac was initialized with the old key material;
+    // drop it so that it cannot be used with the key that was just read.
+    mac = null;
+  }
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessToken.java?rev=1077086&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessToken.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessToken.java Fri Mar 4 03:39:02 2011
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Access token made up of a token ID and a token authenticator, both held
+ * as {@link Text} and serialized in that order.
+ */
+public class AccessToken implements Writable {
+  public static final AccessToken DUMMY_TOKEN = new AccessToken();
+  private Text tokenID;
+  private Text tokenAuthenticator;
+
+  /** Creates a token with an empty ID and an empty authenticator. */
+  public AccessToken() {
+    this(new Text(), new Text());
+  }
+
+  /**
+   * @param tokenID identifier part of the token
+   * @param tokenAuthenticator authenticator part of the token
+   */
+  public AccessToken(Text tokenID, Text tokenAuthenticator) {
+    this.tokenID = tokenID;
+    this.tokenAuthenticator = tokenAuthenticator;
+  }
+
+  public Text getTokenID() {
+    return tokenID;
+  }
+
+  public Text getTokenAuthenticator() {
+    return tokenAuthenticator;
+  }
+
+  /** Null-safe equals: a null a matches only a null b, else a.equals(b). */
+  static boolean isEqual(Object a, Object b) {
+    if (a == null) {
+      return b == null;
+    }
+    return a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof AccessToken)) {
+      return false;
+    }
+    AccessToken other = (AccessToken) obj;
+    if (!isEqual(tokenID, other.tokenID)) {
+      return false;
+    }
+    return isEqual(tokenAuthenticator, other.tokenAuthenticator);
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    if (tokenAuthenticator == null) {
+      return 0;
+    }
+    return tokenAuthenticator.hashCode();
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  /** Writes the token ID followed by the token authenticator. */
+  public void write(DataOutput out) throws IOException {
+    tokenID.write(out);
+    tokenAuthenticator.write(out);
+  }
+
+  /** Reads the token ID followed by the token authenticator. */
+  public void readFields(DataInput in) throws IOException {
+    tokenID.readFields(in);
+    tokenAuthenticator.readFields(in);
+  }
+
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessTokenHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessTokenHandler.java?rev=1077086&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessTokenHandler.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/AccessTokenHandler.java Fri Mar 4 03:39:02 2011
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.security.GeneralSecurityException;
+import java.security.SecureRandom;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * AccessTokenHandler can be instantiated in 2 modes, master mode and slave
+ * mode. Master can generate new access keys and export access keys to slaves,
+ * while slaves can only import and use access keys received from master. Both
+ * master and slave can generate and verify access tokens. Typically, master
+ * mode is used by NN and slave mode is used by DN.
+ */
+public class AccessTokenHandler {
+ private static final Log LOG = LogFactory.getLog(AccessTokenHandler.class);
+ public static final String STRING_ENABLE_ACCESS_TOKEN = "dfs.access.token.enable";
+ public static final String STRING_ACCESS_KEY_UPDATE_INTERVAL = "dfs.access.key.update.interval";
+ public static final String STRING_ACCESS_TOKEN_LIFETIME = "dfs.access.token.lifetime";
+
+ private final boolean isMaster;
+ /*
+ * keyUpdateInterval is the interval that NN updates its access keys. It
+ * should be set long enough so that all live DN's and Balancer should have
+ * sync'ed their access keys with NN at least once during each interval.
+ */
+ private final long keyUpdateInterval;
+ private final long tokenLifetime;
+ private long serialNo = new SecureRandom().nextLong();
+ private KeyGenerator keyGen;
+ private AccessKey currentKey;
+ private AccessKey nextKey;
+ private Map<Long, AccessKey> allKeys;
+
+ public static enum AccessMode {
+ READ, WRITE, COPY, REPLACE
+ };
+
+ /**
+ * Constructor
+ *
+ * @param isMaster
+ * @param keyUpdateInterval
+ * @param tokenLifetime
+ * @throws IOException
+ */
+ public AccessTokenHandler(boolean isMaster, long keyUpdateInterval,
+ long tokenLifetime) throws IOException {
+ this.isMaster = isMaster;
+ this.keyUpdateInterval = keyUpdateInterval;
+ this.tokenLifetime = tokenLifetime;
+ this.allKeys = new HashMap<Long, AccessKey>();
+ if (isMaster) {
+ try {
+ generateKeys();
+ initMac(currentKey);
+ } catch (GeneralSecurityException e) {
+ throw (IOException) new IOException(
+ "Failed to create AccessTokenHandler").initCause(e);
+ }
+ }
+ }
+
+ /** Initialize access keys */
+ private synchronized void generateKeys() throws NoSuchAlgorithmException {
+ keyGen = KeyGenerator.getInstance("HmacSHA1");
+ /*
+ * Need to set estimated expiry dates for currentKey and nextKey so that if
+ * NN crashes, DN can still expire those keys. NN will stop using the newly
+ * generated currentKey after the first keyUpdateInterval, however it may
+ * still be used by DN and Balancer to generate new tokens before they get a
+ * chance to sync their keys with NN. Since we require keyUpdInterval to be
+ * long enough so that all live DN's and Balancer will sync their keys with
+ * NN at least once during the period, the estimated expiry date for
+ * currentKey is set to now() + 2 * keyUpdateInterval + tokenLifetime.
+ * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval
+ * more.
+ */
+ serialNo++;
+ currentKey = new AccessKey(serialNo, new Text(keyGen.generateKey()
+ .getEncoded()), System.currentTimeMillis() + 2 * keyUpdateInterval
+ + tokenLifetime);
+ serialNo++;
+ nextKey = new AccessKey(serialNo, new Text(keyGen.generateKey()
+ .getEncoded()), System.currentTimeMillis() + 3 * keyUpdateInterval
+ + tokenLifetime);
+ allKeys.put(currentKey.getKeyID(), currentKey);
+ allKeys.put(nextKey.getKeyID(), nextKey);
+ }
+
+ /** Initialize Mac function */
+ private synchronized void initMac(AccessKey key) throws IOException {
+ try {
+ Mac mac = Mac.getInstance("HmacSHA1");
+ mac.init(new SecretKeySpec(key.getKey().getBytes(), "HmacSHA1"));
+ key.setMac(mac);
+ } catch (GeneralSecurityException e) {
+ throw (IOException) new IOException(
+ "Failed to initialize Mac for access key, keyID=" + key.getKeyID())
+ .initCause(e);
+ }
+ }
+
+ /** Export access keys, only to be used in master mode */
+ public synchronized ExportedAccessKeys exportKeys() {
+ if (!isMaster)
+ return null;
+ if (LOG.isDebugEnabled())
+ LOG.debug("Exporting access keys");
+ return new ExportedAccessKeys(true, keyUpdateInterval, tokenLifetime,
+ currentKey, allKeys.values().toArray(new AccessKey[0]));
+ }
+
+ private synchronized void removeExpiredKeys() {
+ long now = System.currentTimeMillis();
+ for (Iterator<Map.Entry<Long, AccessKey>> it = allKeys.entrySet()
+ .iterator(); it.hasNext();) {
+ Map.Entry<Long, AccessKey> e = it.next();
+ if (e.getValue().getExpiryDate() < now) {
+ it.remove();
+ }
+ }
+ }
+
+ /**
+ * Set access keys, only to be used in slave mode
+ */
+ public synchronized void setKeys(ExportedAccessKeys exportedKeys)
+ throws IOException {
+ if (isMaster || exportedKeys == null)
+ return;
+ LOG.info("Setting access keys");
+ removeExpiredKeys();
+ this.currentKey = exportedKeys.getCurrentKey();
+ initMac(currentKey);
+ AccessKey[] receivedKeys = exportedKeys.getAllKeys();
+ for (int i = 0; i < receivedKeys.length; i++) {
+ if (receivedKeys[i] == null)
+ continue;
+ this.allKeys.put(receivedKeys[i].getKeyID(), receivedKeys[i]);
+ }
+ }
+
+ /**
+ * Update access keys, only to be used in master mode
+ */
+ public synchronized void updateKeys() throws IOException {
+ if (!isMaster)
+ return;
+ LOG.info("Updating access keys");
+ removeExpiredKeys();
+ // set final expiry date of retiring currentKey
+ allKeys.put(currentKey.getKeyID(), new AccessKey(currentKey.getKeyID(),
+ currentKey.getKey(), System.currentTimeMillis() + keyUpdateInterval
+ + tokenLifetime));
+ // update the estimated expiry date of new currentKey
+ currentKey = new AccessKey(nextKey.getKeyID(), nextKey.getKey(), System
+ .currentTimeMillis()
+ + 2 * keyUpdateInterval + tokenLifetime);
+ initMac(currentKey);
+ allKeys.put(currentKey.getKeyID(), currentKey);
+ // generate a new nextKey
+ serialNo++;
+ nextKey = new AccessKey(serialNo, new Text(keyGen.generateKey()
+ .getEncoded()), System.currentTimeMillis() + 3 * keyUpdateInterval
+ + tokenLifetime);
+ allKeys.put(nextKey.getKeyID(), nextKey);
+ }
+
+ /** Check if token is well formed */
+ private synchronized Boolean verifyToken(long keyID, AccessToken token)
+ throws IOException {
+ AccessKey key = allKeys.get(keyID);
+ if (key == null) {
+ LOG.warn("Access key for keyID=" + keyID + " doesn't exist.");
+ return false;
+ }
+ if (key.getMac() == null) {
+ initMac(key);
+ }
+ Text tokenID = token.getTokenID();
+ Text authenticator = new Text(key.getMac().doFinal(tokenID.getBytes()));
+ return authenticator.equals(token.getTokenAuthenticator());
+ }
+
+ /** Generate an access token for current user */
+ public AccessToken generateToken(long blockID, EnumSet<AccessMode> modes)
+ throws IOException {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+ String userID = (ugi == null ? null : ugi.getUserName());
+ return generateToken(userID, blockID, modes);
+ }
+
+ /** Generate an access token for a specified user */
+ public synchronized AccessToken generateToken(String userID, long blockID,
+ EnumSet<AccessMode> modes) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generating access token for user=" + userID + ", blockID="
+ + blockID + ", access modes=" + modes + ", keyID="
+ + currentKey.getKeyID());
+ }
+ if (modes == null || modes.isEmpty())
+ throw new IOException("access modes can't be null or empty");
+ ByteArrayOutputStream buf = new ByteArrayOutputStream(4096);
+ DataOutputStream out = new DataOutputStream(buf);
+ WritableUtils.writeVLong(out, System.currentTimeMillis() + tokenLifetime);
+ WritableUtils.writeVLong(out, currentKey.getKeyID());
+ WritableUtils.writeString(out, userID);
+ WritableUtils.writeVLong(out, blockID);
+ WritableUtils.writeVInt(out, modes.size());
+ for (AccessMode aMode : modes) {
+ WritableUtils.writeEnum(out, aMode);
+ }
+ Text tokenID = new Text(buf.toByteArray());
+ return new AccessToken(tokenID, new Text(currentKey.getMac().doFinal(
+ tokenID.getBytes())));
+ }
+
+ /** Check if access should be allowed. userID is not checked if null */
+ public Boolean checkAccess(AccessToken token, String userID, long blockID,
+ AccessMode mode) throws IOException {
+ long oExpiry = 0;
+ long oKeyID = 0;
+ String oUserID = null;
+ long oBlockID = 0;
+ EnumSet<AccessMode> oModes = EnumSet.noneOf(AccessMode.class);
+
+ try {
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getTokenID()
+ .getBytes());
+ DataInputStream in = new DataInputStream(buf);
+ oExpiry = WritableUtils.readVLong(in);
+ oKeyID = WritableUtils.readVLong(in);
+ oUserID = WritableUtils.readString(in);
+ oBlockID = WritableUtils.readVLong(in);
+ int length = WritableUtils.readVInt(in);
+ for (int i = 0; i < length; ++i) {
+ oModes.add(WritableUtils.readEnum(in, AccessMode.class));
+ }
+ } catch (IOException e) {
+ throw (IOException) new IOException(
+ "Unable to parse access token for user=" + userID + ", blockID="
+ + blockID + ", access mode=" + mode).initCause(e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Verifying access token for user=" + userID + ", blockID="
+ + blockID + ", access mode=" + mode + ", keyID=" + oKeyID);
+ }
+ return (userID == null || userID.equals(oUserID)) && oBlockID == blockID
+ && System.currentTimeMillis() < oExpiry && oModes.contains(mode)
+ && verifyToken(oKeyID, token);
+ }
+
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/ExportedAccessKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/ExportedAccessKeys.java?rev=1077086&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/ExportedAccessKeys.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/ExportedAccessKeys.java Fri Mar 4 03:39:02 2011
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Object for passing access keys. Carries an enabled flag, the key update
+ * interval, the token lifetime, the current key and an array of all keys.
+ */
+public class ExportedAccessKeys implements Writable {
+  public static final ExportedAccessKeys DUMMY_KEYS = new ExportedAccessKeys();
+  private boolean isAccessTokenEnabled;
+  private long keyUpdateInterval;
+  private long tokenLifetime;
+  private AccessKey currentKey;
+  private AccessKey[] allKeys;
+
+  /** Creates an instance with the flag off, zero intervals and no keys. */
+  public ExportedAccessKeys() {
+    this(false, 0, 0, new AccessKey(), new AccessKey[0]);
+  }
+
+  ExportedAccessKeys(boolean isAccessTokenEnabled, long keyUpdateInterval,
+      long tokenLifetime, AccessKey currentKey, AccessKey[] allKeys) {
+    this.isAccessTokenEnabled = isAccessTokenEnabled;
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenLifetime = tokenLifetime;
+    this.currentKey = currentKey;
+    this.allKeys = allKeys;
+  }
+
+  public boolean isAccessTokenEnabled() {
+    return isAccessTokenEnabled;
+  }
+
+  public long getKeyUpdateInterval() {
+    return keyUpdateInterval;
+  }
+
+  public long getTokenLifetime() {
+    return tokenLifetime;
+  }
+
+  public AccessKey getCurrentKey() {
+    return currentKey;
+  }
+
+  public AccessKey[] getAllKeys() {
+    return allKeys;
+  }
+
+  /** Null-safe equals: a null a matches only a null b, else a.equals(b). */
+  static boolean isEqual(Object a, Object b) {
+    if (a == null) {
+      return b == null;
+    }
+    return a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof ExportedAccessKeys)) {
+      return false;
+    }
+    ExportedAccessKeys other = (ExportedAccessKeys) obj;
+    if (isAccessTokenEnabled != other.isAccessTokenEnabled
+        || keyUpdateInterval != other.keyUpdateInterval
+        || tokenLifetime != other.tokenLifetime) {
+      return false;
+    }
+    return isEqual(currentKey, other.currentKey)
+        && Arrays.equals(allKeys, other.allKeys);
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    if (currentKey == null) {
+      return 0;
+    }
+    return currentKey.hashCode();
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  static { // register a ctor
+    WritableFactories.setFactory(ExportedAccessKeys.class,
+        new WritableFactory() {
+          public Writable newInstance() {
+            return new ExportedAccessKeys();
+          }
+        });
+  }
+
+  /**
+   * Writes the enabled flag, the key update interval, the token lifetime, the
+   * current key, the number of keys and then every key of the array.
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(isAccessTokenEnabled);
+    out.writeLong(keyUpdateInterval);
+    out.writeLong(tokenLifetime);
+    currentKey.write(out);
+    out.writeInt(allKeys.length);
+    for (AccessKey k : allKeys) {
+      k.write(out);
+    }
+  }
+
+  /**
+   * Reads the fields in the order used by {@link #write(DataOutput)}; the key
+   * array is replaced by a new array of newly created keys.
+   */
+  public void readFields(DataInput in) throws IOException {
+    isAccessTokenEnabled = in.readBoolean();
+    keyUpdateInterval = in.readLong();
+    tokenLifetime = in.readLong();
+    currentKey.readFields(in);
+    allKeys = new AccessKey[in.readInt()];
+    for (int i = 0; i < allKeys.length; i++) {
+      AccessKey k = new AccessKey();
+      allKeys[i] = k;
+      k.readFields(in);
+    }
+  }
+
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java?rev=1077086&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java Fri Mar 4 03:39:02 2011
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.IOException;
+
+/**
+ * Access token verification failed.
+ */
+public class InvalidAccessTokenException extends IOException {
+  private static final long serialVersionUID = 168L;
+
+  /** Creates the exception without a detail message. */
+  public InvalidAccessTokenException() {
+  }
+
+  /**
+   * @param msg detail message
+   */
+  public InvalidAccessTokenException(String msg) {
+    super(msg);
+  }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/hdfs-default.xml?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/hdfs-default.xml Fri Mar 4 03:39:02 2011
@@ -190,6 +190,29 @@ creations/deletions), or "all".</descrip
</property>
<property>
+ <name>dfs.access.token.enable</name>
+ <value>false</value>
+ <description>
+ If "true", access tokens are used as capabilities for accessing datanodes.
+ If "false", no access tokens are checked on accessing datanodes.
+ </description>
+</property>
+
+<property>
+ <name>dfs.access.key.update.interval</name>
+ <value>600</value>
+ <description>
+ Interval in minutes at which namenode updates its access keys.
+ </description>
+</property>
+
+<property>
+ <name>dfs.access.token.lifetime</name>
+ <value>600</value>
+ <description>The lifetime of access tokens in minutes.</description>
+</property>
+
+<property>
<name>dfs.data.dir</name>
<value>${hadoop.tmp.dir}/dfs/data</value>
<description>Determines where on the local filesystem an DFS data node
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar 4 03:39:02 2011
@@ -34,7 +34,9 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.security.InvalidAccessTokenException;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.AccessToken;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.*;
@@ -638,14 +640,21 @@ public class DFSClient implements FSCons
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
) throws IOException {
//get all block locations
- final List<LocatedBlock> locatedblocks
+ List<LocatedBlock> locatedblocks
= callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE).getLocatedBlocks();
final DataOutputBuffer md5out = new DataOutputBuffer();
int bytesPerCRC = 0;
long crcPerBlock = 0;
+ boolean refetchBlocks = false;
+ int lastRetriedIndex = -1;
//get block checksum for each block
for(int i = 0; i < locatedblocks.size(); i++) {
+ if (refetchBlocks) { // refetch to get fresh tokens
+ locatedblocks = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE)
+ .getLocatedBlocks();
+ refetchBlocks = false;
+ }
LocatedBlock lb = locatedblocks.get(i);
final Block block = lb.getBlock();
final DatanodeInfo[] datanodes = lb.getLocations();
@@ -677,12 +686,28 @@ public class DFSClient implements FSCons
out.write(DataTransferProtocol.OP_BLOCK_CHECKSUM);
out.writeLong(block.getBlockId());
out.writeLong(block.getGenerationStamp());
+ lb.getAccessToken().write(out);
out.flush();
final short reply = in.readShort();
if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
- throw new IOException("Bad response " + reply + " for block "
- + block + " from datanode " + datanodes[j].getName());
+ if (reply == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN
+ && i > lastRetriedIndex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+ + "for file " + src + " for block " + block
+ + " from datanode " + datanodes[j].getName()
+ + ". Will retry the block once.");
+ }
+ lastRetriedIndex = i;
+ done = true; // actually it's not done; but we'll retry
+ i--; // repeat at i-th block
+ refetchBlocks = true;
+ break;
+ } else {
+ throw new IOException("Bad response " + reply + " for block "
+ + block + " from datanode " + datanodes[j].getName());
+ }
}
//read byte-per-checksum
@@ -1325,24 +1350,26 @@ public class DFSClient implements FSCons
checksumSize = this.checksum.getChecksumSize();
}
- public static BlockReader newBlockReader(Socket sock, String file, long blockId,
+ public static BlockReader newBlockReader(Socket sock, String file, long blockId, AccessToken accessToken,
long genStamp, long startOffset, long len, int bufferSize) throws IOException {
- return newBlockReader(sock, file, blockId, genStamp, startOffset, len, bufferSize,
+ return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
true);
}
/** Java Doc required */
public static BlockReader newBlockReader( Socket sock, String file, long blockId,
+ AccessToken accessToken,
long genStamp,
long startOffset, long len,
int bufferSize, boolean verifyChecksum)
throws IOException {
- return newBlockReader(sock, file, blockId, genStamp, startOffset,
+ return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset,
len, bufferSize, verifyChecksum, "");
}
public static BlockReader newBlockReader( Socket sock, String file,
long blockId,
+ AccessToken accessToken,
long genStamp,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
@@ -1360,6 +1387,7 @@ public class DFSClient implements FSCons
out.writeLong( startOffset );
out.writeLong( len );
Text.writeString(out, clientName);
+ accessToken.write(out);
out.flush();
//
@@ -1370,10 +1398,16 @@ public class DFSClient implements FSCons
new BufferedInputStream(NetUtils.getInputStream(sock),
bufferSize));
- if ( in.readShort() != DataTransferProtocol.OP_STATUS_SUCCESS ) {
- throw new IOException("Got error in response to OP_READ_BLOCK " +
- "for file " + file +
- " for block " + blockId);
+ short status = in.readShort();
+ if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
+ if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+ throw new InvalidAccessTokenException(
+ "Got access token error in response to OP_READ_BLOCK "
+ + "for file " + file + " for block " + blockId);
+ } else {
+ throw new IOException("Got error in response to OP_READ_BLOCK "
+ + "for file " + file + " for block " + blockId);
+ }
}
DataChecksum checksum = DataChecksum.newDataChecksum( in );
//Warning when we get CHECKSUM_NULL?
@@ -1520,7 +1554,7 @@ public class DFSClient implements FSCons
* @return located block
* @throws IOException
*/
- private LocatedBlock getBlockAt(long offset) throws IOException {
+ private synchronized LocatedBlock getBlockAt(long offset) throws IOException {
assert (locatedBlocks != null) : "locatedBlocks is null";
// search cached blocks first
int targetBlockIdx = locatedBlocks.findBlock(offset);
@@ -1540,6 +1574,32 @@ public class DFSClient implements FSCons
return blk;
}
+ /**
+  * Fetch the blocks starting at the given offset from the namenode and
+  * insert them into the cached block list.
+  *
+  * @param offset byte offset into the file
+  * @throws IOException if the namenode returns no block locations
+  */
+ private synchronized void fetchAndCacheBlockAt(long offset) throws IOException {
+   // A negative result means no cached block covers offset; it is converted
+   // into the index at which the fetched blocks are to be inserted.
+   final int cachedIdx = locatedBlocks.findBlock(offset);
+   final int insertAt =
+       cachedIdx < 0 ? LocatedBlocks.getInsertIndex(cachedIdx) : cachedIdx;
+
+   // ask the namenode for the blocks starting at offset
+   final LocatedBlocks fetched =
+       callGetBlockLocations(namenode, src, offset, prefetchSize);
+   if (fetched == null) {
+     throw new IOException("Could not find target position " + offset);
+   }
+   locatedBlocks.insertRange(insertAt, fetched.getLocatedBlocks());
+ }
+
+ /**
+  * Fetch the block containing the given offset from the namenode without
+  * touching the cached block list.
+  *
+  * @param offset byte offset into the file
+  * @return the block the namenode reports for that offset
+  * @throws IOException if the namenode returns no block locations, or none
+  *         of the returned blocks covers the offset
+  */
+ private LocatedBlock fetchBlockAt(long offset) throws IOException {
+   LocatedBlocks newBlocks =
+       callGetBlockLocations(namenode, src, offset, prefetchSize);
+   if (newBlocks == null) {
+     throw new IOException("Could not find target position " + offset);
+   }
+   // findBlock() returns a negative value when no block covers offset;
+   // report that as an IOException rather than indexing with it.
+   int index = newBlocks.findBlock(offset);
+   if (index < 0) {
+     throw new IOException("Could not find target position " + offset);
+   }
+   return newBlocks.get(index);
+ }
+
/**
* Get blocks in the specified range.
* Fetch them from the namenode if not cached.
@@ -1601,17 +1661,18 @@ public class DFSClient implements FSCons
}
//
- // Compute desired block
- //
- LocatedBlock targetBlock = getBlockAt(target);
- assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
- long offsetIntoBlock = target - targetBlock.getStartOffset();
-
- //
// Connect to best DataNode for desired Block, with potential offset
//
DatanodeInfo chosenNode = null;
- while (s == null) {
+ int refetchToken = 1; // only need to get a new access token once
+ while (true) {
+ //
+ // Compute desired block
+ //
+ LocatedBlock targetBlock = getBlockAt(target);
+ assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
+ long offsetIntoBlock = target - targetBlock.getStartOffset();
+
DNAddrPair retval = chooseDataNode(targetBlock);
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
@@ -1621,17 +1682,33 @@ public class DFSClient implements FSCons
NetUtils.connect(s, targetAddr, socketTimeout);
s.setSoTimeout(socketTimeout);
Block blk = targetBlock.getBlock();
+ AccessToken accessToken = targetBlock.getAccessToken();
blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
+ accessToken,
blk.getGenerationStamp(),
offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
buffersize, verifyChecksum, clientName);
return chosenNode;
} catch (IOException ex) {
- // Put chosen node into dead list, continue
LOG.debug("Failed to connect to " + targetAddr + ":"
+ StringUtils.stringifyException(ex));
- addToDeadNodes(chosenNode);
+ if (ex instanceof InvalidAccessTokenException && refetchToken-- > 0) {
+ /*
+ * Get a new access token and retry. Retry is needed in 2 cases. 1)
+ * When both NN and DN re-started while DFSClient holding a cached
+ * access token. 2) In the case that NN fails to update its
+ * access key at pre-set interval (by a wide margin) and
+ * subsequently restarts. In this case, DN re-registers itself with
+ * NN and receives a new access key, but DN will delete the old
+ * access key from its memory since it's considered expired based on
+ * the estimated expiration date.
+ */
+ fetchAndCacheBlockAt(target);
+ } else {
+ // Put chosen node into dead list, continue
+ addToDeadNodes(chosenNode);
+ }
if (s != null) {
try {
s.close();
@@ -1641,7 +1718,6 @@ public class DFSClient implements FSCons
s = null;
}
}
- return chosenNode;
}
/**
@@ -1811,6 +1887,7 @@ public class DFSClient implements FSCons
Socket dn = null;
int numAttempts = block.getLocations().length;
IOException ioe = null;
+ int refetchToken = 1; // only need to get a new access token once
while (dn == null && numAttempts-- > 0 ) {
DNAddrPair retval = chooseDataNode(block);
@@ -1822,11 +1899,13 @@ public class DFSClient implements FSCons
dn = socketFactory.createSocket();
NetUtils.connect(dn, targetAddr, socketTimeout);
dn.setSoTimeout(socketTimeout);
+ AccessToken accessToken = block.getAccessToken();
int len = (int) (end - start + 1);
reader = BlockReader.newBlockReader(dn, src,
block.getBlock().getBlockId(),
+ accessToken,
block.getBlock().getGenerationStamp(),
start, len, buffersize,
verifyChecksum, clientName);
@@ -1844,10 +1923,20 @@ public class DFSClient implements FSCons
reportChecksumFailure(src, block.getBlock(), chosenNode);
} catch (IOException e) {
ioe = e;
- LOG.warn("Failed to connect to " + targetAddr +
- " for file " + src +
- " for block " + block.getBlock().getBlockId() + ":" +
- StringUtils.stringifyException(e));
+ if (e instanceof InvalidAccessTokenException && refetchToken-- > 0) {
+ LOG.info("Invalid access token when connecting to " + targetAddr
+ + " for file " + src + " for block "
+ + block.getBlock() + ":"
+ + StringUtils.stringifyException(e)
+ + ", get a new access token and retry...");
+ block = fetchBlockAt(block.getStartOffset());
+ numAttempts = block.getLocations().length;
+ continue;
+ } else {
+ LOG.warn("Failed to connect to " + targetAddr + " for file " + src
+ + " for block " + block.getBlock() + ":"
+ + StringUtils.stringifyException(e));
+ }
} finally {
IOUtils.closeStream(reader);
IOUtils.closeSocket(dn);
@@ -2081,6 +2170,7 @@ public class DFSClient implements FSCons
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
private Block block;
+ private AccessToken accessToken;
final private long blockSize;
private DataChecksum checksum;
private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
@@ -2600,6 +2690,7 @@ public class DFSClient implements FSCons
//
if (newBlock != null) {
block = newBlock.getBlock();
+ accessToken = newBlock.getAccessToken();
nodes = newBlock.getLocations();
}
@@ -2785,6 +2876,7 @@ public class DFSClient implements FSCons
long startTime = System.currentTimeMillis();
lb = locateFollowingBlock(startTime);
block = lb.getBlock();
+ accessToken = lb.getAccessToken();
nodes = lb.getLocations();
//
@@ -2861,6 +2953,7 @@ public class DFSClient implements FSCons
for (int i = 1; i < nodes.length; i++) {
nodes[i].write(out);
}
+ accessToken.write(out);
checksum.writeHeader( out );
out.flush();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Mar 4 03:39:02 2011
@@ -40,9 +40,9 @@ public interface ClientProtocol extends
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 41: saveNamespace introduced.
+ * 42: All LocatedBlock objects contain access tokens
*/
- public static final long versionID = 41L;
+ public static final long versionID = 42L;
///////////////////////////////////////
// File contents
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Fri Mar 4 03:39:02 2011
@@ -31,15 +31,11 @@ public interface DataTransferProtocol {
* when protocol changes. It is not very obvious.
*/
/*
- * Version 14:
- * OP_REPLACE_BLOCK is sent from the Balancer server to the destination,
- * including the block id, source, and proxy.
- * OP_COPY_BLOCK is sent from the destination to the proxy, which contains
- * only the block id.
- * A reply to OP_COPY_BLOCK sends the block content.
- * A reply to OP_REPLACE_BLOCK includes an operation status.
+ * Version 15:
+ * Added a new status OP_STATUS_ERROR_ACCESS_TOKEN
+ * Access token is now required on all DN operations
*/
- public static final int DATA_TRANSFER_VERSION = 14;
+ public static final int DATA_TRANSFER_VERSION = 15;
// Processed at datanode stream-handler
public static final byte OP_WRITE_BLOCK = (byte) 80;
@@ -54,7 +50,8 @@ public interface DataTransferProtocol {
public static final int OP_STATUS_ERROR_CHECKSUM = 2;
public static final int OP_STATUS_ERROR_INVALID = 3;
public static final int OP_STATUS_ERROR_EXISTS = 4;
- public static final int OP_STATUS_CHECKSUM_OK = 5;
+ public static final int OP_STATUS_ERROR_ACCESS_TOKEN = 5;
+ public static final int OP_STATUS_CHECKSUM_OK = 6;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Fri Mar 4 03:39:02 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.security.AccessToken;
import java.io.*;
@@ -43,6 +44,7 @@ public class LocatedBlock implements Wri
// else false. If block has few corrupt replicas, they are filtered and
// their locations are not part of this object
private boolean corrupt;
+ private AccessToken accessToken = new AccessToken();
/**
*/
@@ -76,6 +78,14 @@ public class LocatedBlock implements Wri
}
}
+ /**
+ * Returns the access token carried by this located block. The token is
+ * serialized together with the block's other fields by write() and
+ * readFields().
+ *
+ * @return the access token currently held by this object
+ */
+ public AccessToken getAccessToken() {
+ return accessToken;
+ }
+
+ /**
+ * Replaces the access token carried by this located block.
+ * NOTE(review): write() and readFields() call methods on the token without
+ * a null check, so callers presumably must not pass null - confirm.
+ *
+ * @param token the access token to store
+ */
+ public void setAccessToken(AccessToken token) {
+ this.accessToken = token;
+ }
+
/**
*/
public Block getBlock() {
@@ -112,6 +122,7 @@ public class LocatedBlock implements Wri
// Writable
///////////////////////////////////////////
public void write(DataOutput out) throws IOException {
+ accessToken.write(out);
out.writeBoolean(corrupt);
out.writeLong(offset);
b.write(out);
@@ -122,6 +133,7 @@ public class LocatedBlock implements Wri
}
public void readFields(DataInput in) throws IOException {
+ accessToken.readFields(in);
this.corrupt = in.readBoolean();
offset = in.readLong();
this.b = new Block();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Mar 4 03:39:02 2011
@@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -71,8 +72,12 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.security.ExportedAccessKeys;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -190,6 +195,11 @@ public class Balancer implements Tool {
private NamenodeProtocol namenode;
private ClientProtocol client;
private FileSystem fs;
+ private boolean isAccessTokenEnabled;
+ private boolean shouldRun;
+ private long keyUpdaterInterval;
+ private AccessTokenHandler accessTokenHandler;
+ private Daemon keyupdaterthread = null; // AccessKeyUpdater thread
private final static Random rnd = new Random();
// all data node lists
@@ -359,6 +369,13 @@ public class Balancer implements Tool {
out.writeLong(block.getBlock().getGenerationStamp());
Text.writeString(out, source.getStorageID());
proxySource.write(out);
+ AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+ if (isAccessTokenEnabled) {
+ accessToken = accessTokenHandler.generateToken(null, block.getBlock()
+ .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.REPLACE,
+ AccessTokenHandler.AccessMode.COPY));
+ }
+ accessToken.write(out);
out.flush();
}
@@ -366,6 +383,8 @@ public class Balancer implements Tool {
private void receiveResponse(DataInputStream in) throws IOException {
short status = in.readShort();
if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
+ if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN)
+ throw new IOException("block move failed due to access token error");
throw new IOException("block move is failed");
}
}
@@ -841,6 +860,48 @@ public class Balancer implements Tool {
this.namenode = createNamenode(conf);
this.client = DFSClient.createNamenode(conf);
this.fs = FileSystem.get(conf);
+ ExportedAccessKeys keys = namenode.getAccessKeys();
+ this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
+ if (isAccessTokenEnabled) {
+ long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
+ long accessTokenLifetime = keys.getTokenLifetime();
+ LOG.info("Access token params received from NN: keyUpdateInterval="
+ + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+ + accessTokenLifetime / (60 * 1000) + " min(s)");
+ this.accessTokenHandler = new AccessTokenHandler(false,
+ accessKeyUpdateInterval, accessTokenLifetime);
+ this.accessTokenHandler.setKeys(keys);
+ /*
+ * Balancer should sync its access keys with NN more frequently than NN
+ * updates its access keys
+ */
+ this.keyUpdaterInterval = accessKeyUpdateInterval / 4;
+ LOG.info("Balancer will update its access keys every "
+ + keyUpdaterInterval / (60 * 1000) + " minute(s)");
+ this.keyupdaterthread = new Daemon(new AccessKeyUpdater());
+ this.shouldRun = true;
+ this.keyupdaterthread.start();
+ }
+ }
+
+ /**
+  * Periodically fetches the access keys from the namenode and installs them
+  * in the balancer's access token handler. Runs until shouldRun is cleared
+  * or the thread is interrupted.
+  */
+ class AccessKeyUpdater implements Runnable {
+
+   /**
+    * Refreshes the keys, then sleeps for keyUpdaterInterval milliseconds,
+    * repeating until the balancer shuts the updater down.
+    */
+   public void run() {
+     while (shouldRun) {
+       try {
+         accessTokenHandler.setKeys(namenode.getAccessKeys());
+       } catch (Exception e) {
+         // a failed refresh is only logged; it is retried after the sleep
+         LOG.error(StringUtils.stringifyException(e));
+       }
+       try {
+         Thread.sleep(keyUpdaterInterval);
+       } catch (InterruptedException ie) {
+         // The balancer interrupts this thread when it shuts down. Stop
+         // here instead of swallowing the interrupt and looping again,
+         // and keep the interrupt status set for the caller.
+         Thread.currentThread().interrupt();
+         return;
+       }
+     }
+   }
}
/* Build a NamenodeProtocol connection to the namenode and
@@ -857,6 +918,7 @@ public class Balancer implements Tool {
Map<String,RetryPolicy> methodNameToPolicyMap =
new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("getBlocks", methodPolicy);
+ methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
UserGroupInformation ugi;
try {
@@ -1502,6 +1564,12 @@ public class Balancer implements Tool {
dispatcherExecutor.shutdownNow();
moverExecutor.shutdownNow();
+ shouldRun = false;
+ try {
+ if (keyupdaterthread != null) keyupdaterthread.interrupt();
+ } catch (Exception e) {
+ LOG.warn("Exception shutting down access key updater thread", e);
+ }
// close the output file
IOUtils.closeStream(out);
if (fs != null) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Mar 4 03:39:02 2011
@@ -34,6 +34,7 @@ import java.security.SecureRandom;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -71,6 +72,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
@@ -82,6 +84,9 @@ import org.apache.hadoop.ipc.RemoteExcep
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.security.ExportedAccessKeys;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authorize.ConfiguredPolicy;
import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -189,6 +194,9 @@ public class DataNode extends Configured
int socketWriteTimeout = 0;
boolean transferToAllowed = true;
int writePacketSize = 0;
+ boolean isAccessTokenEnabled;
+ AccessTokenHandler accessTokenHandler;
+ boolean isAccessTokenInitialized = false;
public DataBlockScanner blockScanner = null;
public Daemon blockScannerThread = null;
@@ -556,6 +564,27 @@ public class DataNode extends Configured
+ ". Expecting " + storage.getStorageID());
}
+ if (!isAccessTokenInitialized) {
+ /* first time registering with NN */
+ ExportedAccessKeys keys = dnRegistration.exportedKeys;
+ this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
+ if (isAccessTokenEnabled) {
+ long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
+ long accessTokenLifetime = keys.getTokenLifetime();
+ LOG.info("Access token params received from NN: keyUpdateInterval="
+ + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+ + accessTokenLifetime / (60 * 1000) + " min(s)");
+ this.accessTokenHandler = new AccessTokenHandler(false,
+ accessKeyUpdateInterval, accessTokenLifetime);
+ }
+ isAccessTokenInitialized = true;
+ }
+
+ if (isAccessTokenEnabled) {
+ accessTokenHandler.setKeys(dnRegistration.exportedKeys);
+ dnRegistration.exportedKeys = ExportedAccessKeys.DUMMY_KEYS;
+ }
+
// random short delay - helps scatter the BR from all DNs
scheduleBlockReport(initialBlockReportDelay);
}
@@ -920,6 +949,12 @@ public class DataNode extends Configured
case DatanodeProtocol.DNA_RECOVERBLOCK:
recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
break;
+ case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
+ LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
+ if (isAccessTokenEnabled) {
+ accessTokenHandler.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
+ }
+ break;
default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
}
@@ -1176,6 +1211,12 @@ public class DataNode extends Configured
for (int i = 1; i < targets.length; i++) {
targets[i].write(out);
}
+ AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+ if (isAccessTokenEnabled) {
+ accessToken = accessTokenHandler.generateToken(null, b.getBlockId(),
+ EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
+ }
+ accessToken.write(out);
// send data & checksum
blockSender.sendBlock(out, baseStream, null);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Mar 4 03:39:02 2011
@@ -38,6 +38,8 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.AccessTokenHandler;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
@@ -153,12 +155,26 @@ class DataXceiver implements Runnable, F
long startOffset = in.readLong();
long length = in.readLong();
String clientName = Text.readString(in);
- // send the block
+ AccessToken accessToken = new AccessToken();
+ accessToken.readFields(in);
OutputStream baseStream = NetUtils.getOutputStream(s,
datanode.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
+ if (datanode.isAccessTokenEnabled
+ && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
+ AccessTokenHandler.AccessMode.READ)) {
+ try {
+ out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+ out.flush();
+ throw new IOException("Access token verification failed, on client "
+ + "request for reading block " + block);
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ }
+ // send the block
BlockSender blockSender = null;
final String clientTraceFmt =
clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
@@ -246,10 +262,28 @@ class DataXceiver implements Runnable, F
tmp.readFields(in);
targets[i] = tmp;
}
+ AccessToken accessToken = new AccessToken();
+ accessToken.readFields(in);
+ DataOutputStream replyOut = null; // stream to prev target
+ replyOut = new DataOutputStream(
+ NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+ if (datanode.isAccessTokenEnabled
+ && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
+ .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
+ try {
+ if (client.length() != 0) {
+ Text.writeString(replyOut, datanode.dnRegistration.getName());
+ replyOut.flush();
+ }
+ throw new IOException("Access token verification failed, on client "
+ + "request for writing block " + block);
+ } finally {
+ IOUtils.closeStream(replyOut);
+ }
+ }
DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
- DataOutputStream replyOut = null; // stream to prev target
Socket mirrorSock = null; // socket to next target
BlockReceiver blockReceiver = null; // responsible for data handling
String mirrorNode = null; // the name:port of next target
@@ -261,10 +295,6 @@ class DataXceiver implements Runnable, F
s.getLocalSocketAddress().toString(),
isRecovery, client, srcDataNode, datanode);
- // get a connection back to the previous target
- replyOut = new DataOutputStream(
- NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
-
//
// Open network conn to backup machine, if
// appropriate
@@ -304,6 +334,7 @@ class DataXceiver implements Runnable, F
for ( int i = 1; i < targets.length; i++ ) {
targets[i].write( mirrorOut );
}
+ accessToken.write(mirrorOut);
blockReceiver.writeChecksumHeader(mirrorOut);
mirrorOut.flush();
@@ -429,8 +460,24 @@ class DataXceiver implements Runnable, F
*/
void getBlockChecksum(DataInputStream in) throws IOException {
final Block block = new Block(in.readLong(), 0 , in.readLong());
+ AccessToken accessToken = new AccessToken();
+ accessToken.readFields(in);
+ DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
+ datanode.socketWriteTimeout));
+ if (datanode.isAccessTokenEnabled
+ && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
+ .getBlockId(), AccessTokenHandler.AccessMode.READ)) {
+ try {
+ out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+ out.flush();
+ throw new IOException(
+ "Access token verification failed, on getBlockChecksum() "
+ + "for block " + block);
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ }
- DataOutputStream out = null;
final MetaDataInputStream metadataIn = datanode.data.getMetaDataInputStream(block);
final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
metadataIn, BUFFER_SIZE));
@@ -452,8 +499,6 @@ class DataXceiver implements Runnable, F
}
//write reply
- out = new DataOutputStream(
- NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
out.writeInt(bytesPerCRC);
out.writeLong(crcPerBlock);
@@ -476,10 +521,24 @@ class DataXceiver implements Runnable, F
// Read in the header
long blockId = in.readLong(); // read block id
Block block = new Block(blockId, 0, in.readLong());
+ AccessToken accessToken = new AccessToken();
+ accessToken.readFields(in);
+ if (datanode.isAccessTokenEnabled
+ && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
+ AccessTokenHandler.AccessMode.COPY)) {
+ LOG.warn("Invalid access token in request from "
+ + s.getRemoteSocketAddress() + " for copying block " + block);
+ sendResponse(s,
+ (short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
+ datanode.socketWriteTimeout);
+ return;
+ }
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
LOG.info("Not able to copy block " + blockId + " to "
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.");
+ sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR,
+ datanode.socketWriteTimeout);
return;
}
@@ -498,6 +557,8 @@ class DataXceiver implements Runnable, F
reply = new DataOutputStream(new BufferedOutputStream(
baseStream, SMALL_BUFFER_SIZE));
+ // send status first
+ reply.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
// send block content to the target
long read = blockSender.sendBlock(reply, baseStream,
dataXceiverServer.balanceThrottler);
@@ -538,6 +599,17 @@ class DataXceiver implements Runnable, F
String sourceID = Text.readString(in); // read del hint
DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
proxySource.readFields(in);
+ AccessToken accessToken = new AccessToken();
+ accessToken.readFields(in);
+ if (datanode.isAccessTokenEnabled
+ && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
+ AccessTokenHandler.AccessMode.REPLACE)) {
+ LOG.warn("Invalid access token in request from "
+ + s.getRemoteSocketAddress() + " for replacing block " + block);
+ sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
+ datanode.socketWriteTimeout);
+ return;
+ }
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
LOG.warn("Not able to receive block " + blockId + " from "
@@ -571,11 +643,22 @@ class DataXceiver implements Runnable, F
proxyOut.writeByte(DataTransferProtocol.OP_COPY_BLOCK); // op code
proxyOut.writeLong(block.getBlockId()); // block id
proxyOut.writeLong(block.getGenerationStamp()); // block id
+ accessToken.write(proxyOut);
proxyOut.flush();
// receive the response from the proxy
proxyReply = new DataInputStream(new BufferedInputStream(
NetUtils.getInputStream(proxySock), BUFFER_SIZE));
+ short status = proxyReply.readShort();
+ if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
+ if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+ throw new IOException("Copy block " + block + " from "
+ + proxySock.getRemoteSocketAddress()
+ + " failed due to access token error");
+ }
+ throw new IOException("Copy block " + block + " from "
+ + proxySock.getRemoteSocketAddress() + " failed");
+ }
// open a block receiver and check if the block does not exist
blockReceiver = new BlockReceiver(
block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Fri Mar 4 03:39:02 2011
@@ -90,6 +90,7 @@ public class DatanodeDescriptor extends
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
protected boolean isAlive = false;
+ protected boolean needKeyUpdate = false;
/** A queue of blocks to be replicated by this datanode */
private BlockQueue replicateBlocks = new BlockQueue();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Mar 4 03:39:02 2011
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.security.ExportedAccessKeys;
import org.apache.hadoop.security.PermissionChecker;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.fs.ContentSummary;
@@ -130,6 +133,10 @@ public class FSNamesystem implements FSC
private FSNamesystemMetrics myFSMetrics;
private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
private int totalLoad = 0;
+ boolean isAccessTokenEnabled;
+ AccessTokenHandler accessTokenHandler;
+ private long accessKeyUpdateInterval;
+ private long accessTokenLifetime;
volatile long pendingReplicationBlocksCount = 0L;
volatile long corruptReplicaBlocksCount = 0L;
@@ -329,6 +336,10 @@ public class FSNamesystem implements FSC
this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
conf.get("dfs.hosts.exclude",""));
+ if (isAccessTokenEnabled) {
+ accessTokenHandler = new AccessTokenHandler(true,
+ accessKeyUpdateInterval, accessTokenLifetime);
+ }
this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
conf.getInt("dfs.namenode.decommission.interval", 30),
conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
@@ -438,6 +449,18 @@ public class FSNamesystem implements FSC
20*(int)(heartbeatInterval/1000));
this.accessTimePrecision = conf.getLong("dfs.access.time.precision", 0);
this.supportAppends = conf.getBoolean("dfs.support.append", false);
+ this.isAccessTokenEnabled = conf.getBoolean(
+ AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
+ if (isAccessTokenEnabled) {
+ this.accessKeyUpdateInterval = conf.getLong(
+ AccessTokenHandler.STRING_ACCESS_KEY_UPDATE_INTERVAL, 600) * 60 * 1000L; // 10 hrs
+ this.accessTokenLifetime = conf.getLong(
+ AccessTokenHandler.STRING_ACCESS_TOKEN_LIFETIME, 600) * 60 * 1000L; // 10 hrs
+ }
+ LOG.info("isAccessTokenEnabled=" + isAccessTokenEnabled
+ + " accessKeyUpdateInterval=" + accessKeyUpdateInterval / (60 * 1000)
+ + " min(s), accessTokenLifetime=" + accessTokenLifetime / (60 * 1000)
+ + " min(s)");
}
/**
@@ -654,6 +677,16 @@ public class FSNamesystem implements FSC
}
/**
+ * Get access keys
+ *
+ * @return current access keys
+ */
+ ExportedAccessKeys getAccessKeys() {
+ return isAccessTokenEnabled ? accessTokenHandler.exportKeys()
+ : ExportedAccessKeys.DUMMY_KEYS;
+ }
+
+ /**
* Get all valid locations of the block & add the block to results
* return the length of the added block; 0 if the block is not added
*/
@@ -844,8 +877,13 @@ public class FSNamesystem implements FSC
machineSet[numNodes++] = dn;
}
}
- results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,
- blockCorrupt));
+ LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
+ blockCorrupt);
+ if (isAccessTokenEnabled) {
+ b.setAccessToken(accessTokenHandler.generateToken(b.getBlock()
+ .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+ }
+ results.add(b);
curPos += blocks[curBlk].getNumBytes();
curBlk++;
} while (curPos < endOff
@@ -1193,6 +1231,10 @@ public class FSNamesystem implements FSC
lb = new LocatedBlock(last, targets,
fileLength-storedBlock.getNumBytes());
+ if (isAccessTokenEnabled) {
+ lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
+ .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ }
// Remove block from replication queue.
updateNeededReplications(last, 0, 0);
@@ -1302,7 +1344,12 @@ public class FSNamesystem implements FSC
}
// Create next block
- return new LocatedBlock(newBlock, targets, fileLength);
+ LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
+ if (isAccessTokenEnabled) {
+ b.setAccessToken(accessTokenHandler.generateToken(b.getBlock()
+ .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ }
+ return b;
}
/**
@@ -2055,6 +2102,7 @@ public class FSNamesystem implements FSC
nodeReg.getInfoPort(),
nodeReg.getIpcPort());
nodeReg.updateRegInfo(dnReg);
+ nodeReg.exportedKeys = getAccessKeys();
NameNode.stateChangeLog.info(
"BLOCK* NameSystem.registerDatanode: "
@@ -2254,7 +2302,7 @@ public class FSNamesystem implements FSC
return new DatanodeCommand[] {cmd};
}
- ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(2);
+ ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
//check pending replication
cmd = nodeinfo.getReplicationCommand(
maxReplicationStreams - xmitsInProgress);
@@ -2266,6 +2314,11 @@ public class FSNamesystem implements FSC
if (cmd != null) {
cmds.add(cmd);
}
+ // check access key update
+ if (isAccessTokenEnabled && nodeinfo.needKeyUpdate) {
+ cmds.add(new KeyUpdateCommand(accessTokenHandler.exportKeys()));
+ nodeinfo.needKeyUpdate = false;
+ }
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
@@ -2297,21 +2350,44 @@ public class FSNamesystem implements FSC
totalLoad -= node.getXceiverCount();
}
}
+
+ /**
+ * Update access keys.
+ */
+ void updateAccessKey() throws IOException {
+ this.accessTokenHandler.updateKeys();
+ synchronized (heartbeats) {
+ for (DatanodeDescriptor nodeInfo : heartbeats) {
+ nodeInfo.needKeyUpdate = true;
+ }
+ }
+ }
+
/**
- * Periodically calls heartbeatCheck().
+ * Periodically calls heartbeatCheck() and updateAccessKey()
*/
class HeartbeatMonitor implements Runnable {
+ private long lastHeartbeatCheck;
+ private long lastAccessKeyUpdate;
/**
*/
public void run() {
while (fsRunning) {
try {
- heartbeatCheck();
+ long now = now();
+ if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
+ heartbeatCheck();
+ lastHeartbeatCheck = now;
+ }
+ if (isAccessTokenEnabled && (lastAccessKeyUpdate + accessKeyUpdateInterval < now)) {
+ updateAccessKey();
+ lastAccessKeyUpdate = now;
+ }
} catch (Exception e) {
FSNamesystem.LOG.error(StringUtils.stringifyException(e));
}
try {
- Thread.sleep(heartbeatRecheckInterval);
+ Thread.sleep(5000); // 5 seconds
} catch (InterruptedException ie) {
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java Fri Mar 4 03:39:02 2011
@@ -116,7 +116,7 @@ public class JspHelper {
return chosenNode;
}
public void streamBlockInAscii(InetSocketAddress addr, long blockId,
- long genStamp, long blockSize,
+ AccessToken accessToken, long genStamp, long blockSize,
long offsetIntoBlock, long chunkSizeToView, JspWriter out)
throws IOException {
if (chunkSizeToView == 0) return;
@@ -129,7 +129,7 @@ public class JspHelper {
// Use the block name for file name.
DFSClient.BlockReader blockReader =
DFSClient.BlockReader.newBlockReader(s, addr.toString() + ":" + blockId,
- blockId, genStamp ,offsetIntoBlock,
+ blockId, accessToken, genStamp ,offsetIntoBlock,
amtToRead,
conf.getInt("io.file.buffer.size",
4096));
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Mar 4 03:39:02 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
@@ -47,6 +48,8 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.AccessKey;
+import org.apache.hadoop.security.ExportedAccessKeys;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -729,6 +732,11 @@ public class NameNode implements ClientP
}
}
+ /** {@inheritDoc} */
+ public ExportedAccessKeys getAccessKeys() throws IOException {
+ return namesystem.getAccessKeys();
+ }
+
/**
*/
public void errorReport(DatanodeRegistration nodeReg,
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Fri Mar 4 03:39:02 2011
@@ -425,6 +425,7 @@ public class NamenodeFsck {
DFSClient.BlockReader.newBlockReader(s, targetAddr.toString() + ":" +
block.getBlockId(),
block.getBlockId(),
+ lblock.getAccessToken(),
block.getGenerationStamp(),
0, -1,
conf.getInt("io.file.buffer.size", 4096));
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Fri Mar 4 03:39:02 2011
@@ -35,10 +35,10 @@ import org.apache.hadoop.ipc.VersionedPr
**********************************************************************/
public interface DatanodeProtocol extends VersionedProtocol {
/**
- * 19: SendHeartbeat returns an array of DatanodeCommand objects
- * in stead of a DatanodeCommand object.
+ * 20: SendHeartbeat may return KeyUpdateCommand
+ * Register returns access keys inside DatanodeRegistration object
*/
- public static final long versionID = 19L;
+ public static final long versionID = 20L;
// error code
final static int NOTIFY = 0;
@@ -57,6 +57,7 @@ public interface DatanodeProtocol extend
final static int DNA_REGISTER = 4; // re-register
final static int DNA_FINALIZE = 5; // finalize previous upgrade
final static int DNA_RECOVERBLOCK = 6; // request a block recovery
+ final static int DNA_ACCESSKEYUPDATE = 7; // update access key
/**
* Register Datanode.
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=1077086&r1=1077085&r2=1077086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Fri Mar 4 03:39:02 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.security.ExportedAccessKeys;
/**
* DatanodeRegistration class conatins all information the Namenode needs
@@ -46,6 +47,7 @@ public class DatanodeRegistration extend
}
public StorageInfo storageInfo;
+ public ExportedAccessKeys exportedKeys;
/**
* Default constructor.
@@ -60,6 +62,7 @@ public class DatanodeRegistration extend
public DatanodeRegistration(String nodeName) {
super(nodeName);
this.storageInfo = new StorageInfo();
+ this.exportedKeys = new ExportedAccessKeys();
}
public void setInfoPort(int infoPort) {
@@ -112,6 +115,7 @@ public class DatanodeRegistration extend
out.writeInt(storageInfo.getLayoutVersion());
out.writeInt(storageInfo.getNamespaceID());
out.writeLong(storageInfo.getCTime());
+ exportedKeys.write(out);
}
/** {@inheritDoc} */
@@ -124,5 +128,6 @@ public class DatanodeRegistration extend
storageInfo.layoutVersion = in.readInt();
storageInfo.namespaceID = in.readInt();
storageInfo.cTime = in.readLong();
+ exportedKeys.readFields(in);
}
}