Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2014/07/14 23:23:22 UTC
svn commit: r1610533 - in
/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/net/
src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ s...
Author: szetszwo
Date: Mon Jul 14 21:23:20 2014
New Revision: 1610533
URL: http://svn.apache.org/r1610533
Log:
Merge r1609845 through r1610532 from trunk.
Added:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/
- copied from r1610532, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/
- copied from r1610532, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java
- copied unchanged from r1610532, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java
Removed:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java
Modified:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1609845-1610532
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Jul 14 21:23:20 2014
@@ -61,8 +61,6 @@ Trunk (Unreleased)
HDFS-3030. Remove getProtocolVersion and getProtocolSignature from translators.
(jitendra)
- HDFS-2976. Remove unnecessary method (tokenRefetchNeeded) in DFSClient.
-
HDFS-3111. Missing license headers in trunk. (umamahesh)
HDFS-3091. Update the usage limitations of ReplaceDatanodeOnFailure policy in
@@ -103,8 +101,6 @@ Trunk (Unreleased)
HDFS-3768. Exception in TestJettyHelper is incorrect.
(Eli Reisman via jghoman)
- HDFS-3851. DFSOutputStream class code cleanup. (Jing Zhao via suresh)
-
HDFS-2580. NameNode#main(...) can make use of GenericOptionsParser. (harsh)
HDFS-2127. Add a test that ensure AccessControlExceptions contain
@@ -208,9 +204,6 @@ Trunk (Unreleased)
HDFS-3834. Remove unused static fields NAME, DESCRIPTION and Usage from
Command. (Jing Zhao via suresh)
- HADOOP-8158. Interrupting hadoop fs -put from the command line
- causes a LeaseExpiredException. (daryn via harsh)
-
HDFS-2434. TestNameNodeMetrics.testCorruptBlock fails intermittently.
(Jing Zhao via suresh)
@@ -294,6 +287,14 @@ Release 2.6.0 - UNRELEASED
HDFS-5202. Support Centralized Cache Management on Windows. (cnauroth)
+ HDFS-2976. Remove unnecessary method (tokenRefetchNeeded) in DFSClient.
+ (Uma Maheswara Rao G)
+
+ HDFS-3851. DFSOutputStream class code cleanup. (Jing Zhao via suresh)
+
+ HDFS-2856. Fix block protocol so that Datanodes don't require root or jsvc.
+ (cnauroth)
+
OPTIMIZATIONS
BUG FIXES
@@ -307,6 +308,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6630. Unable to fetch the block information by Browsing the file system on
Namenode UI through IE9 ( Haohui Mai via vinayakumarb)
+ HADOOP-8158. Interrupting hadoop fs -put from the command line
+ causes a LeaseExpiredException. (daryn via harsh)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml Mon Jul 14 21:23:20 2014
@@ -142,6 +142,11 @@ http://maven.apache.org/xsd/maven-4.0.0.
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1609845-1610532
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Mon Jul 14 21:23:20 2014
@@ -744,7 +744,8 @@ public class BlockReaderFactory implemen
}
}
try {
- Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress);
+ Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
+ datanode);
if (LOG.isTraceEnabled()) {
LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
}
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Mon Jul 14 21:23:20 2014
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
@@ -135,6 +137,7 @@ import org.apache.hadoop.hdfs.protocol.C
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -152,16 +155,19 @@ import org.apache.hadoop.hdfs.protocol.S
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -208,7 +214,8 @@ import com.google.common.net.InetAddress
*
********************************************************/
@InterfaceAudience.Private
-public class DFSClient implements java.io.Closeable, RemotePeerFactory {
+public class DFSClient implements java.io.Closeable, RemotePeerFactory,
+ DataEncryptionKeyFactory {
public static final Log LOG = LogFactory.getLog(DFSClient.class);
public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
@@ -232,7 +239,7 @@ public class DFSClient implements java.i
private final Random r = new Random();
private SocketAddress[] localInterfaceAddrs;
private DataEncryptionKey encryptionKey;
- final TrustedChannelResolver trustedChannelResolver;
+ final SaslDataTransferClient saslClient;
private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy;
private final ClientContext clientContext;
@@ -634,7 +641,12 @@ public class DFSClient implements java.i
if (numThreads > 0) {
this.initThreadsNumForHedgedReads(numThreads);
}
- this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConfiguration());
+ this.saslClient = new SaslDataTransferClient(
+ DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+ TrustedChannelResolver.getInstance(conf),
+ conf.getBoolean(
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
}
/**
@@ -1804,23 +1816,6 @@ public class DFSClient implements java.i
UnresolvedPathException.class);
}
}
-
- /**
- * Get the checksum of the whole file of a range of the file. Note that the
- * range always starts from the beginning of the file.
- * @param src The file path
- * @param length The length of the range
- * @return The checksum
- * @see DistributedFileSystem#getFileChecksum(Path)
- */
- public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
- throws IOException {
- checkOpen();
- Preconditions.checkArgument(length >= 0);
- return getFileChecksum(src, length, clientName, namenode,
- socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(),
- dfsClientConf.connectToDnViaHostname);
- }
@InterfaceAudience.Private
public void clearDataEncryptionKey() {
@@ -1840,11 +1835,9 @@ public class DFSClient implements java.i
return d == null ? false : d.getEncryptDataTransfer();
}
- @InterfaceAudience.Private
- public DataEncryptionKey getDataEncryptionKey()
- throws IOException {
- if (shouldEncryptData() &&
- !this.trustedChannelResolver.isTrusted()) {
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() throws IOException {
+ if (shouldEncryptData()) {
synchronized (this) {
if (encryptionKey == null ||
encryptionKey.expiryDate < Time.now()) {
@@ -1859,22 +1852,17 @@ public class DFSClient implements java.i
}
/**
- * Get the checksum of the whole file or a range of the file.
+ * Get the checksum of the whole file or a range of the file. Note that the
+ * range always starts from the beginning of the file.
* @param src The file path
* @param length the length of the range, i.e., the range is [0, length]
- * @param clientName the name of the client requesting the checksum.
- * @param namenode the RPC proxy for the namenode
- * @param socketFactory to create sockets to connect to DNs
- * @param socketTimeout timeout to use when connecting and waiting for a response
- * @param encryptionKey the key needed to communicate with DNs in this cluster
- * @param connectToDnViaHostname whether the client should use hostnames instead of IPs
* @return The checksum
+ * @see DistributedFileSystem#getFileChecksum(Path)
*/
- private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
- long length, String clientName, ClientProtocol namenode,
- SocketFactory socketFactory, int socketTimeout,
- DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
throws IOException {
+ checkOpen();
+ Preconditions.checkArgument(length >= 0);
//get block locations for the file range
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
length);
@@ -1909,7 +1897,7 @@ public class DFSClient implements java.i
final DatanodeInfo[] datanodes = lb.getLocations();
//try each datanode location of the block
- final int timeout = 3000 * datanodes.length + socketTimeout;
+ final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout;
boolean done = false;
for(int j = 0; !done && j < datanodes.length; j++) {
DataOutputStream out = null;
@@ -1917,8 +1905,7 @@ public class DFSClient implements java.i
try {
//connect to a datanode
- IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
- encryptionKey, datanodes[j], timeout);
+ IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
out = new DataOutputStream(new BufferedOutputStream(pair.out,
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(pair.in);
@@ -1974,9 +1961,7 @@ public class DFSClient implements java.i
} else {
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
"inferring checksum by reading first byte");
- ct = inferChecksumTypeByReading(
- clientName, socketFactory, socketTimeout, lb, datanodes[j],
- encryptionKey, connectToDnViaHostname);
+ ct = inferChecksumTypeByReading(lb, datanodes[j]);
}
if (i == 0) { // first block
@@ -2050,16 +2035,13 @@ public class DFSClient implements java.i
* Connect to the given datanode's data transfer port, and return
* the resulting IOStreamPair. This includes encryption wrapping, etc.
*/
- private static IOStreamPair connectToDN(
- SocketFactory socketFactory, boolean connectToDnViaHostname,
- DataEncryptionKey encryptionKey, DatanodeInfo dn, int timeout)
- throws IOException
- {
+ private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
+ LocatedBlock lb) throws IOException {
boolean success = false;
Socket sock = null;
try {
sock = socketFactory.createSocket();
- String dnAddr = dn.getXferAddr(connectToDnViaHostname);
+ String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname);
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr);
}
@@ -2068,13 +2050,8 @@ public class DFSClient implements java.i
OutputStream unbufOut = NetUtils.getOutputStream(sock);
InputStream unbufIn = NetUtils.getInputStream(sock);
- IOStreamPair ret;
- if (encryptionKey != null) {
- ret = DataTransferEncryptor.getEncryptedStreams(
- unbufOut, unbufIn, encryptionKey);
- } else {
- ret = new IOStreamPair(unbufIn, unbufOut);
- }
+ IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this,
+ lb.getBlockToken(), dn);
success = true;
return ret;
} finally {
@@ -2090,21 +2067,14 @@ public class DFSClient implements java.i
* with older HDFS versions which did not include the checksum type in
* OpBlockChecksumResponseProto.
*
- * @param in input stream from datanode
- * @param out output stream to datanode
* @param lb the located block
- * @param clientName the name of the DFSClient requesting the checksum
* @param dn the connected datanode
* @return the inferred checksum type
* @throws IOException if an error occurs
*/
- private static Type inferChecksumTypeByReading(
- String clientName, SocketFactory socketFactory, int socketTimeout,
- LocatedBlock lb, DatanodeInfo dn,
- DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
throws IOException {
- IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
- encryptionKey, dn, socketTimeout);
+ IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb);
try {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
@@ -2857,7 +2827,9 @@ public class DFSClient implements java.i
}
@Override // RemotePeerFactory
- public Peer newConnectedPeer(InetSocketAddress addr) throws IOException {
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+ throws IOException {
Peer peer = null;
boolean success = false;
Socket sock = null;
@@ -2866,8 +2838,8 @@ public class DFSClient implements java.i
NetUtils.connect(sock, addr,
getRandomLocalInterfaceAddr(),
dfsClientConf.socketTimeout);
- peer = TcpPeerServer.peerFromSocketAndKey(sock,
- getDataEncryptionKey());
+ peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
+ blockToken, datanodeId);
success = true;
return peer;
} finally {
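
For illustration only (not part of this commit): the consolidated
getFileChecksum(String, long) above computes a checksum over the range
[0, length], always starting at offset 0. A minimal usage sketch,
assuming an initialized DFSClient named dfsClient and a hypothetical
path:

    import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;

    // Checksum of the first megabyte of a file; only the length is
    // supplied because the range always starts at offset 0.
    MD5MD5CRC32FileChecksum checksum =
        dfsClient.getFileChecksum("/user/example/data.bin", 1024L * 1024L);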
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Mon Jul 14 21:23:20 2014
@@ -570,6 +570,8 @@ public class DFSConfigKeys extends Commo
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
+ public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
+ public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
// Journal-node related configs. These are read on the JN side.
public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
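
As a hedged sketch (not part of this commit): the two keys added above
switch DataTransferProtocol to SASL. They might be set programmatically
as below; "authentication" is an assumed illustrative value (see the
hdfs-default.xml change in this revision for the authoritative
description):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    Configuration conf = new Configuration();
    // "authentication" is an assumed value for illustration; the SASL
    // properties resolver class key added above is optional.
    conf.set(DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");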
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Mon Jul 14 21:23:20 2014
@@ -61,7 +61,6 @@ import org.apache.hadoop.hdfs.protocol.N
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
@@ -1047,14 +1046,10 @@ public class DFSOutputStream extends FSO
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
- if (dfsClient.shouldEncryptData() &&
- !dfsClient.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
- unbufOut = encryptedStreams.out;
- unbufIn = encryptedStreams.in;
- }
+ IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
+ unbufOut, unbufIn, dfsClient, blockToken, src);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
@@ -1325,14 +1320,10 @@ public class DFSOutputStream extends FSO
OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(s);
- if (dfsClient.shouldEncryptData() &&
- !dfsClient.trustedChannelResolver.isTrusted(s.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(unbufOut,
- unbufIn, dfsClient.getDataEncryptionKey());
- unbufOut = encryptedStreams.out;
- unbufIn = encryptedStreams.in;
- }
+ IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
+ unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(unbufIn);
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java Mon Jul 14 21:23:20 2014
@@ -21,15 +21,21 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
public interface RemotePeerFactory {
/**
* @param addr The address to connect to.
- *
+ * @param blockToken Token used during optional SASL negotiation
+ * @param datanodeId ID of destination DataNode
* @return A new Peer connected to the address.
*
* @throws IOException If there was an error connecting or creating
* the remote socket, encrypted stream, etc.
*/
- Peer newConnectedPeer(InetSocketAddress addr) throws IOException;
+ Peer newConnectedPeer(InetSocketAddress addr,
+ Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+ throws IOException;
}
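
Sketch of implementing the revised interface (not part of this commit),
mirroring the NamenodeFsck pattern later in this diff; conf, saslClient
and keyFactory are assumed to be in scope, and error handling is
trimmed:

    RemotePeerFactory factory = new RemotePeerFactory() {
      @Override
      public Peer newConnectedPeer(InetSocketAddress addr,
          Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
          throws IOException {
        Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
        s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
        s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
        // The SASL handshake happens inside peerFromSocketAndKey (see the
        // TcpPeerServer change below), keyed by the token and DataNode ID.
        return TcpPeerServer.peerFromSocketAndKey(saslClient, s, keyFactory,
            blockToken, datanodeId);
      }
    };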
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java Mon Jul 14 21:23:20 2014
@@ -19,9 +19,7 @@ package org.apache.hadoop.hdfs.net;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.net.unix.DomainSocket;
import java.io.InputStream;
@@ -51,11 +49,8 @@ public class EncryptedPeer implements Pe
*/
private final ReadableByteChannel channel;
- public EncryptedPeer(Peer enclosedPeer, DataEncryptionKey key)
- throws IOException {
+ public EncryptedPeer(Peer enclosedPeer, IOStreamPair ios) {
this.enclosedPeer = enclosedPeer;
- IOStreamPair ios = DataTransferEncryptor.getEncryptedStreams(
- enclosedPeer.getOutputStream(), enclosedPeer.getInputStream(), key);
this.in = ios.in;
this.out = ios.out;
this.channel = ios.in instanceof ReadableByteChannel ?
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java Mon Jul 14 21:23:20 2014
@@ -28,10 +28,14 @@ import java.nio.channels.SocketChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.token.Token;
@InterfaceAudience.Private
public class TcpPeerServer implements PeerServer {
@@ -74,15 +78,16 @@ public class TcpPeerServer implements Pe
}
}
- public static Peer peerFromSocketAndKey(Socket s,
- DataEncryptionKey key) throws IOException {
+ public static Peer peerFromSocketAndKey(
+ SaslDataTransferClient saslClient, Socket s,
+ DataEncryptionKeyFactory keyFactory,
+ Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+ throws IOException {
Peer peer = null;
boolean success = false;
try {
- peer = peerFromSocket(s);
- if (key != null) {
- peer = new EncryptedPeer(peer, key);
- }
+ peer = peerFromSocket(s);
+ peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
success = true;
return peer;
} finally {
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Jul 14 21:23:20 2014
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.balancer;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.BufferedInputStream;
@@ -62,9 +64,11 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -202,6 +206,7 @@ public class Balancer {
private final NameNodeConnector nnc;
private final BalancingPolicy policy;
+ private final SaslDataTransferClient saslClient;
private final double threshold;
// all data node lists
@@ -352,19 +357,18 @@ public class Balancer {
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
- if (nnc.getDataEncryptionKey() != null) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufOut, unbufIn, nnc.getDataEncryptionKey());
- unbufOut = encryptedStreams.out;
- unbufIn = encryptedStreams.in;
- }
+ ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
+ Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+ IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+ unbufIn, nnc, accessToken, target.datanode);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.IO_FILE_BUFFER_SIZE));
in = new DataInputStream(new BufferedInputStream(unbufIn,
HdfsConstants.IO_FILE_BUFFER_SIZE));
- sendRequest(out);
+ sendRequest(out, eb, accessToken);
receiveResponse(in);
bytesMoved.addAndGet(block.getNumBytes());
LOG.info("Successfully moved " + this);
@@ -395,9 +399,8 @@ public class Balancer {
}
/* Send a block replace request to the output stream */
- private void sendRequest(DataOutputStream out) throws IOException {
- final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
- final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+ private void sendRequest(DataOutputStream out, ExtendedBlock eb,
+ Token<BlockTokenIdentifier> accessToken) throws IOException {
new Sender(out).replaceBlock(eb, accessToken,
source.getStorageID(), proxySource.getDatanode());
}
@@ -876,6 +879,12 @@ public class Balancer {
this.maxConcurrentMovesPerNode =
conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+ this.saslClient = new SaslDataTransferClient(
+ DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+ TrustedChannelResolver.getInstance(conf),
+ conf.getBoolean(
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
}
/* Given a data node set, build a network topology and decide
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Mon Jul 14 21:23:20 2014
@@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.NameNodePr
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -50,7 +50,7 @@ import org.apache.hadoop.util.Daemon;
* The class provides utilities for {@link Balancer} to access a NameNode
*/
@InterfaceAudience.Private
-class NameNodeConnector {
+class NameNodeConnector implements DataEncryptionKeyFactory {
private static final Log LOG = Balancer.LOG;
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
@@ -72,7 +72,6 @@ class NameNodeConnector {
private BlockTokenSecretManager blockTokenSecretManager;
private Daemon keyupdaterthread; // AccessKeyUpdater thread
private DataEncryptionKey encryptionKey;
- private final TrustedChannelResolver trustedChannelResolver;
NameNodeConnector(URI nameNodeUri,
Configuration conf) throws IOException {
@@ -122,7 +121,6 @@ class NameNodeConnector {
if (out == null) {
throw new IOException("Another balancer is running");
}
- this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
}
boolean shouldContinue(long dispatchBlockMoveBytes) {
@@ -154,10 +152,10 @@ class NameNodeConnector {
BlockTokenSecretManager.AccessMode.COPY));
}
}
-
- DataEncryptionKey getDataEncryptionKey()
- throws IOException {
- if (encryptDataTransfer && !this.trustedChannelResolver.isTrusted()) {
+
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() {
+ if (encryptDataTransfer) {
synchronized (this) {
if (encryptionKey == null) {
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
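
A brief consumer-side sketch, not from this commit: with
NameNodeConnector implementing DataEncryptionKeyFactory, callers ask
for a key only when a channel actually needs one; variable names are
illustrative:

    // A null key means the channel is not to be encrypted; otherwise
    // the lazily generated key above is cached and reused.
    DataEncryptionKeyFactory keyFactory = nnc;  // the NameNodeConnector
    DataEncryptionKey key = keyFactory.newDataEncryptionKey();
    if (key != null) {
      // wrap the plain streams with encryption derived from this key
    }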
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Mon Jul 14 21:23:20 2014
@@ -52,7 +52,9 @@ import static org.apache.hadoop.hdfs.DFS
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.security.SaslPropertiesResolver;
/**
* Simple class encapsulating all of the configuration that the DataNode
@@ -86,6 +88,7 @@ public class DNConf {
final String minimumNameNodeVersion;
final String encryptionAlgorithm;
+ final SaslPropertiesResolver saslPropsResolver;
final TrustedChannelResolver trustedChannelResolver;
final long xceiverStopTimeout;
@@ -168,6 +171,8 @@ public class DNConf {
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
+ this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
+ conf);
this.xceiverStopTimeout = conf.getLong(
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
@@ -186,7 +191,26 @@ public class DNConf {
String getMinimumNameNodeVersion() {
return this.minimumNameNodeVersion;
}
-
+
+ /**
+ * Returns true if encryption is enabled for DataTransferProtocol.
+ *
+ * @return boolean true if encryption is enabled for DataTransferProtocol
+ */
+ public boolean getEncryptDataTransfer() {
+ return encryptDataTransfer;
+ }
+
+ /**
+ * Returns encryption algorithm configured for DataTransferProtocol, or null
+ * if not configured.
+ *
+ * @return encryption algorithm configured for DataTransferProtocol
+ */
+ public String getEncryptionAlgorithm() {
+ return encryptionAlgorithm;
+ }
+
public long getXceiverStopTimeout() {
return xceiverStopTimeout;
}
@@ -194,4 +218,24 @@ public class DNConf {
public long getMaxLockedMemory() {
return maxLockedMemory;
}
+
+ /**
+ * Returns the SaslPropertiesResolver configured for use with
+ * DataTransferProtocol, or null if not configured.
+ *
+ * @return SaslPropertiesResolver configured for use with DataTransferProtocol
+ */
+ public SaslPropertiesResolver getSaslPropsResolver() {
+ return saslPropsResolver;
+ }
+
+ /**
+ * Returns the TrustedChannelResolver configured for use with
+ * DataTransferProtocol, or null if not configured.
+ *
+ * @return TrustedChannelResolver configured for use with DataTransferProtocol
+ */
+ public TrustedChannelResolver getTrustedChannelResolver() {
+ return trustedChannelResolver;
+ }
}
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Jul 14 21:23:20 2014
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -40,6 +43,9 @@ import org.apache.hadoop.hdfs.net.Domain
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -224,6 +230,8 @@ public class DataNode extends Configured
private final List<String> usersWithLocalPathAccess;
private final boolean connectToDnViaHostname;
ReadaheadPool readaheadPool;
+ SaslDataTransferClient saslClient;
+ SaslDataTransferServer saslServer;
private final boolean getHdfsBlockLocationsEnabled;
private ObjectName dataNodeInfoBeanName;
private Thread checkDiskErrorThread = null;
@@ -722,15 +730,10 @@ public class DataNode extends Configured
*/
void startDataNode(Configuration conf,
List<StorageLocation> dataDirs,
- // DatanodeProtocol namenode,
SecureResources resources
) throws IOException {
- if(UserGroupInformation.isSecurityEnabled() && resources == null) {
- if (!conf.getBoolean("ignore.secure.ports.for.testing", false)) {
- throw new RuntimeException("Cannot start secure cluster without "
- + "privileged resources.");
- }
- }
+
+ checkSecureConfig(conf, resources);
// settings global for all BPs in the Data Node
this.secureResources = resources;
@@ -790,6 +793,55 @@ public class DataNode extends Configured
// Create the ReadaheadPool from the DataNode context so we can
// exit without having to explicitly shutdown its thread pool.
readaheadPool = ReadaheadPool.getInstance();
+ saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
+ dnConf.trustedChannelResolver,
+ conf.getBoolean(
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
+ saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
+ }
+
+ /**
+ * Checks that the DataNode has a secure configuration when security is enabled.
+ * There are 2 possible configurations that are considered secure:
+ * 1. The server has bound to privileged ports for RPC and HTTP via
+ * SecureDataNodeStarter.
+ * 2. The configuration enables SASL on DataTransferProtocol and HTTPS (no
+ * plain HTTP) for the HTTP server. The SASL handshake guarantees
+ * authentication of the RPC server before a client transmits a secret, such
+ * as a block access token. Similarly, SSL guarantees authentication of the
+ * HTTP server before a client transmits a secret, such as a delegation
+ * token.
+ * It is not possible to run with both privileged ports and SASL on
+ * DataTransferProtocol. For backwards-compatibility, the connection logic
+ * must check if the target port is a privileged port, and if so, skip the
+ * SASL handshake.
+ *
+ * @param conf Configuration to check
+ * @param resources SecuredResources obtained for DataNode
+ * @throws RuntimeException if security enabled, but configuration is insecure
+ */
+ private static void checkSecureConfig(Configuration conf,
+ SecureResources resources) throws RuntimeException {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+ String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
+ if (resources != null && dataTransferProtection == null) {
+ return;
+ }
+ if (conf.getBoolean("ignore.secure.ports.for.testing", false)) {
+ return;
+ }
+ if (dataTransferProtection != null &&
+ DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY &&
+ resources == null) {
+ return;
+ }
+ throw new RuntimeException("Cannot start secure DataNode without " +
+ "configuring either privileged resources or SASL RPC data transfer " +
+ "protection and SSL for HTTP. Using privileged resources in " +
+ "combination with SASL RPC data transfer protection is not supported.");
}
public static String generateUuid() {
@@ -1623,20 +1675,25 @@ public class DataNode extends Configured
NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
sock.setSoTimeout(targets.length * dnConf.socketTimeout);
+ //
+ // Header info
+ //
+ Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+ if (isBlockTokenEnabled) {
+ accessToken = blockPoolTokenSecretManager.generateToken(b,
+ EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
+ }
+
long writeTimeout = dnConf.socketWriteTimeout +
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
- if (dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufOut, unbufIn,
- blockPoolTokenSecretManager.generateDataEncryptionKey(
- b.getBlockPoolId()));
- unbufOut = encryptedStreams.out;
- unbufIn = encryptedStreams.in;
- }
+ DataEncryptionKeyFactory keyFactory =
+ getDataEncryptionKeyFactoryForBlock(b);
+ IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+ unbufIn, keyFactory, accessToken, bpReg);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
@@ -1645,15 +1702,6 @@ public class DataNode extends Configured
false, false, true, DataNode.this, null, cachingStrategy);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
- //
- // Header info
- //
- Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
- if (isBlockTokenEnabled) {
- accessToken = blockPoolTokenSecretManager.generateToken(b,
- EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
- }
-
new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
@@ -1696,7 +1744,26 @@ public class DataNode extends Configured
}
}
}
-
+
+ /**
+ * Returns a new DataEncryptionKeyFactory that generates a key from the
+ * BlockPoolTokenSecretManager, using the block pool ID of the given block.
+ *
+ * @param block for which the factory needs to create a key
+ * @return DataEncryptionKeyFactory for block's block pool ID
+ */
+ DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
+ final ExtendedBlock block) {
+ return new DataEncryptionKeyFactory() {
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() {
+ return dnConf.encryptDataTransfer ?
+ blockPoolTokenSecretManager.generateDataEncryptionKey(
+ block.getBlockPoolId()) : null;
+ }
+ };
+ }
+
/**
* After a block becomes finalized, a datanode increases metric counter,
* notifies namenode, and adds it to the block scanner
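
A hedged configuration sketch (not part of this commit) of the
SASL-based secure mode that checkSecureConfig above accepts. The
protection key is the one added in DFSConfigKeys in this revision; the
"dfs.http.policy" name and both values are assumptions based on the
surrounding code (HttpConfig.Policy.HTTPS_ONLY):

    Configuration conf = new Configuration();
    // SASL on DataTransferProtocol instead of a privileged port.
    conf.set(DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY, "privacy");
    // HTTPS only, no plain HTTP; key name assumed.
    conf.set("dfs.http.policy", "HTTPS_ONLY");
    // With security enabled and resources == null, checkSecureConfig
    // accepts this combination and the DataNode starts without root/jsvc.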
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Jul 14 21:23:20 2014
@@ -36,11 +36,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
-import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.security.MessageDigest;
import java.util.Arrays;
@@ -52,13 +50,12 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
@@ -85,7 +82,6 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Preconditions;
-import com.google.common.net.InetAddresses;
import com.google.protobuf.ByteString;
@@ -174,24 +170,11 @@ class DataXceiver extends Receiver imple
dataXceiverServer.addPeer(peer, Thread.currentThread());
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
- if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){
- IOStreamPair encryptedStreams = null;
- try {
- encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
- socketIn, datanode.blockPoolTokenSecretManager,
- dnConf.encryptionAlgorithm);
- } catch (InvalidMagicNumberException imne) {
- LOG.info("Failed to read expected encryption handshake from client " +
- "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
- "is running an older version of Hadoop which does not support " +
- "encryption");
- return;
- }
- input = encryptedStreams.in;
- socketOut = encryptedStreams.out;
- }
- input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE);
+ IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
+ socketIn, datanode.getDatanodeId());
+ input = new BufferedInputStream(saslStreams.in,
+ HdfsConstants.SMALL_BUFFER_SIZE);
+ socketOut = saslStreams.out;
super.initialize(new DataInputStream(input));
@@ -263,19 +246,6 @@ class DataXceiver extends Receiver imple
}
}
}
-
- /**
- * Returns InetAddress from peer
- * The getRemoteAddressString is the form /ip-address:port
- * The ip-address is extracted from peer and InetAddress is formed
- * @param peer
- * @return
- * @throws UnknownHostException
- */
- private static InetAddress getClientAddress(Peer peer) {
- return InetAddresses.forString(
- peer.getRemoteAddressString().split(":")[0].substring(1));
- }
@Override
public void requestShortCircuitFds(final ExtendedBlock blk,
@@ -656,17 +626,12 @@ class DataXceiver extends Receiver imple
OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
writeTimeout);
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
- if (dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufMirrorOut, unbufMirrorIn,
- datanode.blockPoolTokenSecretManager
- .generateDataEncryptionKey(block.getBlockPoolId()));
-
- unbufMirrorOut = encryptedStreams.out;
- unbufMirrorIn = encryptedStreams.in;
- }
+ DataEncryptionKeyFactory keyFactory =
+ datanode.getDataEncryptionKeyFactoryForBlock(block);
+ IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
+ unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
+ unbufMirrorOut = saslStreams.out;
+ unbufMirrorIn = saslStreams.in;
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn);
@@ -1026,17 +991,12 @@ class DataXceiver extends Receiver imple
OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
dnConf.socketWriteTimeout);
InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
- if (dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(
- proxySock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufProxyOut, unbufProxyIn,
- datanode.blockPoolTokenSecretManager
- .generateDataEncryptionKey(block.getBlockPoolId()));
- unbufProxyOut = encryptedStreams.out;
- unbufProxyIn = encryptedStreams.in;
- }
+ DataEncryptionKeyFactory keyFactory =
+ datanode.getDataEncryptionKeyFactoryForBlock(block);
+ IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
+ unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
+ unbufProxyOut = saslStreams.out;
+ unbufProxyIn = saslStreams.in;
proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
HdfsConstants.SMALL_BUFFER_SIZE));
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Mon Jul 14 21:23:20 2014
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -55,6 +59,12 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
@@ -65,6 +75,7 @@ import org.apache.hadoop.net.NetworkTopo
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
@@ -92,7 +103,7 @@ import com.google.common.annotations.Vis
* factors of each file.
*/
@InterfaceAudience.Private
-public class NamenodeFsck {
+public class NamenodeFsck implements DataEncryptionKeyFactory {
public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
// return string marking fsck status
@@ -149,6 +160,7 @@ public class NamenodeFsck {
private List<String> snapshottableDirs = null;
private final BlockPlacementPolicy bpPolicy;
+ private final SaslDataTransferClient saslClient;
/**
* Filesystem checker.
@@ -175,6 +187,12 @@ public class NamenodeFsck {
networktopology,
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.getHost2DatanodeMap());
+ this.saslClient = new SaslDataTransferClient(
+ DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+ TrustedChannelResolver.getInstance(conf),
+ conf.getBoolean(
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
String key = it.next();
@@ -616,15 +634,16 @@ public class NamenodeFsck {
setConfiguration(namenode.conf).
setRemotePeerFactory(new RemotePeerFactory() {
@Override
- public Peer newConnectedPeer(InetSocketAddress addr)
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null;
Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
try {
s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
- peer = TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
- getDataEncryptionKey());
+ peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s,
+ NamenodeFsck.this, blockToken, datanodeId);
} finally {
if (peer == null) {
IOUtils.closeQuietly(s);
@@ -663,7 +682,12 @@ public class NamenodeFsck {
throw new Exception("Could not copy block data for " + lblock.getBlock());
}
}
-
+
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() throws IOException {
+ return namenode.getRpcServer().getDataEncryptionKey();
+ }
+
/*
* XXX (ab) See comment above for copyBlock().
*
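
Note on the hunks above: NamenodeFsck now implements
DataEncryptionKeyFactory so that TcpPeerServer.peerFromSocketAndKey()
can fetch an encryption key lazily, only when the negotiated
protection level requires one. A self-contained illustration of the
same callback shape (this class is hypothetical, not part of the
commit; ClientProtocol.getDataEncryptionKey() is the same key source
NamenodeFsck reaches via the NameNode RPC server):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
    import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;

    // Hypothetical factory backed by a NameNode RPC proxy. The SASL
    // client calls newDataEncryptionKey() only if a key is actually
    // needed for the connection being negotiated.
    class RpcBackedKeyFactory implements DataEncryptionKeyFactory {
      private final ClientProtocol namenode;

      RpcBackedKeyFactory(ClientProtocol namenode) {
        this.namenode = namenode;
      }

      @Override
      public DataEncryptionKey newDataEncryptionKey() throws IOException {
        return namenode.getDataEncryptionKey();
      }
    }
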
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Mon Jul 14 21:23:20 2014
@@ -1453,6 +1453,37 @@
</property>
<property>
+ <name>dfs.data.transfer.protection</name>
+ <value></value>
+ <description>
+ A comma-separated list of SASL protection values used for secured
+ connections to the DataNode when reading or writing block data. Possible
+ values are authentication, integrity and privacy. authentication means
+ authentication only and no integrity or privacy; integrity implies
+ authentication and integrity are enabled; and privacy implies all of
+ authentication, integrity and privacy are enabled. If
+ dfs.encrypt.data.transfer is set to true, then it supersedes the setting for
+ dfs.data.transfer.protection and enforces that all connections must use a
+ specialized encrypted SASL handshake. This property is ignored for
+ connections to a DataNode listening on a privileged port. In this case, it
+ is assumed that the use of a privileged port establishes sufficient trust.
+ </description>
+</property>
+
+<property>
+ <name>dfs.data.transfer.saslproperties.resolver.class</name>
+ <value></value>
+ <description>
+ SaslPropertiesResolver used to resolve the QOP used for a connection to the
+ DataNode when reading or writing block data. If not specified, the full set
+ of values in dfs.data.transfer.protection determines the QOP for the
+ connection. If a class is specified, the QOP values returned by that class
+ are used instead.
+ </description>
+</property>
+
+<property>
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
<value>false</value>
<description>
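
For reference, a cluster opting into SASL on DataTransferProtocol
(rather than dfs.encrypt.data.transfer) might carry a snippet like the
following in hdfs-site.xml; the value list here is illustrative, and
any subset of the three QOP values is valid per the description above:

    <property>
      <name>dfs.data.transfer.protection</name>
      <value>authentication,integrity,privacy</value>
    </property>
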
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Mon Jul 14 21:23:20 2014
@@ -33,9 +33,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hdfs.shortcircu
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@@ -192,7 +195,8 @@ public class BlockReaderTestUtil {
setAllowShortCircuitLocalReads(true).
setRemotePeerFactory(new RemotePeerFactory() {
@Override
- public Peer newConnectedPeer(InetSocketAddress addr)
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null;
Socket sock = NetUtils.
@@ -251,4 +255,4 @@ public class BlockReaderTestUtil {
LogManager.getLogger(DataNode.class.getName()).setLevel(
Level.TRACE);
}
-}
\ No newline at end of file
+}
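
The same RemotePeerFactory change recurs in the test files below. As
implied by these hunks, the expanded interface threads the block token
and target DatanodeID through to the factory so the returned Peer can
complete SASL negotiation:

    // Sketch of the expanded interface as implied by this commit's
    // hunks (javadoc omitted).
    public interface RemotePeerFactory {
      Peer newConnectedPeer(InetSocketAddress addr,
          Token<BlockTokenIdentifier> blockToken,
          DatanodeID datanodeId) throws IOException;
    }
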
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Mon Jul 14 21:23:20 2014
@@ -19,12 +19,15 @@ package org.apache.hadoop.hdfs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
@@ -1308,15 +1311,42 @@ public class MiniDFSCluster {
}
SecureResources secureResources = null;
- if (UserGroupInformation.isSecurityEnabled()) {
+ if (UserGroupInformation.isSecurityEnabled() &&
+ conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY) == null) {
try {
secureResources = SecureDataNodeStarter.getSecureResources(dnConf);
} catch (Exception ex) {
ex.printStackTrace();
}
}
- DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf,
- secureResources);
+ final int maxRetriesOnSasl = conf.getInt(
+ IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY,
+ IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT);
+ int numRetries = 0;
+ DataNode dn = null;
+ while (true) {
+ try {
+ dn = DataNode.instantiateDataNode(dnArgs, dnConf,
+ secureResources);
+ break;
+ } catch (IOException e) {
+ // Work around an issue seen when testing security: rapidly starting
+ // multiple DataNodes with the same principal can be rejected by the
+ // KDC as a replay attack.
+ if (UserGroupInformation.isSecurityEnabled() &&
+ numRetries < maxRetriesOnSasl) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ ++numRetries;
+ continue;
+ }
+ throw e;
+ }
+ }
if(dn == null)
throw new IOException("Cannot start DataNode in "
+ dnConf.get(DFS_DATANODE_DATA_DIR_KEY));
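
The retry budget above is read from
ipc.client.connect.max.retries.on.sasl. A hypothetical test setup (not
part of this commit) that raises it for a secured cluster:

    // Give rapidly started secure DataNodes more headroom against KDC
    // replay rejections; see the retry loop above.
    Configuration conf = new HdfsConfiguration();
    conf.setInt(
        CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(3).build();
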
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Mon Jul 14 21:23:20 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -160,7 +161,8 @@ public class TestBlockTokenWithDFS {
setConfiguration(conf).
setRemotePeerFactory(new RemotePeerFactory() {
@Override
- public Peer newConnectedPeer(InetSocketAddress addr)
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null;
Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1610533&r1=1610532&r2=1610533&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Mon Jul 14 21:23:20 2014
@@ -46,9 +46,11 @@ import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -58,6 +60,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -307,7 +310,8 @@ public class TestDataNodeVolumeFailure {
setConfiguration(conf).
setRemotePeerFactory(new RemotePeerFactory() {
@Override
- public Peer newConnectedPeer(InetSocketAddress addr)
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null;
Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();