You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/08/16 17:59:30 UTC

[hadoop] 02/04: HDFS-13617. Allow wrapping NN QOP into token in encrypted message. Contributed by Chen Liang

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit ca15968d09b64346f36e8bd3c24d4e89ea6b1b68
Author: Chen Liang <cl...@apache.org>
AuthorDate: Wed Feb 13 12:40:31 2019 -0800

    HDFS-13617. Allow wrapping NN QOP into token in encrypted message. Contributed by Chen Liang
---
 .../main/java/org/apache/hadoop/ipc/Server.java    |  11 +-
 .../org/apache/hadoop/security/token/Token.java    |  21 +++
 .../hadoop-common/src/main/proto/Security.proto    |   1 +
 .../hadoop/hdfs/protocolPB/PBHelperClient.java     |  16 +-
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../token/block/BlockTokenSecretManager.java       |  16 ++
 .../hdfs/server/namenode/NameNodeRpcServer.java    |  67 +++++++-
 .../src/main/resources/hdfs-default.xml            |  11 ++
 .../hadoop/hdfs/TestBlockTokenWrappingQOP.java     | 190 +++++++++++++++++++++
 9 files changed, 322 insertions(+), 15 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 34141e0..afc8379 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -1782,6 +1782,7 @@ public abstract class Server {
     IpcConnectionContextProto connectionContext;
     String protocolName;
     SaslServer saslServer;
+    private String establishedQOP;
     private AuthMethod authMethod;
     private AuthProtocol authProtocol;
     private boolean saslContextEstablished;
@@ -1859,14 +1860,7 @@ public abstract class Server {
     }
 
     public String getEstablishedQOP() {
-      // In practice, saslServer should not be null when this is
-      // called. If it is null, it must be either some
-      // configuration mistake or it is called from unit test.
-      if (saslServer == null) {
-        LOG.warn("SASL server should not be null!");
-        return null;
-      }
-      return (String)saslServer.getNegotiatedProperty(Sasl.QOP);
+      return establishedQOP;
     }
     
     public void setLastContact(long lastContact) {
@@ -2027,6 +2021,7 @@ public abstract class Server {
       // do NOT enable wrapping until the last auth response is sent
       if (saslContextEstablished) {
         String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+        establishedQOP = qop;
         // SASL wrapping is only used if the connection has a QOP, and
         // the value is not auth.  ex. auth-int & auth-priv
         useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
index f02e9d6..c8a10cc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
@@ -56,6 +56,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
   private Text kind;
   private Text service;
   private TokenRenewer renewer;
+  private byte[] dnHandshakeSecret;
 
   /**
    * Construct a token given a token identifier and a secret manager for the
@@ -68,6 +69,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
     identifier = id.getBytes();
     kind = id.getKind();
     service = new Text();
+    dnHandshakeSecret = new byte[0];
   }
 
   /**
@@ -82,6 +84,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
     this.password = (password == null)? new byte[0] : password;
     this.kind = (kind == null)? new Text() : kind;
     this.service = (service == null)? new Text() : service;
+    this.dnHandshakeSecret = new byte[0];
   }
 
   /**
@@ -92,6 +95,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
     password = new byte[0];
     kind = new Text();
     service = new Text();
+    dnHandshakeSecret = new byte[0];
   }
 
   /**
@@ -103,6 +107,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
     this.password = other.password.clone();
     this.kind = new Text(other.kind);
     this.service = new Text(other.service);
+    this.dnHandshakeSecret = other.dnHandshakeSecret.clone();
   }
 
   public Token<T> copyToken() {
@@ -118,6 +123,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
     this.password = tokenPB.getPassword().toByteArray();
     this.kind = new Text(tokenPB.getKindBytes().toByteArray());
     this.service = new Text(tokenPB.getServiceBytes().toByteArray());
+    this.dnHandshakeSecret = new byte[0];
   }
 
   /**
@@ -143,6 +149,14 @@ public class Token<T extends TokenIdentifier> implements Writable {
     return identifier;
   }
 
+  public byte[] getDnHandshakeSecret() {
+    return dnHandshakeSecret;
+  }
+
+  public void setDNHandshakeSecret(byte[] secret) {
+    this.dnHandshakeSecret = secret;
+  }
+
   private static Class<? extends TokenIdentifier>
       getClassForIdentifier(Text kind) {
     Class<? extends TokenIdentifier> cls = null;
@@ -337,6 +351,11 @@ public class Token<T extends TokenIdentifier> implements Writable {
     in.readFully(password);
     kind.readFields(in);
     service.readFields(in);
+    len = WritableUtils.readVInt(in);
+    if (dnHandshakeSecret == null || dnHandshakeSecret.length != len) {
+      dnHandshakeSecret = new byte[len];
+    }
+    in.readFully(dnHandshakeSecret);
   }
 
   @Override
@@ -347,6 +366,8 @@ public class Token<T extends TokenIdentifier> implements Writable {
     out.write(password);
     kind.write(out);
     service.write(out);
+    WritableUtils.writeVInt(out, dnHandshakeSecret.length);
+    out.write(dnHandshakeSecret);
   }
 
   /**
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/Security.proto b/hadoop-common-project/hadoop-common/src/main/proto/Security.proto
index 037a878..4cf4520 100644
--- a/hadoop-common-project/hadoop-common/src/main/proto/Security.proto
+++ b/hadoop-common-project/hadoop-common/src/main/proto/Security.proto
@@ -36,6 +36,7 @@ message TokenProto {
   required bytes password = 2;
   required string kind = 3;
   required string service = 4;
+  optional bytes handshakeSecret = 5;
 }
 
 message CredentialsKVProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 4b966bb..829d3ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -349,11 +349,16 @@ public class PBHelperClient {
   }
 
   public static TokenProto convert(Token<?> tok) {
-    return TokenProto.newBuilder().
+    TokenProto.Builder builder = TokenProto.newBuilder().
         setIdentifier(getByteString(tok.getIdentifier())).
         setPassword(getByteString(tok.getPassword())).
         setKindBytes(getFixedByteString(tok.getKind())).
-        setServiceBytes(getFixedByteString(tok.getService())).build();
+        setServiceBytes(getFixedByteString(tok.getService()));
+    if (tok.getDnHandshakeSecret() != null) {
+      builder.setHandshakeSecret(
+          ByteString.copyFrom(tok.getDnHandshakeSecret()));
+    }
+    return builder.build();
   }
 
   public static ShortCircuitShmIdProto convert(ShmId shmId) {
@@ -826,9 +831,14 @@ public class PBHelperClient {
 
   public static Token<BlockTokenIdentifier> convert(
       TokenProto blockToken) {
-    return new Token<>(blockToken.getIdentifier()
+    Token<BlockTokenIdentifier> token =
+        new Token<>(blockToken.getIdentifier()
         .toByteArray(), blockToken.getPassword().toByteArray(), new Text(
         blockToken.getKind()), new Text(blockToken.getService()));
+    if (blockToken.hasHandshakeSecret()) {
+      token.setDNHandshakeSecret(blockToken.getHandshakeSecret().toByteArray());
+    }
+    return token;
   }
 
   // DatanodeId
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 291bb13..b05c08d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1284,6 +1284,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY =
       HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
 
+  public static final String DFS_NAMENODE_SEND_QOP_ENABLED =
+      "dfs.namenode.send.qop.enabled";
+  public static final boolean DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT = false;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry 
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index 8e41214..757cfb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -523,6 +523,22 @@ public class BlockTokenSecretManager extends
     return createPassword(nonce, key.getKey());
   }
 
+  /**
+   * Encrypt the given message with the current block key, by passing both
+   * to createPassword.
+   *
+   * @param message the message to be encrypted.
+   * @return the secret created by encrypting the given message.
+   */
+  public byte[] secretGen(byte[] message) {
+    return createPassword(message, currentKey.getKey());
+  }
+
+  @VisibleForTesting
+  public BlockKey getCurrentKey() {
+    return currentKey;
+  }
+
   @VisibleForTesting
   public synchronized void setKeyUpdateIntervalForTesting(long millis) {
     this.keyUpdateInterval = millis;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index c8579b7..747bb92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -27,10 +27,13 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
 import static org.apache.hadoop.util.Time.now;
 
+import com.google.common.base.Charsets;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -144,6 +147,8 @@ import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -264,6 +269,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
 
   private final String defaultECPolicyName;
 
+  private final boolean shouldSendQOP;
+
   public NameNodeRpcServer(Configuration conf, NameNode nn)
       throws IOException {
     this.nn = nn;
@@ -546,6 +553,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
         this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
       }
     }
+    this.shouldSendQOP = conf.getBoolean(
+        DFS_NAMENODE_SEND_QOP_ENABLED, DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT);
   }
 
   /** Allow access to the lifeline RPC server for testing */
@@ -751,8 +760,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       throws IOException {
     checkNNStartup();
     metrics.incrGetBlockLocations();
-    return namesystem.getBlockLocations(getClientMachine(), 
-                                        src, offset, length);
+    LocatedBlocks locatedBlocks =
+        namesystem.getBlockLocations(getClientMachine(), src, offset, length);
+    if (shouldSendQOP) {
+      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        wrapEstablishedQOP(lb, getEstablishedClientQOP());
+      }
+    }
+    return locatedBlocks;
   }
   
   @Override // ClientProtocol
@@ -825,6 +840,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       RetryCache.setState(cacheEntry, success, info);
     }
     metrics.incrFilesAppended();
+    if (shouldSendQOP) {
+      wrapEstablishedQOP(info.getLastBlock(), getEstablishedClientQOP());
+    }
     return info;
   }
 
