You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2017/02/13 19:30:52 UTC
hadoop git commit: HDFS-11026. Convert BlockTokenIdentifier to use
Protobuf. Contributed by Ewan Higgs.
Repository: hadoop
Updated Branches:
refs/heads/trunk 646c6d650 -> 4ed33e9ca
HDFS-11026. Convert BlockTokenIdentifier to use Protobuf. Contributed by Ewan Higgs.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ed33e9c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ed33e9c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ed33e9c
Branch: refs/heads/trunk
Commit: 4ed33e9ca3d85568e3904753a3ef61a85f801838
Parents: 646c6d6
Author: Chris Douglas <cd...@apache.org>
Authored: Mon Feb 13 11:27:48 2017 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Mon Feb 13 11:29:18 2017 -0800
----------------------------------------------------------------------
.../hadoop/hdfs/protocolPB/PBHelperClient.java | 51 ++++
.../token/block/BlockTokenIdentifier.java | 89 +++++-
.../src/main/proto/hdfs.proto | 33 +++
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 +
.../token/block/BlockTokenSecretManager.java | 18 +-
.../hadoop/hdfs/server/balancer/KeyManager.java | 6 +-
.../server/blockmanagement/BlockManager.java | 9 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 5 +-
.../src/main/resources/hdfs-default.xml | 9 +
.../security/token/block/TestBlockToken.java | 297 +++++++++++++++++--
10 files changed, 480 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 0180828..ad80bc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -121,9 +121,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmI
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTypeProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenSecretProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CryptoProtocolVersionProto;
@@ -584,6 +586,55 @@ public class PBHelperClient {
return blockTokens;
}
+ public static AccessModeProto convert(BlockTokenIdentifier.AccessMode aMode) {
+ switch (aMode) {
+ case READ: return AccessModeProto.READ;
+ case WRITE: return AccessModeProto.WRITE;
+ case COPY: return AccessModeProto.COPY;
+ case REPLACE: return AccessModeProto.REPLACE;
+ default:
+ throw new IllegalArgumentException("Unexpected AccessMode: " + aMode);
+ }
+ }
+
+ public static BlockTokenIdentifier.AccessMode convert(
+ AccessModeProto accessModeProto) {
+ switch (accessModeProto) {
+ case READ: return BlockTokenIdentifier.AccessMode.READ;
+ case WRITE: return BlockTokenIdentifier.AccessMode.WRITE;
+ case COPY: return BlockTokenIdentifier.AccessMode.COPY;
+ case REPLACE: return BlockTokenIdentifier.AccessMode.REPLACE;
+ default:
+ throw new IllegalArgumentException("Unexpected AccessModeProto: " +
+ accessModeProto);
+ }
+ }
+
+ public static BlockTokenSecretProto convert(
+ BlockTokenIdentifier blockTokenSecret) {
+ BlockTokenSecretProto.Builder builder =
+ BlockTokenSecretProto.newBuilder();
+ builder.setExpiryDate(blockTokenSecret.getExpiryDate());
+ builder.setKeyId(blockTokenSecret.getKeyId());
+ String userId = blockTokenSecret.getUserId();
+ if (userId != null) {
+ builder.setUserId(userId);
+ }
+
+ String blockPoolId = blockTokenSecret.getBlockPoolId();
+ if (blockPoolId != null) {
+ builder.setBlockPoolId(blockPoolId);
+ }
+
+ builder.setBlockId(blockTokenSecret.getBlockId());
+
+ for (BlockTokenIdentifier.AccessMode aMode :
+ blockTokenSecret.getAccessModes()) {
+ builder.addModes(convert(aMode));
+ }
+ return builder.build();
+ }
+
static public DatanodeInfo convert(DatanodeInfoProto di) {
if (di == null) {
return null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
index 3f2c9ca..28e7acc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
@@ -19,11 +19,16 @@
package org.apache.hadoop.hdfs.security.token.block;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.EnumSet;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenSecretProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -44,20 +49,22 @@ public class BlockTokenIdentifier extends TokenIdentifier {
private String blockPoolId;
private long blockId;
private final EnumSet<AccessMode> modes;
+ private boolean useProto;
private byte [] cache;
public BlockTokenIdentifier() {
- this(null, null, 0, EnumSet.noneOf(AccessMode.class));
+ this(null, null, 0, EnumSet.noneOf(AccessMode.class), false);
}
public BlockTokenIdentifier(String userId, String bpid, long blockId,
- EnumSet<AccessMode> modes) {
+ EnumSet<AccessMode> modes, boolean useProto) {
this.cache = null;
this.userId = userId;
this.blockPoolId = bpid;
this.blockId = blockId;
this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
+ this.useProto = useProto;
}
@Override
@@ -144,9 +151,45 @@ public class BlockTokenIdentifier extends TokenIdentifier {
^ (blockPoolId == null ? 0 : blockPoolId.hashCode());
}
+ /**
+ * readFields peeks at the first byte of the DataInput and determines if it
+ * was written using WritableUtils ("Legacy") or Protobuf. We can do this
+ * because we know the first field is the Expiry date.
+ *
+ * In the case of the legacy buffer, the expiry date is a VInt, so the size
+ * (which should always be >1) is encoded in the first byte - which is
+ * always negative due to this encoding. However, there are sometimes null
+ * BlockTokenIdentifier written so we also need to handle the case there
+ * the first byte is also 0.
+ *
+ * In the case of protobuf, the first byte is a type tag for the expiry date
+ * which is written as <code>(field_number << 3 | wire_type</code>.
+ * So as long as the field_number is less than 16, but also positive, then
+ * we know we have a Protobuf.
+ *
+ * @param in <code>DataInput</code> to deserialize this object from.
+ * @throws IOException
+ */
@Override
public void readFields(DataInput in) throws IOException {
this.cache = null;
+
+ final DataInputStream dis = (DataInputStream)in;
+ if (!dis.markSupported()) {
+ throw new IOException("Could not peek first byte.");
+ }
+ dis.mark(1);
+ final byte firstByte = dis.readByte();
+ dis.reset();
+ if (firstByte <= 0) {
+ readFieldsLegacy(dis);
+ } else {
+ readFieldsProtobuf(dis);
+ }
+ }
+
+ @VisibleForTesting
+ void readFieldsLegacy(DataInput in) throws IOException {
expiryDate = WritableUtils.readVLong(in);
keyId = WritableUtils.readVInt(in);
userId = WritableUtils.readString(in);
@@ -157,10 +200,44 @@ public class BlockTokenIdentifier extends TokenIdentifier {
for (int i = 0; i < length; i++) {
modes.add(WritableUtils.readEnum(in, AccessMode.class));
}
+ useProto = false;
+ }
+
+ @VisibleForTesting
+ void readFieldsProtobuf(DataInput in) throws IOException {
+ BlockTokenSecretProto blockTokenSecretProto =
+ BlockTokenSecretProto.parseFrom((DataInputStream)in);
+ expiryDate = blockTokenSecretProto.getExpiryDate();
+ keyId = blockTokenSecretProto.getKeyId();
+ if (blockTokenSecretProto.hasUserId()) {
+ userId = blockTokenSecretProto.getUserId();
+ } else {
+ userId = null;
+ }
+ if (blockTokenSecretProto.hasBlockPoolId()) {
+ blockPoolId = blockTokenSecretProto.getBlockPoolId();
+ } else {
+ blockPoolId = null;
+ }
+ blockId = blockTokenSecretProto.getBlockId();
+ for (int i = 0; i < blockTokenSecretProto.getModesCount(); i++) {
+ AccessModeProto accessModeProto = blockTokenSecretProto.getModes(i);
+ modes.add(PBHelperClient.convert(accessModeProto));
+ }
+ useProto = true;
}
@Override
public void write(DataOutput out) throws IOException {
+ if (useProto) {
+ writeProtobuf(out);
+ } else {
+ writeLegacy(out);
+ }
+ }
+
+ @VisibleForTesting
+ void writeLegacy(DataOutput out) throws IOException {
WritableUtils.writeVLong(out, expiryDate);
WritableUtils.writeVInt(out, keyId);
WritableUtils.writeString(out, userId);
@@ -172,6 +249,12 @@ public class BlockTokenIdentifier extends TokenIdentifier {
}
}
+ @VisibleForTesting
+ void writeProtobuf(DataOutput out) throws IOException {
+ BlockTokenSecretProto secret = PBHelperClient.convert(this);
+ out.write(secret.toByteArray());
+ }
+
@Override
public byte[] getBytes() {
if(cache == null) cache = super.getBytes();
@@ -186,4 +269,4 @@ public class BlockTokenIdentifier extends TokenIdentifier {
return KIND_NAME;
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 1414120..8a039d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -514,3 +514,36 @@ message RollingUpgradeStatusProto {
message StorageUuidsProto {
repeated string storageUuids = 1;
}
+
+/**
+ * File access permissions mode.
+ */
+enum AccessModeProto {
+ READ = 1;
+ WRITE = 2;
+ COPY = 3;
+ REPLACE = 4;
+}
+
+/**
+ * Secret information for the BlockKeyProto. This is not sent on the wire as
+ * such but is used to pack a byte array and encrypted and put in
+ * BlockKeyProto.bytes
+ * When adding further fields, make sure they are optional as they would
+ * otherwise not be backwards compatible.
+ *
+ * Note: As part of the migration from WritableUtils based tokens (aka "legacy")
+ * to Protocol Buffers, we use the first byte to determine the type. If the
+ * first byte is <=0 then it is a legacy token. This means that when using
+ * protobuf tokens, the the first field sent must have a `field_number` less
+ * than 16 to make sure that the first byte is positive. Otherwise it could be
+ * parsed as a legacy token. See HDFS-11026 for more discussion.
+ */
+message BlockTokenSecretProto {
+ optional uint64 expiryDate = 1;
+ optional uint32 keyId = 2;
+ optional string userId = 3;
+ optional string blockPoolId = 4;
+ optional uint64 blockId = 5;
+ repeated AccessModeProto modes = 6;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 10a521b..cf1d21a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -641,6 +641,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT = 600L;
public static final String DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY = "dfs.block.access.token.lifetime";
public static final long DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT = 600L;
+ public static final String DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE = "dfs.block.access.token.protobuf.enable";
+ public static final boolean DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT = false;
public static final String DFS_BLOCK_REPLICATOR_CLASSNAME_KEY = "dfs.block.replicator.classname";
public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index ba08740..a3100d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -75,6 +75,7 @@ public class BlockTokenSecretManager extends
private final int intRange;
private final int nnRangeStart;
+ private final boolean useProto;
private final SecureRandom nonceGenerator = new SecureRandom();
@@ -83,11 +84,13 @@ public class BlockTokenSecretManager extends
*
* @param keyUpdateInterval how often a new key will be generated
* @param tokenLifetime how long an individual token is valid
+ * @param useProto should we use new protobuf style tokens
*/
public BlockTokenSecretManager(long keyUpdateInterval,
- long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
+ long tokenLifetime, String blockPoolId, String encryptionAlgorithm,
+ boolean useProto) {
this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
- encryptionAlgorithm, 0, 1);
+ encryptionAlgorithm, 0, 1, useProto);
}
/**
@@ -102,8 +105,9 @@ public class BlockTokenSecretManager extends
*/
public BlockTokenSecretManager(long keyUpdateInterval,
long tokenLifetime, int nnIndex, int numNNs, String blockPoolId,
- String encryptionAlgorithm) {
- this(true, keyUpdateInterval, tokenLifetime, blockPoolId, encryptionAlgorithm, nnIndex, numNNs);
+ String encryptionAlgorithm, boolean useProto) {
+ this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
+ encryptionAlgorithm, nnIndex, numNNs, useProto);
Preconditions.checkArgument(nnIndex >= 0);
Preconditions.checkArgument(numNNs > 0);
setSerialNo(new SecureRandom().nextInt());
@@ -111,7 +115,8 @@ public class BlockTokenSecretManager extends
}
private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
- long tokenLifetime, String blockPoolId, String encryptionAlgorithm, int nnIndex, int numNNs) {
+ long tokenLifetime, String blockPoolId, String encryptionAlgorithm,
+ int nnIndex, int numNNs, boolean useProto) {
this.intRange = Integer.MAX_VALUE / numNNs;
this.nnRangeStart = intRange * nnIndex;
this.isMaster = isMaster;
@@ -120,6 +125,7 @@ public class BlockTokenSecretManager extends
this.allKeys = new HashMap<Integer, BlockKey>();
this.blockPoolId = blockPoolId;
this.encryptionAlgorithm = encryptionAlgorithm;
+ this.useProto = useProto;
generateKeys();
}
@@ -246,7 +252,7 @@ public class BlockTokenSecretManager extends
public Token<BlockTokenIdentifier> generateToken(String userId,
ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes) throws IOException {
BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
- .getBlockPoolId(), block.getBlockId(), modes);
+ .getBlockPoolId(), block.getBlockId(), modes, useProto);
return new Token<BlockTokenIdentifier>(id, this);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
index 1c6b352..0aa6fb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
@@ -69,8 +69,12 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
+ ", token lifetime=" + StringUtils.formatTime(tokenLifetime));
String encryptionAlgorithm = conf.get(
DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
+ final boolean enableProtobuf = conf.getBoolean(
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT);
this.blockTokenSecretManager = new BlockTokenSecretManager(
- updateInterval, tokenLifetime, blockpoolID, encryptionAlgorithm);
+ updateInterval, tokenLifetime, blockpoolID, encryptionAlgorithm,
+ enableProtobuf);
this.blockTokenSecretManager.addKeys(keys);
// sync block keys with NN more frequently than NN updates its block keys
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 893b12d..5125b33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -542,6 +542,9 @@ public class BlockManager implements BlockStatsMXBean {
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
+ boolean shouldWriteProtobufToken = conf.getBoolean(
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT);
if (isHaEnabled) {
// figure out which index we are of the nns
@@ -555,10 +558,12 @@ public class BlockManager implements BlockStatsMXBean {
nnIndex++;
}
return new BlockTokenSecretManager(updateMin * 60 * 1000L,
- lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null, encryptionAlgorithm);
+ lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null,
+ encryptionAlgorithm, shouldWriteProtobufToken);
} else {
return new BlockTokenSecretManager(updateMin*60*1000L,
- lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm);
+ lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm,
+ shouldWriteProtobufToken);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a6dfa46..9ed80ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1534,9 +1534,12 @@ public class DataNode extends ReconfigurableBase
+ blockKeyUpdateInterval / (60 * 1000)
+ " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
+ final boolean enableProtobuf = getConf().getBoolean(
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT);
final BlockTokenSecretManager secretMgr =
new BlockTokenSecretManager(0, blockTokenLifetime, blockPoolId,
- dnConf.encryptionAlgorithm);
+ dnConf.encryptionAlgorithm, enableProtobuf);
blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2bbc788..03f1a08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -503,6 +503,15 @@
</property>
<property>
+ <name>dfs.block.access.token.protobuf.enable</name>
+ <value>false</value>
+ <description>
+ If "true", block tokens are written using Protocol Buffers.
+ If "false", block tokens are written using Legacy format.
+ </description>
+</property>
+
+<property>
<name>dfs.datanode.data.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/data</value>
<description>Determines where on the local filesystem an DFS data node
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index 55e9d30..ecb63ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.security.token.block;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -31,7 +32,10 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Calendar;
import java.util.EnumSet;
+import java.util.GregorianCalendar;
import java.util.Set;
import org.apache.commons.logging.Log;
@@ -57,6 +61,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRep
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.TestWritable;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -104,7 +110,7 @@ public class TestBlockToken {
final ExtendedBlock block1 = new ExtendedBlock("0", 0L);
final ExtendedBlock block2 = new ExtendedBlock("10", 10L);
final ExtendedBlock block3 = new ExtendedBlock("-10", -108L);
-
+
@Before
public void disableKerberos() {
Configuration conf = new Configuration();
@@ -128,7 +134,7 @@ public class TestBlockToken {
InvocationOnMock invocation) throws IOException {
Object args[] = invocation.getArguments();
assertEquals(2, args.length);
- GetReplicaVisibleLengthRequestProto req =
+ GetReplicaVisibleLengthRequestProto req =
(GetReplicaVisibleLengthRequestProto) args[1];
Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
.getTokenIdentifiers();
@@ -158,11 +164,11 @@ public class TestBlockToken {
return id;
}
- @Test
- public void testWritable() throws Exception {
+ private void testWritable(boolean enableProtobuf) throws Exception {
TestWritable.testWritable(new BlockTokenIdentifier());
BlockTokenSecretManager sm = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
+ blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+ enableProtobuf);
TestWritable.testWritable(generateTokenId(sm, block1,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)));
TestWritable.testWritable(generateTokenId(sm, block2,
@@ -171,6 +177,16 @@ public class TestBlockToken {
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class)));
}
+ @Test
+ public void testWritableLegacy() throws Exception {
+ testWritable(false);
+ }
+
+ @Test
+ public void testWritableProtobuf() throws Exception {
+ testWritable(true);
+ }
+
private void tokenGenerationAndVerification(BlockTokenSecretManager master,
BlockTokenSecretManager slave) throws Exception {
// single-mode tokens
@@ -198,12 +214,14 @@ public class TestBlockToken {
}
/** test block key and token handling */
- @Test
- public void testBlockTokenSecretManager() throws Exception {
+ private void testBlockTokenSecretManager(boolean enableProtobuf)
+ throws Exception {
BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
+ blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+ enableProtobuf);
BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
+ blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null,
+ enableProtobuf);
ExportedBlockKeys keys = masterHandler.exportKeys();
slaveHandler.addKeys(keys);
tokenGenerationAndVerification(masterHandler, slaveHandler);
@@ -215,6 +233,16 @@ public class TestBlockToken {
tokenGenerationAndVerification(masterHandler, slaveHandler);
}
+ @Test
+ public void testBlockTokenSecretManagerLegacy() throws Exception {
+ testBlockTokenSecretManager(false);
+ }
+
+ @Test
+ public void testBlockTokenSecretManagerProtobuf() throws Exception {
+ testBlockTokenSecretManager(true);
+ }
+
private static Server createMockDatanode(BlockTokenSecretManager sm,
Token<BlockTokenIdentifier> token, Configuration conf)
throws IOException, ServiceException {
@@ -223,7 +251,7 @@ public class TestBlockToken {
BlockTokenIdentifier id = sm.createIdentifier();
id.readFields(new DataInputStream(new ByteArrayInputStream(token
.getIdentifier())));
-
+
doAnswer(new GetLengthAnswer(sm, id)).when(mockDN)
.getReplicaVisibleLength(any(RpcController.class),
any(GetReplicaVisibleLengthRequestProto.class));
@@ -237,14 +265,14 @@ public class TestBlockToken {
.setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();
}
- @Test
- public void testBlockTokenRpc() throws Exception {
+ private void testBlockTokenRpc(boolean enableProtobuf) throws Exception {
Configuration conf = new Configuration();
conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf);
-
+
BlockTokenSecretManager sm = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
+ blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+ enableProtobuf);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
@@ -270,20 +298,30 @@ public class TestBlockToken {
}
}
+ @Test
+ public void testBlockTokenRpcLegacy() throws Exception {
+ testBlockTokenRpc(false);
+ }
+
+ @Test
+ public void testBlockTokenRpcProtobuf() throws Exception {
+ testBlockTokenRpc(true);
+ }
+
/**
* Test that fast repeated invocations of createClientDatanodeProtocolProxy
* will not end up using up thousands of sockets. This is a regression test
* for HDFS-1965.
*/
- @Test
- public void testBlockTokenRpcLeak() throws Exception {
+ private void testBlockTokenRpcLeak(boolean enableProtobuf) throws Exception {
Configuration conf = new Configuration();
conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf);
-
+
Assume.assumeTrue(FD_DIR.exists());
BlockTokenSecretManager sm = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
+ blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+ enableProtobuf);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
@@ -334,6 +372,16 @@ public class TestBlockToken {
RPC.stopProxy(proxyToNoWhere);
}
+ @Test
+ public void testBlockTokenRpcLeakLegacy() throws Exception {
+ testBlockTokenRpcLeak(false);
+ }
+
+ @Test
+ public void testBlockTokenRpcLeakProtobuf() throws Exception {
+ testBlockTokenRpcLeak(true);
+ }
+
/**
* @return the current number of file descriptors open by this process.
*/
@@ -344,17 +392,19 @@ public class TestBlockToken {
/**
* Test {@link BlockPoolTokenSecretManager}
*/
- @Test
- public void testBlockPoolTokenSecretManager() throws Exception {
+ private void testBlockPoolTokenSecretManager(boolean enableProtobuf)
+ throws Exception {
BlockPoolTokenSecretManager bpMgr = new BlockPoolTokenSecretManager();
// Test BlockPoolSecretManager with upto 10 block pools
for (int i = 0; i < 10; i++) {
String bpid = Integer.toString(i);
BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
+ blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+ enableProtobuf);
BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
+ blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null,
+ enableProtobuf);
bpMgr.addBlockPool(bpid, slaveHandler);
ExportedBlockKeys keys = masterHandler.exportKeys();
@@ -370,20 +420,31 @@ public class TestBlockToken {
}
}
+ @Test
+ public void testBlockPoolTokenSecretManagerLegacy() throws Exception {
+ testBlockPoolTokenSecretManager(false);
+ }
+
+ @Test
+ public void testBlockPoolTokenSecretManagerProtobuf() throws Exception {
+ testBlockPoolTokenSecretManager(true);
+ }
+
/**
* This test writes a file and gets the block locations without closing the
* file, and tests the block token in the last block. Block token is verified
* by ensuring it is of correct kind.
- *
+ *
* @throws IOException
* @throws InterruptedException
*/
- @Test
- public void testBlockTokenInLastLocatedBlock() throws IOException,
- InterruptedException {
+ private void testBlockTokenInLastLocatedBlock(boolean enableProtobuf)
+ throws IOException, InterruptedException {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
+ conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
+ enableProtobuf);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
cluster.waitActive();
@@ -411,4 +472,188 @@ public class TestBlockToken {
cluster.shutdown();
}
}
+
+ @Test
+ public void testBlockTokenInLastLocatedBlockLegacy() throws IOException,
+ InterruptedException {
+ testBlockTokenInLastLocatedBlock(false);
+ }
+
+ @Test
+ public void testBlockTokenInLastLocatedBlockProtobuf() throws IOException,
+ InterruptedException {
+ testBlockTokenInLastLocatedBlock(true);
+ }
+
+ @Test
+ public void testLegacyBlockTokenBytesIsLegacy() throws IOException {
+ final boolean useProto = false;
+ BlockTokenSecretManager sm = new BlockTokenSecretManager(
+ blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+ useProto);
+ Token<BlockTokenIdentifier> token = sm.generateToken(block1,
+ EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class));
+ final byte[] tokenBytes = token.getIdentifier();
+ BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
+ BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
+ BlockTokenIdentifier readToken = new BlockTokenIdentifier();
+
+ DataInputBuffer dib = new DataInputBuffer();
+
+ dib.reset(tokenBytes, tokenBytes.length);
+ legacyToken.readFieldsLegacy(dib);
+
+ boolean invalidProtobufMessage = false;
+ try {
+ dib.reset(tokenBytes, tokenBytes.length);
+ protobufToken.readFieldsProtobuf(dib);
+ } catch (IOException e) {
+ invalidProtobufMessage = true;
+ }
+ assertTrue(invalidProtobufMessage);
+
+ dib.reset(tokenBytes, tokenBytes.length);
+ readToken.readFields(dib);
+
+ // Using legacy, the token parses as a legacy block token and not a protobuf
+ assertEquals(legacyToken, readToken);
+ assertNotEquals(protobufToken, readToken);
+ }
+
+ @Test
+ public void testEmptyLegacyBlockTokenBytesIsLegacy() throws IOException {
+ BlockTokenIdentifier emptyIdent = new BlockTokenIdentifier();
+ DataOutputBuffer dob = new DataOutputBuffer(4096);
+ DataInputBuffer dib = new DataInputBuffer();
+
+ emptyIdent.writeLegacy(dob);
+ byte[] emptyIdentBytes = Arrays.copyOf(dob.getData(), dob.getLength());
+
+ BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
+ BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
+ BlockTokenIdentifier readToken = new BlockTokenIdentifier();
+
+ dib.reset(emptyIdentBytes, emptyIdentBytes.length);
+ legacyToken.readFieldsLegacy(dib);
+
+ boolean invalidProtobufMessage = false;
+ try {
+ dib.reset(emptyIdentBytes, emptyIdentBytes.length);
+ protobufToken.readFieldsProtobuf(dib);
+ } catch (IOException e) {
+ invalidProtobufMessage = true;
+ }
+ assertTrue(invalidProtobufMessage);
+
+ dib.reset(emptyIdentBytes, emptyIdentBytes.length);
+ readToken.readFields(dib);
+ assertTrue(invalidProtobufMessage);
+ }
+
+ @Test
+ public void testProtobufBlockTokenBytesIsProtobuf() throws IOException {
+ final boolean useProto = true;
+ BlockTokenSecretManager sm = new BlockTokenSecretManager(
+ blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+ useProto);
+ Token<BlockTokenIdentifier> token = sm.generateToken(block1,
+ EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class));
+ final byte[] tokenBytes = token.getIdentifier();
+ BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
+ BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
+ BlockTokenIdentifier readToken = new BlockTokenIdentifier();
+
+ DataInputBuffer dib = new DataInputBuffer();
+
+ /* We receive NegativeArraySizeException because we didn't call
+ * readFields and instead try to parse this directly as a legacy
+ * BlockTokenIdentifier.
+ *
+ * Note: because the parsing depends on the expiryDate which is based on
+ * `Time.now()` it can sometimes fail with IOException and sometimes with
+ * NegativeArraySizeException.
+ */
+ boolean invalidLegacyMessage = false;
+ try {
+ dib.reset(tokenBytes, tokenBytes.length);
+ legacyToken.readFieldsLegacy(dib);
+ } catch (IOException | NegativeArraySizeException e) {
+ invalidLegacyMessage = true;
+ }
+ assertTrue(invalidLegacyMessage);
+
+ dib.reset(tokenBytes, tokenBytes.length);
+ protobufToken.readFieldsProtobuf(dib);
+
+ dib.reset(tokenBytes, tokenBytes.length);
+ readToken.readFields(dib);
+
+ // Using protobuf, the token parses as a protobuf and not a legacy block
+ // token
+ assertNotEquals(legacyToken, readToken);
+ assertEquals(protobufToken, readToken);
+ }
+
+ public void testCraftedProtobufBlockTokenIdentifier(
+ BlockTokenIdentifier identifier, boolean expectIOE,
+ boolean expectRTE) throws IOException {
+ DataOutputBuffer dob = new DataOutputBuffer(4096);
+ DataInputBuffer dib = new DataInputBuffer();
+
+ identifier.writeProtobuf(dob);
+ byte[] identBytes = Arrays.copyOf(dob.getData(), dob.getLength());
+
+ BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
+ BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
+ BlockTokenIdentifier readToken = new BlockTokenIdentifier();
+
+ boolean invalidLegacyMessage = false;
+ try {
+ dib.reset(identBytes, identBytes.length);
+ legacyToken.readFieldsLegacy(dib);
+ } catch (IOException e) {
+ if (!expectIOE) {
+ fail("Received IOException but it was not expected.");
+ }
+ invalidLegacyMessage = true;
+ } catch (RuntimeException e) {
+ if (!expectRTE) {
+ fail("Received RuntimeException but it was not expected.");
+ }
+ invalidLegacyMessage = true;
+ }
+
+ assertTrue(invalidLegacyMessage);
+
+ dib.reset(identBytes, identBytes.length);
+ protobufToken.readFieldsProtobuf(dib);
+
+ dib.reset(identBytes, identBytes.length);
+ readToken.readFieldsProtobuf(dib);
+ assertEquals(protobufToken, readToken);
+ }
+
+ @Test
+ public void testCraftedProtobufBlockTokenBytesIsProtobuf() throws
+ IOException {
+ // Empty BlockTokenIdentifiers throw IOException
+ BlockTokenIdentifier identifier = new BlockTokenIdentifier();
+ testCraftedProtobufBlockTokenIdentifier(identifier, true, false);
+
+ /* Parsing BlockTokenIdentifier with expiryDate
+ * 2017-02-09 00:12:35,072+0100 will throw IOException.
+ * However, expiryDate of
+ * 2017-02-09 00:12:35,071+0100 will throw NegativeArraySizeException.
+ */
+ Calendar cal = new GregorianCalendar();
+ cal.set(2017, 1, 9, 0, 12, 35);
+ long datetime = cal.getTimeInMillis();
+ datetime = ((datetime / 1000) * 1000); // strip milliseconds.
+ datetime = datetime + 71; // 2017-02-09 00:12:35,071+0100
+ identifier.setExpiryDate(datetime);
+ testCraftedProtobufBlockTokenIdentifier(identifier, false, true);
+ datetime += 1; // 2017-02-09 00:12:35,072+0100
+ identifier.setExpiryDate(datetime);
+ testCraftedProtobufBlockTokenIdentifier(identifier, true, false);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org