You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2014/07/25 22:33:22 UTC
svn commit: r1613514 [3/6] - in
/hadoop/common/branches/YARN-1051/hadoop-hdfs-project:
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/
hadoop-hdfs-nfs/src/test/java/org/apac...
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jul 25 20:33:09 2014
@@ -17,10 +17,68 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.BlockingService;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,17 +94,44 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.net.DomainPeerServer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
-import org.apache.hadoop.hdfs.protocolPB.*;
-import org.apache.hadoop.hdfs.security.token.block.*;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -59,7 +144,11 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpConfig;
@@ -82,22 +171,21 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.hadoop.util.ServicePlugin;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
-import javax.management.ObjectName;
-
-import java.io.*;
-import java.lang.management.ManagementFactory;
-import java.net.*;
-import java.nio.channels.SocketChannel;
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.apache.hadoop.util.ExitUtil.terminate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
/**********************************************************
* DataNode is a class (and program) that stores a set of
@@ -224,6 +312,8 @@ public class DataNode extends Configured
private final List<String> usersWithLocalPathAccess;
private final boolean connectToDnViaHostname;
ReadaheadPool readaheadPool;
+ SaslDataTransferClient saslClient;
+ SaslDataTransferServer saslServer;
private final boolean getHdfsBlockLocationsEnabled;
private ObjectName dataNodeInfoBeanName;
private Thread checkDiskErrorThread = null;
@@ -722,15 +812,10 @@ public class DataNode extends Configured
*/
void startDataNode(Configuration conf,
List<StorageLocation> dataDirs,
- // DatanodeProtocol namenode,
SecureResources resources
) throws IOException {
- if(UserGroupInformation.isSecurityEnabled() && resources == null) {
- if (!conf.getBoolean("ignore.secure.ports.for.testing", false)) {
- throw new RuntimeException("Cannot start secure cluster without "
- + "privileged resources.");
- }
- }
+
+ checkSecureConfig(conf, resources);
// settings global for all BPs in the Data Node
this.secureResources = resources;
@@ -790,6 +875,55 @@ public class DataNode extends Configured
// Create the ReadaheadPool from the DataNode context so we can
// exit without having to explicitly shutdown its thread pool.
readaheadPool = ReadaheadPool.getInstance();
+ saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
+ dnConf.trustedChannelResolver,
+ conf.getBoolean(
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
+ saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
+ }
+
+ /**
+ * Checks if the DataNode has a secure configuration if security is enabled.
+ * There are 2 possible configurations that are considered secure:
+ * 1. The server has bound to privileged ports for RPC and HTTP via
+ * SecureDataNodeStarter.
+ * 2. The configuration enables SASL on DataTransferProtocol and HTTPS (no
+ * plain HTTP) for the HTTP server. The SASL handshake guarantees
+ * authentication of the RPC server before a client transmits a secret, such
+ * as a block access token. Similarly, SSL guarantees authentication of the
+ * HTTP server before a client transmits a secret, such as a delegation
+ * token.
+ * It is not possible to run with both privileged ports and SASL on
+ * DataTransferProtocol. For backwards-compatibility, the connection logic
+ * must check if the target port is a privileged port, and if so, skip the
+ * SASL handshake.
+ *
+ * @param conf Configuration to check
+ * @param resources SecuredResources obtained for DataNode
+ * @throws RuntimeException if security enabled, but configuration is insecure
+ */
+ private static void checkSecureConfig(Configuration conf,
+ SecureResources resources) throws RuntimeException {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+ String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
+ if (resources != null && dataTransferProtection == null) {
+ return;
+ }
+ if (conf.getBoolean("ignore.secure.ports.for.testing", false)) {
+ return;
+ }
+ if (dataTransferProtection != null &&
+ DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY &&
+ resources == null) {
+ return;
+ }
+ throw new RuntimeException("Cannot start secure DataNode without " +
+ "configuring either privileged resources or SASL RPC data transfer " +
+ "protection and SSL for HTTP. Using privileged resources in " +
+ "combination with SASL RPC data transfer protection is not supported.");
}
public static String generateUuid() {
@@ -1423,8 +1557,8 @@ public class DataNode extends Configured
return xmitsInProgress.get();
}
- private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
- throws IOException {
+ private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
+ StorageType[] xferTargetStorageTypes) throws IOException {
BPOfferService bpos = getBPOSForBlock(block);
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
@@ -1460,16 +1594,17 @@ public class DataNode extends Configured
LOG.info(bpReg + " Starting thread to transfer " +
block + " to " + xfersBuilder);
- new Daemon(new DataTransfer(xferTargets, block,
+ new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
}
}
void transferBlocks(String poolId, Block blocks[],
- DatanodeInfo xferTargets[][]) {
+ DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) {
for (int i = 0; i < blocks.length; i++) {
try {
- transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]);
+ transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i],
+ xferTargetStorageTypes[i]);
} catch (IOException ie) {
LOG.warn("Failed to transfer block " + blocks[i], ie);
}
@@ -1572,6 +1707,7 @@ public class DataNode extends Configured
*/
private class DataTransfer implements Runnable {
final DatanodeInfo[] targets;
+ final StorageType[] targetStorageTypes;
final ExtendedBlock b;
final BlockConstructionStage stage;
final private DatanodeRegistration bpReg;
@@ -1582,7 +1718,8 @@ public class DataNode extends Configured
* Connect to the first item in the target list. Pass along the
* entire target list, the block, and the data.
*/
- DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
+ DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
+ ExtendedBlock b, BlockConstructionStage stage,
final String clientname) {
if (DataTransferProtocol.LOG.isDebugEnabled()) {
DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
@@ -1592,6 +1729,7 @@ public class DataNode extends Configured
+ ", targests=" + Arrays.asList(targets));
}
this.targets = targets;
+ this.targetStorageTypes = targetStorageTypes;
this.b = b;
this.stage = stage;
BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
@@ -1623,20 +1761,25 @@ public class DataNode extends Configured
NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
sock.setSoTimeout(targets.length * dnConf.socketTimeout);
+ //
+ // Header info
+ //
+ Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+ if (isBlockTokenEnabled) {
+ accessToken = blockPoolTokenSecretManager.generateToken(b,
+ EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
+ }
+
long writeTimeout = dnConf.socketWriteTimeout +
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
- if (dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufOut, unbufIn,
- blockPoolTokenSecretManager.generateDataEncryptionKey(
- b.getBlockPoolId()));
- unbufOut = encryptedStreams.out;
- unbufIn = encryptedStreams.in;
- }
+ DataEncryptionKeyFactory keyFactory =
+ getDataEncryptionKeyFactoryForBlock(b);
+ IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+ unbufIn, keyFactory, accessToken, bpReg);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
@@ -1645,16 +1788,8 @@ public class DataNode extends Configured
false, false, true, DataNode.this, null, cachingStrategy);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
- //
- // Header info
- //
- Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
- if (isBlockTokenEnabled) {
- accessToken = blockPoolTokenSecretManager.generateToken(b,
- EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
- }
-
- new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
+ new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
+ clientname, targets, targetStorageTypes, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
// send data & checksum
@@ -1696,7 +1831,26 @@ public class DataNode extends Configured
}
}
}
-
+
+ /**
+ * Returns a new DataEncryptionKeyFactory that generates a key from the
+ * BlockPoolTokenSecretManager, using the block pool ID of the given block.
+ *
+ * @param block for which the factory needs to create a key
+ * @return DataEncryptionKeyFactory for block's block pool ID
+ */
+ DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
+ final ExtendedBlock block) {
+ return new DataEncryptionKeyFactory() {
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() {
+ return dnConf.encryptDataTransfer ?
+ blockPoolTokenSecretManager.generateDataEncryptionKey(
+ block.getBlockPoolId()) : null;
+ }
+ };
+ }
+
/**
* After a block becomes finalized, a datanode increases metric counter,
* notifies namenode, and adds it to the block scanner
@@ -2336,7 +2490,8 @@ public class DataNode extends Configured
* @param client client name
*/
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
- final DatanodeInfo[] targets, final String client) throws IOException {
+ final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
+ final String client) throws IOException {
final long storedGS;
final long visible;
final BlockConstructionStage stage;
@@ -2369,7 +2524,7 @@ public class DataNode extends Configured
b.setNumBytes(visible);
if (targets.length > 0) {
- new DataTransfer(targets, b, stage, client).run();
+ new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
}
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Jul 25 20:33:09 2014
@@ -36,29 +36,27 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
-import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.security.MessageDigest;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
@@ -85,7 +83,6 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Preconditions;
-import com.google.common.net.InetAddresses;
import com.google.protobuf.ByteString;
@@ -174,24 +171,11 @@ class DataXceiver extends Receiver imple
dataXceiverServer.addPeer(peer, Thread.currentThread());
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
- if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){
- IOStreamPair encryptedStreams = null;
- try {
- encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
- socketIn, datanode.blockPoolTokenSecretManager,
- dnConf.encryptionAlgorithm);
- } catch (InvalidMagicNumberException imne) {
- LOG.info("Failed to read expected encryption handshake from client " +
- "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
- "is running an older version of Hadoop which does not support " +
- "encryption");
- return;
- }
- input = encryptedStreams.in;
- socketOut = encryptedStreams.out;
- }
- input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE);
+ IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
+ socketIn, datanode.getDatanodeId());
+ input = new BufferedInputStream(saslStreams.in,
+ HdfsConstants.SMALL_BUFFER_SIZE);
+ socketOut = saslStreams.out;
super.initialize(new DataInputStream(input));
@@ -263,19 +247,6 @@ class DataXceiver extends Receiver imple
}
}
}
-
- /**
- * Returns InetAddress from peer
- * The getRemoteAddressString is the form /ip-address:port
- * The ip-address is extracted from peer and InetAddress is formed
- * @param peer
- * @return
- * @throws UnknownHostException
- */
- private static InetAddress getClientAddress(Peer peer) {
- return InetAddresses.forString(
- peer.getRemoteAddressString().split(":")[0].substring(1));
- }
@Override
public void requestShortCircuitFds(final ExtendedBlock blk,
@@ -554,9 +525,11 @@ class DataXceiver extends Receiver imple
@Override
public void writeBlock(final ExtendedBlock block,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
@@ -620,12 +593,13 @@ class DataXceiver extends Receiver imple
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
- blockReceiver = new BlockReceiver(block, in,
+ blockReceiver = new BlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy);
+
storageUuid = blockReceiver.getStorageUuid();
} else {
storageUuid = datanode.data.recoverClose(
@@ -656,25 +630,20 @@ class DataXceiver extends Receiver imple
OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
writeTimeout);
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
- if (dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufMirrorOut, unbufMirrorIn,
- datanode.blockPoolTokenSecretManager
- .generateDataEncryptionKey(block.getBlockPoolId()));
-
- unbufMirrorOut = encryptedStreams.out;
- unbufMirrorIn = encryptedStreams.in;
- }
+ DataEncryptionKeyFactory keyFactory =
+ datanode.getDataEncryptionKeyFactoryForBlock(block);
+ IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
+ unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
+ unbufMirrorOut = saslStreams.out;
+ unbufMirrorIn = saslStreams.in;
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn);
- new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
- clientname, targets, srcDataNode, stage, pipelineSize,
- minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
- cachingStrategy);
+ new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+ blockToken, clientname, targets, targetStorageTypes, srcDataNode,
+ stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+ latestGenerationStamp, requestedChecksum, cachingStrategy);
mirrorOut.flush();
@@ -789,7 +758,8 @@ class DataXceiver extends Receiver imple
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
- final DatanodeInfo[] targets) throws IOException {
+ final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes) throws IOException {
checkAccess(socketOut, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
previousOpClientName = clientName;
@@ -798,7 +768,8 @@ class DataXceiver extends Receiver imple
final DataOutputStream out = new DataOutputStream(
getOutputStream());
try {
- datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
+ datanode.transferReplicaForPipelineRecovery(blk, targets,
+ targetStorageTypes, clientName);
writeResponse(Status.SUCCESS, null, out);
} finally {
IOUtils.closeStream(out);
@@ -976,6 +947,7 @@ class DataXceiver extends Receiver imple
@Override
public void replaceBlock(final ExtendedBlock block,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo proxySource) throws IOException {
@@ -1026,17 +998,12 @@ class DataXceiver extends Receiver imple
OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
dnConf.socketWriteTimeout);
InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
- if (dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(
- proxySock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufProxyOut, unbufProxyIn,
- datanode.blockPoolTokenSecretManager
- .generateDataEncryptionKey(block.getBlockPoolId()));
- unbufProxyOut = encryptedStreams.out;
- unbufProxyIn = encryptedStreams.in;
- }
+ DataEncryptionKeyFactory keyFactory =
+ datanode.getDataEncryptionKeyFactoryForBlock(block);
+ IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
+ unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
+ unbufProxyOut = saslStreams.out;
+ unbufProxyIn = saslStreams.in;
proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
HdfsConstants.SMALL_BUFFER_SIZE));
@@ -1066,8 +1033,8 @@ class DataXceiver extends Receiver imple
DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
checksumInfo.getChecksum());
// open a block receiver and check if the block does not exist
- blockReceiver = new BlockReceiver(
- block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
+ blockReceiver = new BlockReceiver(block, storageType,
+ proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
CachingStrategy.newDropBehind());
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Fri Jul 25 20:33:09 2014
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -176,8 +177,8 @@ public interface FsDatasetSpi<V extends
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public ReplicaInPipelineInterface createTemporary(ExtendedBlock b
- ) throws IOException;
+ public ReplicaInPipelineInterface createTemporary(StorageType storageType,
+ ExtendedBlock b) throws IOException;
/**
* Creates a RBW replica and returns the meta info of the replica
@@ -186,8 +187,8 @@ public interface FsDatasetSpi<V extends
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public ReplicaInPipelineInterface createRbw(ExtendedBlock b
- ) throws IOException;
+ public ReplicaInPipelineInterface createRbw(StorageType storageType,
+ ExtendedBlock b) throws IOException;
/**
* Recovers a RBW replica and returns the meta info of the replica
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Fri Jul 25 20:33:09 2014
@@ -17,6 +17,28 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -24,12 +46,37 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.datanode.*;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -43,15 +90,6 @@ import org.apache.hadoop.util.DiskChecke
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.*;
-import java.util.concurrent.Executor;
-
/**************************************************
* FSDataset manages a set of data blocks. Each block
* has a unique name and an extent on disk.
@@ -736,8 +774,8 @@ class FsDatasetImpl implements FsDataset
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
- throws IOException {
+ public synchronized ReplicaInPipeline createRbw(StorageType storageType,
+ ExtendedBlock b) throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
@@ -746,7 +784,7 @@ class FsDatasetImpl implements FsDataset
" and thus cannot be created.");
}
// create a new block
- FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
+ FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
// create a rbw file to hold block in the designated volume
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
@@ -874,8 +912,8 @@ class FsDatasetImpl implements FsDataset
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b)
- throws IOException {
+ public synchronized ReplicaInPipeline createTemporary(StorageType storageType,
+ ExtendedBlock b) throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (replicaInfo != null) {
throw new ReplicaAlreadyExistsException("Block " + b +
@@ -883,7 +921,7 @@ class FsDatasetImpl implements FsDataset
" and thus cannot be created.");
}
- FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
+ FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
// create a temporary file to hold block in the designated volume
File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Fri Jul 25 20:33:09 2014
@@ -18,13 +18,17 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
-import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Time;
class FsVolumeList {
/**
@@ -52,11 +56,18 @@ class FsVolumeList {
* by a single thread and next volume is chosen with no concurrent
* update to {@link #volumes}.
* @param blockSize free space needed on the volume
+ * @param storageType the desired {@link StorageType}
* @return next volume to store the block in.
*/
- // TODO should choose volume with storage type
- synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
- return blockChooser.chooseVolume(volumes, blockSize);
+ synchronized FsVolumeImpl getNextVolume(StorageType storageType,
+ long blockSize) throws IOException {
+ final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
+ for(FsVolumeImpl v : volumes) {
+ if (v.getStorageType() == storageType) {
+ list.add(v);
+ }
+ }
+ return blockChooser.chooseVolume(list, blockSize);
}
long getDfsUsed() throws IOException {
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Fri Jul 25 20:33:09 2014
@@ -128,7 +128,8 @@ public class DatanodeWebHdfsMethods {
"://" + nnId);
boolean isLogical = HAUtil.isLogicalUri(conf, nnUri);
if (isLogical) {
- token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri));
+ token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri,
+ HdfsConstants.HDFS_URI_SCHEME));
} else {
token.setService(SecurityUtil.buildTokenService(nnUri));
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Fri Jul 25 20:33:09 2014
@@ -48,6 +48,15 @@ public interface FSClusterStats {
* @return Number of datanodes that are both alive and not decommissioned.
*/
public int getNumDatanodesInService();
+
+ /**
+ * an indication of the average load of non-decommission(ing|ed) nodes
+ * eligible for block placement
+ *
+ * @return average of the in service number of block transfers and block
+ * writes that are currently occurring on the cluster.
+ */
+ public double getInServiceXceiverAverage();
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Jul 25 20:33:09 2014
@@ -1074,10 +1074,11 @@ public class FSEditLog implements LogsPu
logEdit(op);
}
- void logRemoveXAttrs(String src, List<XAttr> xAttrs) {
+ void logRemoveXAttrs(String src, List<XAttr> xAttrs, boolean toLogRpcIds) {
final RemoveXAttrOp op = RemoveXAttrOp.getInstance();
op.src = src;
op.xAttrs = xAttrs;
+ logRpcIds(op, toLogRpcIds);
logEdit(op);
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Jul 25 20:33:09 2014
@@ -821,6 +821,10 @@ public class FSEditLogLoader {
RemoveXAttrOp removeXAttrOp = (RemoveXAttrOp) op;
fsDir.unprotectedRemoveXAttrs(removeXAttrOp.src,
removeXAttrOp.xAttrs);
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(removeXAttrOp.rpcClientId,
+ removeXAttrOp.rpcCallId);
+ }
break;
}
default:
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Fri Jul 25 20:33:09 2014
@@ -3551,6 +3551,7 @@ public abstract class FSEditLogOp {
XAttrEditLogProto p = XAttrEditLogProto.parseDelimitedFrom(in);
src = p.getSrc();
xAttrs = PBHelper.convertXAttrs(p.getXAttrsList());
+ readRpcIds(in, logVersion);
}
@Override
@@ -3561,18 +3562,22 @@ public abstract class FSEditLogOp {
}
b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs));
b.build().writeDelimitedTo(out);
+ // clientId and callId
+ writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SRC", src);
appendXAttrsToXml(contentHandler, xAttrs);
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
src = st.getValue("SRC");
xAttrs = readXAttrsFromXml(st);
+ readRpcIdsFromXml(st);
}
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Jul 25 20:33:09 2014
@@ -225,6 +225,7 @@ public class FSImage implements Closeabl
NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
}
if (startOpt != StartupOption.UPGRADE
+ && startOpt != StartupOption.UPGRADEONLY
&& !RollingUpgradeStartupOption.STARTED.matches(startOpt)
&& layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
&& layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
@@ -263,6 +264,7 @@ public class FSImage implements Closeabl
// 3. Do transitions
switch(startOpt) {
case UPGRADE:
+ case UPGRADEONLY:
doUpgrade(target);
return false; // upgrade saved image already
case IMPORT:
@@ -748,11 +750,13 @@ public class FSImage implements Closeabl
editLog.recoverUnclosedStreams();
} else if (HAUtil.isHAEnabled(conf, nameserviceId)
&& (startOpt == StartupOption.UPGRADE
+ || startOpt == StartupOption.UPGRADEONLY
|| RollingUpgradeStartupOption.ROLLBACK.matches(startOpt))) {
// This NN is HA, but we're doing an upgrade or a rollback of rolling
// upgrade so init the edit log for write.
editLog.initJournalsForWrite();
- if (startOpt == StartupOption.UPGRADE) {
+ if (startOpt == StartupOption.UPGRADE
+ || startOpt == StartupOption.UPGRADEONLY) {
long sharedLogCTime = editLog.getSharedLogCTime();
if (this.storage.getCTime() < sharedLogCTime) {
throw new IOException("It looks like the shared log is already " +
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Fri Jul 25 20:33:09 2014
@@ -614,6 +614,16 @@ public class FSImageFormat {
INodeDirectory parentINode = fsDir.rootDir;
for (long i = 0; i < numFiles; i++) {
pathComponents = FSImageSerialization.readPathComponents(in);
+ for (int j=0; j < pathComponents.length; j++) {
+ byte[] newComponent = renameReservedComponentOnUpgrade
+ (pathComponents[j], getLayoutVersion());
+ if (!Arrays.equals(newComponent, pathComponents[j])) {
+ String oldPath = DFSUtil.byteArray2PathString(pathComponents);
+ pathComponents[j] = newComponent;
+ String newPath = DFSUtil.byteArray2PathString(pathComponents);
+ LOG.info("Renaming reserved path " + oldPath + " to " + newPath);
+ }
+ }
final INode newNode = loadINode(
pathComponents[pathComponents.length-1], false, in, counter);
@@ -926,6 +936,7 @@ public class FSImageFormat {
oldnode = namesystem.dir.getInode(cons.getId()).asFile();
inSnapshot = true;
} else {
+ path = renameReservedPathsOnUpgrade(path, getLayoutVersion());
final INodesInPath iip = fsDir.getLastINodeInPath(path);
oldnode = INodeFile.valueOf(iip.getINode(0), path);
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 25 20:33:09 2014
@@ -83,6 +83,9 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT;
+
import static org.apache.hadoop.util.Time.now;
import java.io.BufferedWriter;
@@ -528,6 +531,8 @@ public class FSNamesystem implements Nam
private final FSImage fsImage;
+ private boolean randomizeBlockLocationsPerBlock;
+
/**
* Notify that loading of this FSDirectory is complete, and
* it is imageLoaded for use
@@ -837,6 +842,10 @@ public class FSNamesystem implements Nam
alwaysUseDelegationTokensForTests = conf.getBoolean(
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
+
+ this.randomizeBlockLocationsPerBlock = conf.getBoolean(
+ DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK,
+ DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT);
this.dtSecretManager = createDelegationTokenSecretManager(conf);
this.dir = new FSDirectory(this, conf);
@@ -979,7 +988,8 @@ public class FSNamesystem implements Nam
}
// This will start a new log segment and write to the seen_txid file, so
// we shouldn't do it when coming up in standby state
- if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)) {
+ if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)
+ || (haEnabled && startOpt == StartupOption.UPGRADEONLY)) {
fsImage.openEditLogForWrite();
}
success = true;
@@ -1699,17 +1709,17 @@ public class FSNamesystem implements Nam
LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true,
true);
if (blocks != null) {
- blockManager.getDatanodeManager().sortLocatedBlocks(
- clientMachine, blocks.getLocatedBlocks());
-
+ blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
+ blocks.getLocatedBlocks(), randomizeBlockLocationsPerBlock);
+
// lastBlock is not part of getLocatedBlocks(), might need to sort it too
LocatedBlock lastBlock = blocks.getLastLocatedBlock();
if (lastBlock != null) {
ArrayList<LocatedBlock> lastBlockList =
Lists.newArrayListWithCapacity(1);
lastBlockList.add(lastBlock);
- blockManager.getDatanodeManager().sortLocatedBlocks(
- clientMachine, lastBlockList);
+ blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
+ lastBlockList, randomizeBlockLocationsPerBlock);
}
}
return blocks;
@@ -7319,7 +7329,18 @@ public class FSNamesystem implements Nam
@Override // FSClusterStats
public int getNumDatanodesInService() {
- return getNumLiveDataNodes() - getNumDecomLiveDataNodes();
+ return datanodeStatistics.getNumDatanodesInService();
+ }
+
+ @Override // for block placement strategy
+ public double getInServiceXceiverAverage() {
+ double avgLoad = 0;
+ final int nodes = getNumDatanodesInService();
+ if (nodes != 0) {
+ final int xceivers = datanodeStatistics.getInServiceXceiverCount();
+ avgLoad = (double)xceivers/nodes;
+ }
+ return avgLoad;
}
public SnapshotManager getSnapshotManager() {
@@ -8258,11 +8279,12 @@ public class FSNamesystem implements Nam
nnConf.checkXAttrsConfigFlag();
FSPermissionChecker pc = getPermissionChecker();
boolean getAll = xAttrs == null || xAttrs.isEmpty();
- List<XAttr> filteredXAttrs = null;
if (!getAll) {
- filteredXAttrs = XAttrPermissionFilter.filterXAttrsForApi(pc, xAttrs);
- if (filteredXAttrs.isEmpty()) {
- return filteredXAttrs;
+ try {
+ XAttrPermissionFilter.checkPermissionForApi(pc, xAttrs);
+ } catch (AccessControlException e) {
+ logAuditEvent(false, "getXAttrs", src);
+ throw e;
}
}
checkOperation(OperationCategory.READ);
@@ -8281,15 +8303,21 @@ public class FSNamesystem implements Nam
if (filteredAll == null || filteredAll.isEmpty()) {
return null;
}
- List<XAttr> toGet = Lists.newArrayListWithCapacity(filteredXAttrs.size());
- for (XAttr xAttr : filteredXAttrs) {
+ List<XAttr> toGet = Lists.newArrayListWithCapacity(xAttrs.size());
+ for (XAttr xAttr : xAttrs) {
+ boolean foundIt = false;
for (XAttr a : filteredAll) {
if (xAttr.getNameSpace() == a.getNameSpace()
&& xAttr.getName().equals(a.getName())) {
toGet.add(a);
+ foundIt = true;
break;
}
}
+ if (!foundIt) {
+ throw new IOException(
+ "At least one of the attributes provided was not found.");
+ }
}
return toGet;
}
@@ -8323,17 +8351,42 @@ public class FSNamesystem implements Nam
readUnlock();
}
}
-
+
+ /**
+ * Remove an xattr for a file or directory.
+ *
+ * @param src
+ * - path to remove the xattr from
+ * @param xAttr
+ * - xAttr to remove
+ * @throws AccessControlException
+ * @throws SafeModeException
+ * @throws UnresolvedLinkException
+ * @throws IOException
+ */
void removeXAttr(String src, XAttr xAttr) throws IOException {
- nnConf.checkXAttrsConfigFlag();
- HdfsFileStatus resultingStat = null;
- FSPermissionChecker pc = getPermissionChecker();
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+ boolean success = false;
try {
- XAttrPermissionFilter.checkPermissionForApi(pc, xAttr);
+ removeXAttrInt(src, xAttr, cacheEntry != null);
+ success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "removeXAttr", src);
throw e;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
}
+ }
+
+ void removeXAttrInt(String src, XAttr xAttr, boolean logRetryCache)
+ throws IOException {
+ nnConf.checkXAttrsConfigFlag();
+ HdfsFileStatus resultingStat = null;
+ FSPermissionChecker pc = getPermissionChecker();
+ XAttrPermissionFilter.checkPermissionForApi(pc, xAttr);
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock();
@@ -8347,12 +8400,12 @@ public class FSNamesystem implements Nam
xAttrs.add(xAttr);
List<XAttr> removedXAttrs = dir.removeXAttrs(src, xAttrs);
if (removedXAttrs != null && !removedXAttrs.isEmpty()) {
- getEditLog().logRemoveXAttrs(src, removedXAttrs);
+ getEditLog().logRemoveXAttrs(src, removedXAttrs, logRetryCache);
+ } else {
+ throw new IOException(
+ "No matching attributes found for remove operation");
}
resultingStat = getAuditFileInfo(src, false);
- } catch (AccessControlException e) {
- logAuditEvent(false, "removeXAttr", src);
- throw e;
} finally {
writeUnlock();
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Fri Jul 25 20:33:09 2014
@@ -71,6 +71,8 @@ public class FileJournalManager implemen
NameNodeFile.EDITS.getName() + "_(\\d+)-(\\d+)");
private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile(
NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
+ private static final Pattern EDITS_INPROGRESS_STALE_REGEX = Pattern.compile(
+ NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+).*(\\S+)");
private File currentInProgress = null;
@@ -162,8 +164,7 @@ public class FileJournalManager implemen
throws IOException {
LOG.info("Purging logs older than " + minTxIdToKeep);
File[] files = FileUtil.listFiles(sd.getCurrentDir());
- List<EditLogFile> editLogs =
- FileJournalManager.matchEditLogs(files);
+ List<EditLogFile> editLogs = matchEditLogs(files, true);
for (EditLogFile log : editLogs) {
if (log.getFirstTxId() < minTxIdToKeep &&
log.getLastTxId() < minTxIdToKeep) {
@@ -244,8 +245,13 @@ public class FileJournalManager implemen
public static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
return matchEditLogs(FileUtil.listFiles(logDir));
}
-
+
static List<EditLogFile> matchEditLogs(File[] filesInStorage) {
+ return matchEditLogs(filesInStorage, false);
+ }
+
+ private static List<EditLogFile> matchEditLogs(File[] filesInStorage,
+ boolean forPurging) {
List<EditLogFile> ret = Lists.newArrayList();
for (File f : filesInStorage) {
String name = f.getName();
@@ -256,6 +262,7 @@ public class FileJournalManager implemen
long startTxId = Long.parseLong(editsMatch.group(1));
long endTxId = Long.parseLong(editsMatch.group(2));
ret.add(new EditLogFile(f, startTxId, endTxId));
+ continue;
} catch (NumberFormatException nfe) {
LOG.error("Edits file " + f + " has improperly formatted " +
"transaction ID");
@@ -270,12 +277,30 @@ public class FileJournalManager implemen
long startTxId = Long.parseLong(inProgressEditsMatch.group(1));
ret.add(
new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true));
+ continue;
} catch (NumberFormatException nfe) {
LOG.error("In-progress edits file " + f + " has improperly " +
"formatted transaction ID");
// skip
}
}
+ if (forPurging) {
+ // Check for in-progress stale edits
+ Matcher staleInprogressEditsMatch = EDITS_INPROGRESS_STALE_REGEX
+ .matcher(name);
+ if (staleInprogressEditsMatch.matches()) {
+ try {
+ long startTxId = Long.valueOf(staleInprogressEditsMatch.group(1));
+ ret.add(new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID,
+ true));
+ continue;
+ } catch (NumberFormatException nfe) {
+ LOG.error("In-progress stale edits file " + f + " has improperly "
+ + "formatted transaction ID");
+ // skip
+ }
+ }
+ }
}
return ret;
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Fri Jul 25 20:33:09 2014
@@ -836,7 +836,7 @@ public class NNStorage extends Storage i
*/
void processStartupOptionsForUpgrade(StartupOption startOpt, int layoutVersion)
throws IOException {
- if (startOpt == StartupOption.UPGRADE) {
+ if (startOpt == StartupOption.UPGRADE || startOpt == StartupOption.UPGRADEONLY) {
// If upgrade from a release that does not support federation,
// if clusterId is provided in the startupOptions use it.
// Else generate a new cluster ID
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Jul 25 20:33:09 2014
@@ -210,6 +210,9 @@ public class NameNode implements NameNod
+ StartupOption.UPGRADE.getName() +
" [" + StartupOption.CLUSTERID.getName() + " cid]" +
" [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t["
+ + StartupOption.UPGRADEONLY.getName() +
+ " [" + StartupOption.CLUSTERID.getName() + " cid]" +
+ " [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t["
+ StartupOption.ROLLBACK.getName() + "] | \n\t["
+ StartupOption.ROLLINGUPGRADE.getName() + " <"
+ RollingUpgradeStartupOption.DOWNGRADE.name().toLowerCase() + "|"
@@ -713,6 +716,7 @@ public class NameNode implements NameNod
* <li>{@link StartupOption#BACKUP BACKUP} - start backup node</li>
* <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
* <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster
+ * <li>{@link StartupOption#UPGRADEONLY UPGRADEONLY} - upgrade the cluster
* upgrade and create a snapshot of the current file system state</li>
* <li>{@link StartupOption#RECOVER RECOVERY} - recover name node
* metadata</li>
@@ -767,7 +771,8 @@ public class NameNode implements NameNod
}
protected HAState createHAState(StartupOption startOpt) {
- if (!haEnabled || startOpt == StartupOption.UPGRADE) {
+ if (!haEnabled || startOpt == StartupOption.UPGRADE
+ || startOpt == StartupOption.UPGRADEONLY) {
return ACTIVE_STATE;
} else {
return STANDBY_STATE;
@@ -1198,8 +1203,10 @@ public class NameNode implements NameNod
startOpt = StartupOption.BACKUP;
} else if (StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.CHECKPOINT;
- } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)) {
- startOpt = StartupOption.UPGRADE;
+ } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)
+ || StartupOption.UPGRADEONLY.getName().equalsIgnoreCase(cmd)) {
+ startOpt = StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) ?
+ StartupOption.UPGRADE : StartupOption.UPGRADEONLY;
/* Can be followed by CLUSTERID with a required parameter or
* RENAMERESERVED with an optional parameter
*/
@@ -1407,6 +1414,12 @@ public class NameNode implements NameNod
terminate(0);
return null; // avoid javac warning
}
+ case UPGRADEONLY: {
+ DefaultMetricsSystem.initialize("NameNode");
+ new NameNode(conf);
+ terminate(0);
+ return null;
+ }
default: {
DefaultMetricsSystem.initialize("NameNode");
return new NameNode(conf);
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Fri Jul 25 20:33:09 2014
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -55,6 +59,12 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
@@ -65,6 +75,7 @@ import org.apache.hadoop.net.NetworkTopo
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
@@ -92,7 +103,7 @@ import com.google.common.annotations.Vis
* factors of each file.
*/
@InterfaceAudience.Private
-public class NamenodeFsck {
+public class NamenodeFsck implements DataEncryptionKeyFactory {
public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
// return string marking fsck status
@@ -115,6 +126,7 @@ public class NamenodeFsck {
private boolean showBlocks = false;
private boolean showLocations = false;
private boolean showRacks = false;
+ private boolean showprogress = false;
private boolean showCorruptFileBlocks = false;
/**
@@ -149,6 +161,7 @@ public class NamenodeFsck {
private List<String> snapshottableDirs = null;
private final BlockPlacementPolicy bpPolicy;
+ private final SaslDataTransferClient saslClient;
/**
* Filesystem checker.
@@ -175,6 +188,12 @@ public class NamenodeFsck {
networktopology,
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.getHost2DatanodeMap());
+ this.saslClient = new SaslDataTransferClient(
+ DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+ TrustedChannelResolver.getInstance(conf),
+ conf.getBoolean(
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
String key = it.next();
@@ -185,6 +204,7 @@ public class NamenodeFsck {
else if (key.equals("blocks")) { this.showBlocks = true; }
else if (key.equals("locations")) { this.showLocations = true; }
else if (key.equals("racks")) { this.showRacks = true; }
+ else if (key.equals("showprogress")) { this.showprogress = true; }
else if (key.equals("openforwrite")) {this.showOpenFiles = true; }
else if (key.equals("listcorruptfileblocks")) {
this.showCorruptFileBlocks = true;
@@ -363,10 +383,13 @@ public class NamenodeFsck {
} else if (showFiles) {
out.print(path + " " + fileLen + " bytes, " +
blocks.locatedBlockCount() + " block(s): ");
- } else {
+ } else if (showprogress) {
out.print('.');
}
- if (res.totalFiles % 100 == 0) { out.println(); out.flush(); }
+ if ((showprogress) && res.totalFiles % 100 == 0) {
+ out.println();
+ out.flush();
+ }
int missing = 0;
int corrupt = 0;
long missize = 0;
@@ -616,15 +639,16 @@ public class NamenodeFsck {
setConfiguration(namenode.conf).
setRemotePeerFactory(new RemotePeerFactory() {
@Override
- public Peer newConnectedPeer(InetSocketAddress addr)
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
throws IOException {
Peer peer = null;
Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
try {
s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
- peer = TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
- getDataEncryptionKey());
+ peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s,
+ NamenodeFsck.this, blockToken, datanodeId);
} finally {
if (peer == null) {
IOUtils.closeQuietly(s);
@@ -663,7 +687,12 @@ public class NamenodeFsck {
throw new Exception("Could not copy block data for " + lblock.getBlock());
}
}
-
+
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() throws IOException {
+ return namenode.getRpcServer().getDataEncryptionKey();
+ }
+
/*
* XXX (ab) See comment above for copyBlock().
*
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrPermissionFilter.java Fri Jul 25 20:33:09 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.XAttrHelpe
import org.apache.hadoop.security.AccessControlException;
import com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
/**
* There are four types of extended attributes <XAttr> defined by the
@@ -60,8 +61,20 @@ public class XAttrPermissionFilter {
throw new AccessControlException("User doesn't have permission for xattr: "
+ XAttrHelper.getPrefixName(xAttr));
}
-
- static List<XAttr> filterXAttrsForApi(FSPermissionChecker pc,
+
+ static void checkPermissionForApi(FSPermissionChecker pc,
+ List<XAttr> xAttrs) throws AccessControlException {
+ Preconditions.checkArgument(xAttrs != null);
+ if (xAttrs.isEmpty()) {
+ return;
+ }
+
+ for (XAttr xAttr : xAttrs) {
+ checkPermissionForApi(pc, xAttr);
+ }
+ }
+
+ static List<XAttr> filterXAttrsForApi(FSPermissionChecker pc,
List<XAttr> xAttrs) {
assert xAttrs != null : "xAttrs can not be null";
if (xAttrs == null || xAttrs.isEmpty()) {
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java Fri Jul 25 20:33:09 2014
@@ -19,24 +19,30 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.util.List;
+import java.util.Map;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-
-import com.google.common.collect.ImmutableList;
/**
* XAttrStorage is used to read and set xattrs for an inode.
*/
@InterfaceAudience.Private
public class XAttrStorage {
-
+
+ private static final Map<String, String> internedNames = Maps.newHashMap();
+
/**
* Reads the existing extended attributes of an inode. If the
* inode does not have an <code>XAttr</code>, then this method
* returns an empty list.
+ * <p/>
+ * Must be called while holding the FSDirectory read lock.
+ *
* @param inode INode to read
* @param snapshotId
* @return List<XAttr> <code>XAttr</code> list.
@@ -48,6 +54,9 @@ public class XAttrStorage {
/**
* Reads the existing extended attributes of an inode.
+ * <p/>
+ * Must be called while holding the FSDirectory read lock.
+ *
* @param inode INode to read.
* @return List<XAttr> <code>XAttr</code> list.
*/
@@ -58,6 +67,9 @@ public class XAttrStorage {
/**
* Update xattrs of inode.
+ * <p/>
+ * Must be called while holding the FSDirectory write lock.
+ *
* @param inode INode to update
* @param xAttrs to update xAttrs.
* @param snapshotId id of the latest snapshot of the inode
@@ -70,8 +82,24 @@ public class XAttrStorage {
}
return;
}
-
- ImmutableList<XAttr> newXAttrs = ImmutableList.copyOf(xAttrs);
+ // Dedupe the xAttr name and save them into a new interned list
+ List<XAttr> internedXAttrs = Lists.newArrayListWithCapacity(xAttrs.size());
+ for (XAttr xAttr : xAttrs) {
+ final String name = xAttr.getName();
+ String internedName = internedNames.get(name);
+ if (internedName == null) {
+ internedName = name;
+ internedNames.put(internedName, internedName);
+ }
+ XAttr internedXAttr = new XAttr.Builder()
+ .setName(internedName)
+ .setNameSpace(xAttr.getNameSpace())
+ .setValue(xAttr.getValue())
+ .build();
+ internedXAttrs.add(internedXAttr);
+ }
+ // Save the list of interned xattrs
+ ImmutableList<XAttr> newXAttrs = ImmutableList.copyOf(internedXAttrs);
if (inode.getXAttrFeature() != null) {
inode.removeXAttrFeature(snapshotId);
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java Fri Jul 25 20:33:09 2014
@@ -81,6 +81,7 @@ public class BootstrapStandby implements
private boolean force = false;
private boolean interactive = true;
+ private boolean skipSharedEditsCheck = false;
// Exit/return codes.
static final int ERR_CODE_FAILED_CONNECT = 2;
@@ -117,6 +118,8 @@ public class BootstrapStandby implements
force = true;
} else if ("-nonInteractive".equals(arg)) {
interactive = false;
+ } else if ("-skipSharedEditsCheck".equals(arg)) {
+ skipSharedEditsCheck = true;
} else {
printUsage();
throw new HadoopIllegalArgumentException(
@@ -127,7 +130,7 @@ public class BootstrapStandby implements
private void printUsage() {
System.err.println("Usage: " + this.getClass().getSimpleName() +
- "[-force] [-nonInteractive]");
+ " [-force] [-nonInteractive] [-skipSharedEditsCheck]");
}
private NamenodeProtocol createNNProtocolProxy()
@@ -200,7 +203,7 @@ public class BootstrapStandby implements
// Ensure that we have enough edits already in the shared directory to
// start up from the last checkpoint on the active.
- if (!checkLogsAvailableForRead(image, imageTxId, curTxId)) {
+ if (!skipSharedEditsCheck && !checkLogsAvailableForRead(image, imageTxId, curTxId)) {
return ERR_CODE_LOGS_UNAVAILABLE;
}