@@ -893,6 +911,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if (locatedBlock != null) {
       metrics.incrAddBlockOps();
     }
+    if (shouldSendQOP) {
+      wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
+    }
     return locatedBlock;
   }
 
@@ -923,8 +944,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
         excludeSet.add(node);
       }
     }
-    return namesystem.getAdditionalDatanode(src, fileId, blk, existings,
-        existingStorageIDs, excludeSet, numAdditionalNodes, clientName);
+    LocatedBlock locatedBlock = namesystem.getAdditionalDatanode(src, fileId,
+        blk, existings, existingStorageIDs, excludeSet, numAdditionalNodes,
+        clientName);
+    if (shouldSendQOP) {
+      wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
+    }
+    return locatedBlock;
   }
   /**
    * The client needs to give up on the block.
@@ -1844,6 +1870,17 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     return clientMachine;
   }
 
+  /**
+   * Return the QOP of the client that the current handler thread
+   * is handling. This assumes the negotiation is done at this point;
+   * otherwise null is returned.
+   *
+   * @return the established QOP of this client.
+   */
+  private static String getEstablishedClientQOP() {
+    return Server.getEstablishedQOP();
+  }
+
   @Override
   public DataEncryptionKey getDataEncryptionKey() throws IOException {
     checkNNStartup();
@@ -2594,4 +2631,26 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     }
     return namesystem.getBlockManager().getSPSManager().getNextPathId();
   }
