You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2016/10/05 20:30:01 UTC
hadoop git commit: HDFS-10609. Uncaught InvalidEncryptionKeyException
during pipeline recovery may abort downstream applications. Contributed by
Wei-Chiu Chuang.
Repository: hadoop
Updated Branches:
refs/heads/branch-2.7 599146d10 -> 039c3a735
HDFS-10609. Uncaught InvalidEncryptionKeyException during pipeline recovery may abort downstream applications. Contributed by Wei-Chiu Chuang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/039c3a73
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/039c3a73
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/039c3a73
Branch: refs/heads/branch-2.7
Commit: 039c3a735192ac05209af89c0cc74a27c118a21f
Parents: 599146d
Author: Wei-Chiu Chuang <we...@apache.org>
Authored: Wed Oct 5 13:29:20 2016 -0700
Committer: Wei-Chiu Chuang <we...@apache.org>
Committed: Wed Oct 5 13:29:20 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hdfs/DFSClient.java | 5 +
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 147 +++-
.../block/BlockPoolTokenSecretManager.java | 3 +-
.../token/block/BlockTokenSecretManager.java | 6 +
.../hadoop/hdfs/server/datanode/DataNode.java | 5 +
.../hadoop/hdfs/TestEncryptedTransfer.java | 719 ++++++++-----------
6 files changed, 421 insertions(+), 464 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 58d93cd..1a6a96b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -2188,6 +2188,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
+ @VisibleForTesting
+ public DataEncryptionKey getEncryptionKey() {
+ return encryptionKey;
+ }
+
/**
* Get the checksum of the whole file or a range of the file. Note that the
* range always starts from the beginning of the file.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index f8c8592..ef8aa5a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -225,6 +225,89 @@ public class DFSOutputStream extends FSOutputSummer
// if they are received, the DataStreamer closes the current block.
//
class DataStreamer extends Daemon {
+ private class RefetchEncryptionKeyPolicy {
+ private int fetchEncryptionKeyTimes = 0;
+ private InvalidEncryptionKeyException lastException;
+ private final DatanodeInfo src;
+
+ RefetchEncryptionKeyPolicy(DatanodeInfo src) {
+ this.src = src;
+ }
+ boolean continueRetryingOrThrow() throws InvalidEncryptionKeyException {
+ if (fetchEncryptionKeyTimes >= 2) {
+ // hit the same exception twice connecting to the node, so
+ // throw the exception and exclude the node.
+ throw lastException;
+ }
+ // Don't exclude this node just yet.
+ // Try again with a new encryption key.
+ DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ + "encryption key was invalid when connecting to "
+ + this.src + ": ", lastException);
+ // The encryption key used is invalid.
+ dfsClient.clearDataEncryptionKey();
+ return true;
+ }
+
+ /**
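+ * Record a connection failure: bump the retry count and keep the exception.
+ * @param e the invalid-encryption-key exception from the failed attempt
+ * @throws InvalidEncryptionKeyException never thrown by this method as written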
+ */
+ void recordFailure(final InvalidEncryptionKeyException e)
+ throws InvalidEncryptionKeyException {
+ fetchEncryptionKeyTimes++;
+ lastException = e;
+ }
+ }
+
+ private class StreamerStreams implements java.io.Closeable {
+ private Socket sock = null;
+ private DataOutputStream out = null;
+ private DataInputStream in = null;
+
+ StreamerStreams(final DatanodeInfo src,
+ final long writeTimeout, final long readTimeout,
+ final Token<BlockTokenIdentifier> blockToken)
+ throws IOException {
+ sock = createSocketForPipeline(src, 2, dfsClient);
+
+ OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+ InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
+ IOStreamPair saslStreams = dfsClient.saslClient
+ .socketSend(sock, unbufOut, unbufIn, dfsClient, blockToken, src);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+ HdfsConstants.SMALL_BUFFER_SIZE));
+ in = new DataInputStream(unbufIn);
+ }
+
+ void sendTransferBlock(final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException {
+ //send the TRANSFER_BLOCK request
+ new Sender(out)
+ .transferBlock(block, blockToken, dfsClient.clientName, targets,
+ targetStorageTypes);
+ out.flush();
+ //ack
+ BlockOpResponseProto transferResponse = BlockOpResponseProto
+ .parseFrom(PBHelper.vintPrefixed(in));
+ if (SUCCESS != transferResponse.getStatus()) {
+ throw new IOException("Failed to add a datanode. Response status: "
+ + transferResponse.getStatus());
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(out);
+ IOUtils.closeSocket(sock);
+ }
+ }
+
private volatile boolean streamerClosed = false;
private volatile ExtendedBlock block; // its length is number of bytes acked
private Token<BlockTokenIdentifier> accessToken;
@@ -1010,48 +1093,38 @@ public class DFSOutputStream extends FSOutputSummer
new IOException("Failed to add a node");
}
+ private long computeTransferWriteTimeout() {
+ return dfsClient.getDatanodeWriteTimeout(2);
+ }
+ private long computeTransferReadTimeout() {
+ // transfer timeout multiplier based on the transfer size
+ // One per 200 packets = 12.8MB. Minimum is 2.
+ int multi = 2
+ + (int) (bytesSent / dfsClient.getConf().writePacketSize) / 200;
+ return dfsClient.getDatanodeReadTimeout(multi);
+ }
+
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
//transfer replica to the new datanode
- Socket sock = null;
- DataOutputStream out = null;
- DataInputStream in = null;
- try {
- sock = createSocketForPipeline(src, 2, dfsClient);
- final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
-
- // transfer timeout multiplier based on the transfer size
- // One per 200 packets = 12.8MB. Minimum is 2.
- int multi = 2 + (int)(bytesSent/dfsClient.getConf().writePacketSize)/200;
- final long readTimeout = dfsClient.getDatanodeReadTimeout(multi);
-
- OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
- InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
- IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
- unbufOut, unbufIn, dfsClient, blockToken, src);
- unbufOut = saslStreams.out;
- unbufIn = saslStreams.in;
- out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- HdfsConstants.SMALL_BUFFER_SIZE));
- in = new DataInputStream(unbufIn);
-
- //send the TRANSFER_BLOCK request
- new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
- targets, targetStorageTypes);
- out.flush();
-
- //ack
- BlockOpResponseProto response =
- BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
- if (SUCCESS != response.getStatus()) {
- throw new IOException("Failed to add a datanode");
+ RefetchEncryptionKeyPolicy policy = new RefetchEncryptionKeyPolicy(src);
+ do {
+ StreamerStreams streams = null;
+ try {
+ final long writeTimeout = computeTransferWriteTimeout();
+ final long readTimeout = computeTransferReadTimeout();
+
+ streams = new StreamerStreams(src, writeTimeout, readTimeout,
+ blockToken);
+ streams.sendTransferBlock(targets, targetStorageTypes, blockToken);
+ return;
+ } catch (InvalidEncryptionKeyException e) {
+ policy.recordFailure(e);
+ } finally {
+ IOUtils.closeStream(streams);
}
- } finally {
- IOUtils.closeStream(in);
- IOUtils.closeStream(out);
- IOUtils.closeSocket(sock);
- }
+ } while (policy.continueRetryingOrThrow());
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
index 0df7067..7e3c877 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
@@ -49,7 +49,8 @@ public class BlockPoolTokenSecretManager extends
map.put(bpid, secretMgr);
}
- synchronized BlockTokenSecretManager get(String bpid) {
+ @VisibleForTesting
+ public synchronized BlockTokenSecretManager get(String bpid) {
BlockTokenSecretManager secretMgr = map.get(bpid);
if (secretMgr == null) {
throw new IllegalArgumentException("Block pool " + bpid
http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index a3685ca..4d4c4bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -439,6 +439,12 @@ public class BlockTokenSecretManager extends
}
@VisibleForTesting
+ public synchronized boolean hasKey(int keyId) {
+ BlockKey key = allKeys.get(keyId);
+ return key != null;
+ }
+
+ @VisibleForTesting
public synchronized int getSerialNoForTesting() {
return serialNo;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 3b27752..9ef23d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2487,6 +2487,11 @@ public class DataNode extends ReconfigurableBase
}
+ @VisibleForTesting
+ public BlockPoolTokenSecretManager getBlockPoolTokenSecretManager() {
+ return blockPoolTokenSecretManager;
+ }
+
public static void secureMain(String args[], SecureResources resources) {
int errorCode = 0;
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
index 30484d1..0ffa933 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
@@ -21,32 +21,41 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.times;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.TimeoutException;
+import com.google.common.base.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -72,8 +81,12 @@ public class TestEncryptedTransfer {
private static final String PLAIN_TEXT = "this is very secret plain text";
private static final Path TEST_PATH = new Path("/non-encrypted-file");
-
- private void setEncryptionConfigKeys(Configuration conf) {
+
+ private MiniDFSCluster cluster = null;
+ private Configuration conf = null;
+ private FileSystem fs = null;
+
+ private void setEncryptionConfigKeys() {
conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
if (resolverClazz != null){
@@ -96,389 +109,271 @@ public class TestEncryptedTransfer {
this.resolverClazz = resolverClazz;
}
- @Test
- public void testEncryptedRead() throws IOException {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = new Configuration();
- cluster = new MiniDFSCluster.Builder(conf).build();
-
- FileSystem fs = getFileSystem(conf);
- writeTestDataToFile(fs);
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+ @Before
+ public void setup() throws IOException {
+ conf = new Configuration();
+ }
+
+ @After
+ public void teardown() throws IOException {
+ if (fs != null) {
fs.close();
+ }
+ if (cluster != null) {
cluster.shutdown();
-
- setEncryptionConfigKeys(conf);
-
- cluster = new MiniDFSCluster.Builder(conf)
- .manageDataDfsDirs(false)
- .manageNameDfsDirs(false)
- .format(false)
- .startupOption(StartupOption.REGULAR)
- .build();
-
- fs = getFileSystem(conf);
- LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
- LogFactory.getLog(SaslDataTransferServer.class));
- LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
- LogFactory.getLog(DataTransferSaslUtil.class));
- try {
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
- } finally {
- logs.stopCapturing();
- logs1.stopCapturing();
- }
-
- fs.close();
-
- if (resolverClazz == null) {
- // Test client and server negotiate cipher option
- GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
- "Server using cipher suite");
- // Check the IOStreamPair
- GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
- "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
- }
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
}
}
-
- @Test
- public void testEncryptedReadWithRC4() throws IOException {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = new Configuration();
- cluster = new MiniDFSCluster.Builder(conf).build();
-
- FileSystem fs = getFileSystem(conf);
- writeTestDataToFile(fs);
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
- fs.close();
- cluster.shutdown();
-
- setEncryptionConfigKeys(conf);
- // It'll use 3DES by default, but we set it to rc4 here.
- conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, "rc4");
-
- cluster = new MiniDFSCluster.Builder(conf)
- .manageDataDfsDirs(false)
- .manageNameDfsDirs(false)
- .format(false)
- .startupOption(StartupOption.REGULAR)
- .build();
-
- fs = getFileSystem(conf);
- LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
- LogFactory.getLog(SaslDataTransferServer.class));
- LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
- LogFactory.getLog(DataTransferSaslUtil.class));
- try {
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
- } finally {
- logs.stopCapturing();
- logs1.stopCapturing();
- }
- fs.close();
+ private FileChecksum writeUnencryptedAndThenRestartEncryptedCluster()
+ throws IOException {
+ cluster = new MiniDFSCluster.Builder(conf).build();
- if (resolverClazz == null) {
- // Test client and server negotiate cipher option
- GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
- "Server using cipher suite");
- // Check the IOStreamPair
- GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
- "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
- }
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- @Test
- public void testEncryptedReadWithAES() throws IOException {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = new Configuration();
- conf.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY,
- "AES/CTR/NoPadding");
- cluster = new MiniDFSCluster.Builder(conf).build();
+ fs = getFileSystem(conf);
+ writeTestDataToFile(fs);
+ assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+ FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+ fs.close();
+ cluster.shutdown();
- FileSystem fs = getFileSystem(conf);
- writeTestDataToFile(fs);
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
- fs.close();
- cluster.shutdown();
+ setEncryptionConfigKeys();
- setEncryptionConfigKeys(conf);
+ cluster = new MiniDFSCluster.Builder(conf)
+ .manageDataDfsDirs(false)
+ .manageNameDfsDirs(false)
+ .format(false)
+ .startupOption(StartupOption.REGULAR)
+ .build();
- cluster = new MiniDFSCluster.Builder(conf)
- .manageDataDfsDirs(false)
- .manageNameDfsDirs(false)
- .format(false)
- .startupOption(StartupOption.REGULAR)
- .build();
+ fs = getFileSystem(conf);
+ return checksum;
+ }
- fs = getFileSystem(conf);
- LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
- LogFactory.getLog(SaslDataTransferServer.class));
- LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
- LogFactory.getLog(DataTransferSaslUtil.class));
- try {
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
- } finally {
- logs.stopCapturing();
- logs1.stopCapturing();
- }
+ public void testEncryptedRead(String algorithm, String cipherSuite,
+ boolean matchLog, boolean readAfterRestart) throws IOException {
+ // set encryption algorithm and cipher suites, but don't enable transfer
+ // encryption yet.
+ conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, algorithm);
+ conf.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY,
+ cipherSuite);
- fs.close();
+ FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
+
+ LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+ LogFactory.getLog(SaslDataTransferServer.class));
+ LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
+ LogFactory.getLog(DataTransferSaslUtil.class));
+ try {
+ assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+ assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+ } finally {
+ logs.stopCapturing();
+ logs1.stopCapturing();
+ }
- if (resolverClazz == null) {
+ if (resolverClazz == null) {
+ if (matchLog) {
// Test client and server negotiate cipher option
- GenericTestUtils.assertMatches(logs.getOutput(),
- "Server using cipher suite");
+ GenericTestUtils
+ .assertMatches(logs.getOutput(), "Server using cipher suite");
// Check the IOStreamPair
GenericTestUtils.assertMatches(logs1.getOutput(),
"Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
- }
- } finally {
- if (cluster != null) {
- cluster.shutdown();
+ } else {
+ // Test client and server negotiate cipher option
+ GenericTestUtils
+ .assertDoesNotMatch(logs.getOutput(), "Server using cipher suite");
+ // Check the IOStreamPair
+ GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
+ "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
}
}
- }
- @Test
- public void testEncryptedReadAfterNameNodeRestart() throws IOException {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = new Configuration();
- cluster = new MiniDFSCluster.Builder(conf).build();
-
- FileSystem fs = getFileSystem(conf);
- writeTestDataToFile(fs);
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
- fs.close();
- cluster.shutdown();
-
- setEncryptionConfigKeys(conf);
-
- cluster = new MiniDFSCluster.Builder(conf)
- .manageDataDfsDirs(false)
- .manageNameDfsDirs(false)
- .format(false)
- .startupOption(StartupOption.REGULAR)
- .build();
-
- fs = getFileSystem(conf);
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
- fs.close();
-
+ if (readAfterRestart) {
cluster.restartNameNode();
fs = getFileSystem(conf);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
- fs.close();
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
}
}
+
+ @Test
+ public void testEncryptedReadDefaultAlgorithmCipherSuite()
+ throws IOException {
+ testEncryptedRead("", "", false, false);
+ }
+
+ @Test
+ public void testEncryptedReadWithRC4() throws IOException {
+ testEncryptedRead("rc4", "", false, false);
+ }
+
+ @Test
+ public void testEncryptedReadWithAES() throws IOException {
+ testEncryptedRead("", "AES/CTR/NoPadding", true, false);
+ }
+
+ @Test
+ public void testEncryptedReadAfterNameNodeRestart() throws IOException {
+ testEncryptedRead("", "", false, true);
+ }
@Test
public void testClientThatDoesNotSupportEncryption() throws IOException {
- MiniDFSCluster cluster = null;
+ // Set short retry timeouts so this test runs faster
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
+
+ writeUnencryptedAndThenRestartEncryptedCluster();
+
+ DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
+ DFSClient spyClient = Mockito.spy(client);
+ Mockito.doReturn(false).when(spyClient).shouldEncryptData();
+ DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
+
+ LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+ LogFactory.getLog(DataNode.class));
try {
- Configuration conf = new Configuration();
- // Set short retry timeouts so this test runs faster
- conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
- cluster = new MiniDFSCluster.Builder(conf).build();
-
- FileSystem fs = getFileSystem(conf);
- writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- fs.close();
- cluster.shutdown();
-
- setEncryptionConfigKeys(conf);
-
- cluster = new MiniDFSCluster.Builder(conf)
- .manageDataDfsDirs(false)
- .manageNameDfsDirs(false)
- .format(false)
- .startupOption(StartupOption.REGULAR)
- .build();
-
-
- fs = getFileSystem(conf);
- DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
- DFSClient spyClient = Mockito.spy(client);
- Mockito.doReturn(false).when(spyClient).shouldEncryptData();
- DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
-
- LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
- LogFactory.getLog(DataNode.class));
- try {
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
- fail("Should not have been able to read without encryption enabled.");
- }
- } catch (IOException ioe) {
- GenericTestUtils.assertExceptionContains("Could not obtain block:",
- ioe);
- } finally {
- logs.stopCapturing();
- }
- fs.close();
-
- if (resolverClazz == null) {
- GenericTestUtils.assertMatches(logs.getOutput(),
- "Failed to read expected encryption handshake from client at");
+ if (resolverClazz != null&&
+ !resolverClazz.endsWith("TestTrustedChannelResolver")){
+ fail("Should not have been able to read without encryption enabled.");
}
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("Could not obtain block:",
+ ioe);
} finally {
- if (cluster != null) {
- cluster.shutdown();
- }
+ logs.stopCapturing();
+ }
+
+ if (resolverClazz == null) {
+ GenericTestUtils.assertMatches(logs.getOutput(),
+ "Failed to read expected encryption handshake from client at");
}
}
@Test
public void testLongLivedReadClientAfterRestart() throws IOException {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = new Configuration();
- cluster = new MiniDFSCluster.Builder(conf).build();
-
- FileSystem fs = getFileSystem(conf);
- writeTestDataToFile(fs);
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
- fs.close();
- cluster.shutdown();
-
- setEncryptionConfigKeys(conf);
-
- cluster = new MiniDFSCluster.Builder(conf)
- .manageDataDfsDirs(false)
- .manageNameDfsDirs(false)
- .format(false)
- .startupOption(StartupOption.REGULAR)
- .build();
-
- fs = getFileSystem(conf);
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-
- // Restart the NN and DN, after which the client's encryption key will no
- // longer be valid.
- cluster.restartNameNode();
- assertTrue(cluster.restartDataNode(0));
-
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-
- fs.close();
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
+ FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
+
+ assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+ assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+
+ // Restart the NN and DN, after which the client's encryption key will no
+ // longer be valid.
+ cluster.restartNameNode();
+ assertTrue(cluster.restartDataNode(0));
+
+ assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+ assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
}
@Test
public void testLongLivedWriteClientAfterRestart() throws IOException {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = new Configuration();
- setEncryptionConfigKeys(conf);
- cluster = new MiniDFSCluster.Builder(conf).build();
-
- FileSystem fs = getFileSystem(conf);
-
- writeTestDataToFile(fs);
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-
- // Restart the NN and DN, after which the client's encryption key will no
- // longer be valid.
- cluster.restartNameNode();
- assertTrue(cluster.restartDataNodes());
- cluster.waitActive();
-
- writeTestDataToFile(fs);
- assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-
- fs.close();
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
+ setEncryptionConfigKeys();
+ cluster = new MiniDFSCluster.Builder(conf).build();
+
+ fs = getFileSystem(conf);
+
+ writeTestDataToFile(fs);
+ assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+ // Restart the NN and DN, after which the client's encryption key will no
+ // longer be valid.
+ cluster.restartNameNode();
+ assertTrue(cluster.restartDataNodes());
+ cluster.waitActive();
+
+ writeTestDataToFile(fs);
+ assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
}
@Test
public void testLongLivedClient() throws IOException, InterruptedException {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = new Configuration();
- cluster = new MiniDFSCluster.Builder(conf).build();
-
- FileSystem fs = getFileSystem(conf);
- writeTestDataToFile(fs);
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
- fs.close();
- cluster.shutdown();
-
- setEncryptionConfigKeys(conf);
-
- cluster = new MiniDFSCluster.Builder(conf)
- .manageDataDfsDirs(false)
- .manageNameDfsDirs(false)
- .format(false)
- .startupOption(StartupOption.REGULAR)
- .build();
-
- BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
- .getBlockTokenSecretManager();
- btsm.setKeyUpdateIntervalForTesting(2 * 1000);
- btsm.setTokenLifetime(2 * 1000);
- btsm.clearAllKeysForTesting();
-
- fs = getFileSystem(conf);
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-
- // Sleep for 15 seconds, after which the encryption key will no longer be
- // valid. It needs to be a few multiples of the block token lifetime,
- // since several block tokens are valid at any given time (the current
- // and the last two, by default.)
- LOG.info("Sleeping so that encryption keys expire...");
- Thread.sleep(15 * 1000);
- LOG.info("Done sleeping.");
-
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-
- fs.close();
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
+ FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
+
+ BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
+ .getBlockTokenSecretManager();
+ btsm.setKeyUpdateIntervalForTesting(2 * 1000);
+ btsm.setTokenLifetime(2 * 1000);
+ btsm.clearAllKeysForTesting();
+
+ assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+ assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+
+ // Sleep for 15 seconds, after which the encryption key will no longer be
+ // valid. It needs to be a few multiples of the block token lifetime,
+ // since several block tokens are valid at any given time (the current
+ // and the last two, by default.)
+ LOG.info("Sleeping so that encryption keys expire...");
+ Thread.sleep(15 * 1000);
+ LOG.info("Done sleeping.");
+
+ assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+ assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+ }
+
+ @Test
+ public void testLongLivedClientPipelineRecovery()
+ throws IOException, InterruptedException, TimeoutException {
+ if (resolverClazz != null) {
+ // TestTrustedChannelResolver does not use encryption keys.
+ return;
}
+ // use 4 datanodes to make sure that after 1 data node is stopped,
+ // client only retries establishing pipeline with the 4th node.
+ int numDataNodes = 4;
+ // do not consider load factor when selecting a data node
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+ false);
+ setEncryptionConfigKeys();
+
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numDataNodes)
+ .build();
+
+ fs = getFileSystem(conf);
+ DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
+ DFSClient spyClient = Mockito.spy(client);
+ DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
+ writeTestDataToFile(fs);
+
+ BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
+ .getBlockTokenSecretManager();
+ // Reduce key update interval and token life for testing.
+ btsm.setKeyUpdateIntervalForTesting(2 * 1000);
+ btsm.setTokenLifetime(2 * 1000);
+ btsm.clearAllKeysForTesting();
+
+ // Wait until the encryption key becomes invalid.
+ LOG.info("Wait until encryption keys become invalid...");
+
+ final DataEncryptionKey encryptionKey = spyClient.getEncryptionKey();
+ List<DataNode> dataNodes = cluster.getDataNodes();
+ for (final DataNode dn: dataNodes) {
+ GenericTestUtils.waitFor(
+ new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return !dn.getBlockPoolTokenSecretManager().
+ get(encryptionKey.blockPoolId)
+ .hasKey(encryptionKey.keyId);
+ }
+ }, 100, 30*1000
+ );
+ }
+ LOG.info("The encryption key is invalid on all nodes now.");
+ try(FSDataOutputStream out = fs.append(TEST_PATH)) {
+ DFSOutputStream dfstream = (DFSOutputStream) out.getWrappedStream();
+ // shut down the first datanode in the pipeline.
+ DatanodeInfo[] targets = dfstream.getPipeline();
+ cluster.stopDataNode(targets[0].getXferAddr());
+ // write data to induce pipeline recovery
+ out.write(PLAIN_TEXT.getBytes());
+ out.hflush();
+ assertFalse("The first datanode in the pipeline was not replaced.",
+ Arrays.asList(dfstream.getPipeline()).contains(targets[0]));
+ }
+ // verify that InvalidEncryptionKeyException is handled properly
+ Mockito.verify(spyClient, times(1)).clearDataEncryptionKey();
}
@Test
@@ -497,104 +392,76 @@ public class TestEncryptedTransfer {
}
private void testEncryptedWrite(int numDns) throws IOException {
- MiniDFSCluster cluster = null;
+ setEncryptionConfigKeys();
+
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
+
+ fs = getFileSystem(conf);
+
+ LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+ LogFactory.getLog(SaslDataTransferServer.class));
+ LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
+ LogFactory.getLog(DataTransferSaslUtil.class));
try {
- Configuration conf = new Configuration();
- setEncryptionConfigKeys(conf);
-
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
-
- FileSystem fs = getFileSystem(conf);
-
- LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
- LogFactory.getLog(SaslDataTransferServer.class));
- LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
- LogFactory.getLog(DataTransferSaslUtil.class));
- try {
- writeTestDataToFile(fs);
- } finally {
- logs.stopCapturing();
- logs1.stopCapturing();
- }
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
- fs.close();
-
- if (resolverClazz == null) {
- // Test client and server negotiate cipher option
- GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
- "Server using cipher suite");
- // Check the IOStreamPair
- GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
- "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
- }
+ writeTestDataToFile(fs);
} finally {
- if (cluster != null) {
- cluster.shutdown();
- }
+ logs.stopCapturing();
+ logs1.stopCapturing();
+ }
+ assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+ if (resolverClazz == null) {
+ // Test client and server negotiate cipher option
+ GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
+ "Server using cipher suite");
+ // Check the IOStreamPair
+ GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
+ "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
}
}
@Test
public void testEncryptedAppend() throws IOException {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = new Configuration();
- setEncryptionConfigKeys(conf);
-
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
-
- FileSystem fs = getFileSystem(conf);
-
- writeTestDataToFile(fs);
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-
- writeTestDataToFile(fs);
- assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-
- fs.close();
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
+ setEncryptionConfigKeys();
+
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+
+ fs = getFileSystem(conf);
+
+ writeTestDataToFile(fs);
+ assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+ writeTestDataToFile(fs);
+ assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
}
@Test
public void testEncryptedAppendRequiringBlockTransfer() throws IOException {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = new Configuration();
- setEncryptionConfigKeys(conf);
-
- // start up 4 DNs
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
-
- FileSystem fs = getFileSystem(conf);
-
- // Create a file with replication 3, so its block is on 3 / 4 DNs.
- writeTestDataToFile(fs);
- assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-
- // Shut down one of the DNs holding a block replica.
- FSDataInputStream in = fs.open(TEST_PATH);
- List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
- in.close();
- assertEquals(1, locatedBlocks.size());
- assertEquals(3, locatedBlocks.get(0).getLocations().length);
- DataNode dn = cluster.getDataNode(locatedBlocks.get(0).getLocations()[0].getIpcPort());
- dn.shutdown();
-
- // Reopen the file for append, which will need to add another DN to the
- // pipeline and in doing so trigger a block transfer.
- writeTestDataToFile(fs);
- assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-
- fs.close();
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
+ setEncryptionConfigKeys();
+
+ // start up 4 DNs
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
+
+ fs = getFileSystem(conf);
+
+ // Create a file with replication 3, so its block is on 3 / 4 DNs.
+ writeTestDataToFile(fs);
+ assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+ // Shut down one of the DNs holding a block replica.
+ FSDataInputStream in = fs.open(TEST_PATH);
+ List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
+ in.close();
+ assertEquals(1, locatedBlocks.size());
+ assertEquals(3, locatedBlocks.get(0).getLocations().length);
+ DataNode dn = cluster.getDataNode(
+ locatedBlocks.get(0).getLocations()[0].getIpcPort());
+ dn.shutdown();
+
+ // Reopen the file for append, which will need to add another DN to the
+ // pipeline and in doing so trigger a block transfer.
+ writeTestDataToFile(fs);
+ assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
}
private static void writeTestDataToFile(FileSystem fs) throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org