+
+
+  /**
+   * Wrap the QOP information into the LocatedBlock instance.
+   * The wrapped QOP will be used by DataNode, i.e. DataNode will simply use
+   * this QOP to accept client calls, because this QOP is viewed
+   * as the QOP that NameNode has accepted.
+   *
+   * @param locatedBlock the LocatedBlock instance
+   * @param qop the QOP to wrap in
+   * @throws RuntimeException
+   */
+  private void wrapEstablishedQOP(LocatedBlock locatedBlock, String qop) {
+    if (qop == null || locatedBlock == null) {
+      return;
+    }
+    BlockTokenSecretManager btsm = namesystem.getBlockManager()
+        .getBlockTokenSecretManager();
+    Token<BlockTokenIdentifier> token = locatedBlock.getBlockToken();
+    byte[] secret = btsm.secretGen(qop.getBytes(Charsets.UTF_8));
+    token.setDNHandshakeSecret(secret);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index bff6ac8..165c804 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5236,6 +5236,17 @@
   </property>
 
   <property>
+    <name>dfs.namenode.send.qop.enabled</name>
+    <value>false</value>
+    <description>
+      A boolean that specifies whether NameNode should encrypt the established
+      QOP and include it in the block token. The encrypted QOP will be used by
+      DataNode as the target QOP, overriding the DataNode configuration. This
+      ensures DataNode uses exactly the QOP NameNode and client have agreed on.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.namenode.blockreport.queue.size</name>
     <value>1024</value>
     <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
new file mode 100644
index 0000000..ea7ab97
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import javax.crypto.Mac;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.apache.hadoop.hdfs.security.token.block.BlockKey;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.security.TestPermission;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.junit.Assert.*;
+
+
+/**
+ * This tests that, when enabled, NN sends the established QOP back to the
+ * client in an encrypted message, using the block access token key.
+ */
+@RunWith(Parameterized.class)
+public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
+  public static final Log LOG = LogFactory.getLog(TestPermission.class);
+
+  private HdfsConfiguration conf;
+  private MiniDFSCluster cluster;
+  private String encryptionAlgorithm;
+  private DistributedFileSystem dfs;
+
+  private String configKey;
+  private String qopValue;
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> qopSettings() {
+    // if configured with privacy, the negotiated QOP should be auth-conf;
+    // similarly for the other two
+    return Arrays.asList(new Object[][] {
+        {"privacy", "auth-conf"},
+        {"integrity", "auth-int"},
+        {"authentication", "auth"}
+    });
+  }
+
+  public TestBlockTokenWrappingQOP(String configKey, String qopValue) {
+    this.configKey = configKey;
+    this.qopValue = qopValue;
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = createSecureConfig(this.configKey);
+    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
+    conf.set(HADOOP_RPC_PROTECTION, this.configKey);
+    cluster = null;
+    encryptionAlgorithm = "HmacSHA1";
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testAddBlockWrappingQOP() throws Exception {
+    final String src = "/testAddBlockWrappingQOP";
+    final Path path = new Path(src);
+
+    dfs = cluster.getFileSystem();
+    dfs.create(path);
+
+    DFSClient client = dfs.getClient();
+    String clientName = client.getClientName();
+
+    LocatedBlock lb = client.namenode.addBlock(src, clientName, null, null,
+        HdfsConstants.GRANDFATHER_INODE_ID, null, null);
+    byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
+    BlockKey currentKey = cluster.getNamesystem().getBlockManager()
+        .getBlockTokenSecretManager().getCurrentKey();
+    String decrypted = decryptMessage(secret, currentKey,
+        encryptionAlgorithm);
+    assertEquals(this.qopValue, decrypted);
+  }
+
+  @Test
+  public void testAppendWrappingQOP() throws Exception {
+    final String src = "/testAppendWrappingQOP";
+    final Path path = new Path(src);
+
+    dfs = cluster.getFileSystem();
+    FSDataOutputStream out = dfs.create(path);
+    // NameNode append call returns a last block instance. If there is
+    // nothing in the file, it is returned as null. So write something, so
+    // that the last block is not null.
+    out.write(0);
+    out.close();
+
+    DFSClient client = dfs.getClient();
+    String clientName = client.getClientName();
+
+    LastBlockWithStatus lastBlock = client.namenode.append(src, clientName,
+        new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
+
+    byte[] secret = lastBlock.getLastBlock().getBlockToken()
+        .getDnHandshakeSecret();
+    BlockKey currentKey = cluster.getNamesystem().getBlockManager()
+        .getBlockTokenSecretManager().getCurrentKey();
+    String decrypted = decryptMessage(secret, currentKey,
+        encryptionAlgorithm);
+    assertEquals(this.qopValue, decrypted);
+  }
+
+  @Test
+  public void testGetBlockLocationWrappingQOP() throws Exception {
+    final String src = "/testGetBlockLocationWrappingQOP";
+    final Path path = new Path(src);
+
+    dfs = cluster.getFileSystem();
+    FSDataOutputStream out = dfs.create(path);
+    // if the file is empty, there will be no blocks returned. Write something
+    // so that getBlockLocations actually returns some block.
+    out.write(0);
+    out.close();
+
+    FileStatus status = dfs.getFileStatus(path);
+    DFSClient client = dfs.getClient();
+    LocatedBlocks lbs = client.namenode.getBlockLocations(
+        src, 0, status.getLen());
+
+    assertTrue(lbs.getLocatedBlocks().size() > 0);
+
+    BlockKey currentKey = cluster.getNamesystem().getBlockManager()
+        .getBlockTokenSecretManager().getCurrentKey();
+    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
+      String decrypted = decryptMessage(secret, currentKey,
+          encryptionAlgorithm);
+      assertEquals(this.qopValue, decrypted);
+    }
+  }
+
+  private String decryptMessage(byte[] secret, BlockKey key,
+      String algorithm) throws Exception {
+    String[] qops = {"auth", "auth-conf", "auth-int"};
+    Mac mac = Mac.getInstance(algorithm);
+    mac.init(key.getKey());
+    for (String qop : qops) {
+      byte[] encrypted = mac.doFinal(qop.getBytes());
+      if (Arrays.equals(encrypted, secret)) {
+        return qop;
+      }
+    }
+    return null;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org