Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/11/22 04:19:04 UTC
svn commit: r1204794 [3/6] - in
/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdfs/protocol/prot...
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java Tue Nov 22 03:18:47 2011
@@ -21,9 +21,13 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.Token;
/**
* This class is used on the server side.
@@ -116,4 +120,10 @@ public class ClientDatanodeProtocolServe
public void deleteBlockPool(String bpid, boolean force) throws IOException {
server.deleteBlockPool(bpid, force);
}
+
+ @Override
+ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
+ Token<BlockTokenIdentifier> token) throws IOException {
+ return server.getBlockLocalPathInfo(block, token);
+ }
}
\ No newline at end of file
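The class above is one half of the R23 compatibility pattern: a thin server-side adapter implements the frozen wire protocol and forwards each call to the live ClientDatanodeProtocol implementation, which is exactly what the new getBlockLocalPathInfo override does. A minimal sketch of the shape, using hypothetical Foo* names (the real translators also convert between wire and internal types where they differ):

    import java.io.IOException;

    // Hypothetical wire-facing interface, kept stable per release.
    interface FooWireProtocol {
      long getLength(String path) throws IOException;
    }

    // Hypothetical internal interface, free to evolve between releases.
    interface FooProtocol {
      long getLength(String path) throws IOException;
    }

    // Server-side translator: implements the wire interface and delegates
    // to the real server object, mirroring the hunk above.
    class FooServerSideTranslator implements FooWireProtocol {
      private final FooProtocol server;

      FooServerSideTranslator(FooProtocol server) {
        this.server = server;
      }

      @Override
      public long getLength(String path) throws IOException {
        return server.getLength(path);
      }
    }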
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java Tue Nov 22 03:18:47 2011
@@ -26,14 +26,17 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
/**
@@ -63,6 +66,23 @@ public class ClientDatanodeProtocolTrans
rpcProxy = createClientDatanodeProtocolProxy(addr, ticket, conf, factory);
}
+ /**
+ * Constructor.
+ * @param datanodeid Datanode to connect to.
+ * @param conf Configuration.
+ * @param socketTimeout Socket timeout to use.
+ * @throws IOException
+ */
+ public ClientDatanodeProtocolTranslatorR23(DatanodeID datanodeid,
+ Configuration conf, int socketTimeout) throws IOException {
+ InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getHost()
+ + ":" + datanodeid.getIpcPort());
+ rpcProxy = RPC.getProxy(ClientDatanodeWireProtocol.class,
+ ClientDatanodeWireProtocol.versionID, addr,
+ UserGroupInformation.getCurrentUser(), conf,
+ NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+ }
+
static ClientDatanodeWireProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,
LocatedBlock locatedBlock)
@@ -134,4 +154,9 @@ public class ClientDatanodeProtocolTrans
}
+ @Override
+ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
+ Token<BlockTokenIdentifier> token) throws IOException {
+ return rpcProxy.getBlockLocalPathInfo(block, token);
+ }
}
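Client side, the new three-argument constructor builds an RPC proxy for ClientDatanodeWireProtocol directly from a DatanodeID, so callers no longer need a LocatedBlock just to open an IPC connection. A hedged usage sketch; datanodeId, block and token are assumed to come from elsewhere (e.g. a prior NameNode call), and 3000 is an arbitrary socket timeout:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeProtocolTranslatorR23;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    public class LocalPathLookup {
      static BlockLocalPathInfo lookup(DatanodeID datanodeId, Configuration conf,
          ExtendedBlock block, Token<BlockTokenIdentifier> token)
          throws IOException {
        // The translator implements the internal interface; the wire
        // protocol stays an implementation detail behind it.
        ClientDatanodeProtocol proxy =
            new ClientDatanodeProtocolTranslatorR23(datanodeId, conf, 3000);
        return proxy.getBlockLocalPathInfo(block, token);
      }
    }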
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java Tue Nov 22 03:18:47 2011
@@ -24,11 +24,15 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
/**
@@ -78,6 +82,13 @@ public interface ClientDatanodeWireProto
void deleteBlockPool(String bpid, boolean force) throws IOException;
/**
+ * The specification of this method matches that of
+ * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
+ */
+ BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
+ Token<BlockTokenIdentifier> token) throws IOException;
+
+ /**
* This method is defined to get the protocol signature using
* the R23 protocol - hence we have added the suffix of 2 to the method name
* to avoid conflict.
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Nov 22 03:18:47 2011
@@ -39,7 +39,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Nov 22 03:18:47 2011
@@ -29,8 +29,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import java.util.TreeSet;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Tue Nov 22 03:18:47 2011
@@ -20,14 +20,10 @@ package org.apache.hadoop.hdfs.server.bl
import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
-import java.util.Set;
-import java.util.TreeSet;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Tue Nov 22 03:18:47 2011
@@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.server.bl
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java Tue Nov 22 03:18:47 2011
@@ -20,9 +20,6 @@ package org.apache.hadoop.hdfs.server.bl
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Tue Nov 22 03:18:47 2011
@@ -26,9 +26,11 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
-import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
/**
@@ -36,7 +38,9 @@ import org.apache.hadoop.util.DataChecks
* This is not related to the Block related functionality in Namenode.
* The biggest part of data block metadata is CRC for the block.
*/
-class BlockMetadataHeader {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMetadataHeader {
static final short METADATA_VERSION = FSDataset.METADATA_VERSION;
@@ -52,12 +56,14 @@ class BlockMetadataHeader {
this.checksum = checksum;
this.version = version;
}
-
- short getVersion() {
+
+ /** Get the version */
+ public short getVersion() {
return version;
}
- DataChecksum getChecksum() {
+ /** Get the checksum */
+ public DataChecksum getChecksum() {
return checksum;
}
@@ -68,7 +74,7 @@ class BlockMetadataHeader {
* @return Metadata Header
* @throws IOException
*/
- static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+ public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
return readHeader(in.readShort(), in);
}
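With BlockMetadataHeader now public, code outside the datanode package can parse the leading bytes of a block's .meta file. A hedged sketch of reading one (the path argument is hypothetical; real callers would get it from BlockLocalPathInfo.getMetaPath()):

    import java.io.BufferedInputStream;
    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
    import org.apache.hadoop.util.DataChecksum;

    public class MetaHeaderDump {
      public static void main(String[] args) throws IOException {
        DataInputStream in = new DataInputStream(
            new BufferedInputStream(new FileInputStream(args[0])));
        try {
          // readHeader consumes the version short, then the checksum header.
          BlockMetadataHeader header = BlockMetadataHeader.readHeader(in);
          DataChecksum checksum = header.getChecksum();
          System.out.println("metadata version: " + header.getVersion());
          System.out.println("bytes per checksum: "
              + checksum.getBytesPerChecksum());
        } finally {
          in.close();
        }
      }
    }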
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Tue Nov 22 03:18:47 2011
@@ -34,7 +34,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Random;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -252,8 +251,9 @@ class BlockPoolSliceScanner {
*/
long period = Math.min(scanPeriod,
Math.max(blockMap.size(),1) * 600 * 1000L);
+ int periodInt = Math.abs((int)period);
return System.currentTimeMillis() - scanPeriod +
- DFSUtil.getRandom().nextInt((int)period);
+ DFSUtil.getRandom().nextInt(periodInt);
}
/** Adds block to list of blocks */
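The periodInt guard above exists because period is a long: with a few million blocks, Math.max(blockMap.size(), 1) * 600 * 1000L exceeds Integer.MAX_VALUE, the narrowing (int) cast keeps only the low 32 bits, and Random.nextInt(n) throws IllegalArgumentException when n is not positive. A small demonstration of the truncation:

    // Shows why (int)period can go negative for large block maps.
    public class NarrowingDemo {
      public static void main(String[] args) {
        long period = 4000000L * 600 * 1000L; // ~4M blocks -> 2,400,000,000,000
        int truncated = (int) period;         // low 32 bits: -886718464
        System.out.println(truncated);           // negative
        System.out.println(Math.abs(truncated)); // what the patch hands nextInt
      }
    }

Note that Math.abs(Integer.MIN_VALUE) is itself Integer.MIN_VALUE, so the guard shrinks the failure window rather than closing it completely.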
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Nov 22 03:18:47 2011
@@ -50,7 +50,6 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.PureJavaCrc32;
/** A class that receives a block and writes to its own disk, meanwhile
* may copies it to another site. If a throttler is provided,
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Tue Nov 22 03:18:47 2011
@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Nov 22 03:18:47 2011
@@ -37,6 +37,7 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
@@ -48,11 +49,12 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
@@ -61,7 +63,6 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
-import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
@@ -74,7 +75,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -90,11 +90,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -103,7 +103,6 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -113,38 +112,28 @@ import org.apache.hadoop.hdfs.protocolR2
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.JspHelper;
-import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
-import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
-import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolServerSideTranslatorR23;
import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolTranslatorR23;
import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeWireProtocol;
@@ -160,8 +149,10 @@ import org.apache.hadoop.metrics2.lib.De
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -176,6 +167,8 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
+import com.google.common.base.Preconditions;
+
/**********************************************************
* DataNode is a class (and program) that stores a set of
@@ -236,8 +229,7 @@ public class DataNode extends Configured
* Use {@link NetUtils#createSocketAddr(String)} instead.
*/
@Deprecated
- public static InetSocketAddress createSocketAddr(String target
- ) throws IOException {
+ public static InetSocketAddress createSocketAddr(String target) {
return NetUtils.createSocketAddr(target);
}
@@ -331,14 +323,14 @@ public class DataNode extends Configured
}
}
- void joinAll() throws InterruptedException {
+ void joinAll() {
for (BPOfferService bpos: this.getAllNamenodeThreads()) {
bpos.join();
}
}
void refreshNamenodes(Configuration conf)
- throws IOException, InterruptedException {
+ throws IOException {
LOG.info("Refresh request received for nameservices: "
+ conf.get(DFS_FEDERATION_NAMESERVICES));
List<InetSocketAddress> newAddresses =
@@ -396,8 +388,6 @@ public class DataNode extends Configured
private volatile String hostName; // Host name of this datanode
- private static String dnThreadName;
-
boolean isBlockTokenEnabled;
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
@@ -414,6 +404,8 @@ public class DataNode extends Configured
private AbstractList<File> dataDirs;
private Configuration conf;
+ private final String userWithLocalPathAccess;
+
/**
* Create the DataNode given a configuration and an array of dataDirs.
* 'dataDirs' is where the blocks are stored.
@@ -432,6 +424,8 @@ public class DataNode extends Configured
final SecureResources resources) throws IOException {
super(conf);
+ this.userWithLocalPathAccess = conf
+ .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
try {
hostName = getHostName(conf);
startDataNode(conf, dataDirs, resources);
@@ -452,11 +446,8 @@ public class DataNode extends Configured
private static String getHostName(Configuration config)
throws UnknownHostException {
- String name = null;
// use configured nameserver & interface to get local hostname
- if (config.get(DFS_DATANODE_HOST_NAME_KEY) != null) {
- name = config.get(DFS_DATANODE_HOST_NAME_KEY);
- }
+ String name = config.get(DFS_DATANODE_HOST_NAME_KEY);
if (name == null) {
name = DNS
.getDefaultHost(config.get(DFS_DATANODE_DNS_INTERFACE_KEY,
@@ -481,11 +472,11 @@ public class DataNode extends Configured
if(LOG.isDebugEnabled()) {
LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
}
- if (conf.getBoolean("dfs.https.enable", false)) {
+ if (conf.getBoolean(DFS_HTTPS_ENABLE_KEY, false)) {
boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
- "dfs.datanode.https.address", infoHost + ":" + 0));
+ DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 0));
Configuration sslConf = new HdfsConfiguration(false);
sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
"ssl-server.xml"));
@@ -526,7 +517,7 @@ public class DataNode extends Configured
private void initIpcServer(Configuration conf) throws IOException {
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
- conf.get("dfs.datanode.ipc.address"));
+ conf.get(DFS_DATANODE_IPC_ADDRESS_KEY));
// Add all the RPC protocols that the Datanode implements
ClientDatanodeProtocolServerSideTranslatorR23
@@ -692,679 +683,10 @@ public class DataNode extends Configured
this.heartbeatsDisabledForTests = heartbeatsDisabledForTests;
}
- /**
- * A thread per namenode to perform:
- * <ul>
- * <li> Pre-registration handshake with namenode</li>
- * <li> Registration with namenode</li>
- * <li> Send periodic heartbeats to the namenode</li>
- * <li> Handle commands received from the datanode</li>
- * </ul>
- */
- @InterfaceAudience.Private
- static class BPOfferService implements Runnable {
- final InetSocketAddress nnAddr;
- DatanodeRegistration bpRegistration;
- NamespaceInfo bpNSInfo;
- long lastBlockReport = 0;
- long lastDeletedReport = 0;
-
- boolean resetBlockReportTime = true;
-
- private Thread bpThread;
- private DatanodeProtocol bpNamenode;
- private String blockPoolId;
- private long lastHeartbeat = 0;
- private volatile boolean initialized = false;
- private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
- = new LinkedList<ReceivedDeletedBlockInfo>();
- private volatile int pendingReceivedRequests = 0;
- private volatile boolean shouldServiceRun = true;
- UpgradeManagerDatanode upgradeManager = null;
- private final DataNode dn;
- private final DNConf dnConf;
-
- BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
- this.dn = dn;
- this.bpRegistration = dn.createRegistration();
- this.nnAddr = nnAddr;
- this.dnConf = dn.getDnConf();
- }
-
- /**
- * returns true if BP thread has completed initialization of storage
- * and has registered with the corresponding namenode
- * @return true if initialized
- */
- public boolean initialized() {
- return initialized;
- }
-
- public boolean isAlive() {
- return shouldServiceRun && bpThread.isAlive();
- }
-
- public String getBlockPoolId() {
- return blockPoolId;
- }
-
- private InetSocketAddress getNNSocketAddress() {
- return nnAddr;
- }
-
- void setNamespaceInfo(NamespaceInfo nsinfo) {
- bpNSInfo = nsinfo;
- this.blockPoolId = nsinfo.getBlockPoolID();
- }
-
- void setNameNode(DatanodeProtocol dnProtocol) {
- bpNamenode = dnProtocol;
- }
-
- private NamespaceInfo handshake() throws IOException {
- NamespaceInfo nsInfo = new NamespaceInfo();
- while (dn.shouldRun && shouldServiceRun) {
- try {
- nsInfo = bpNamenode.versionRequest();
- // verify build version
- String nsVer = nsInfo.getBuildVersion();
- String stVer = Storage.getBuildVersion();
- LOG.info("handshake: namespace info = " + nsInfo);
-
- if(! nsVer.equals(stVer)) {
- String errorMsg = "Incompatible build versions: bp = " + blockPoolId +
- "namenode BV = " + nsVer + "; datanode BV = " + stVer;
- LOG.warn(errorMsg);
- bpNamenode.errorReport( bpRegistration,
- DatanodeProtocol.NOTIFY, errorMsg );
- } else {
- break;
- }
- } catch(SocketTimeoutException e) { // namenode is busy
- LOG.warn("Problem connecting to server: " + nnAddr);
- } catch(IOException e ) { // namenode is not available
- LOG.warn("Problem connecting to server: " + nnAddr);
- }
-
- // try again in a second
- try {
- Thread.sleep(5000);
- } catch (InterruptedException ie) {}
- }
-
- assert HdfsConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
- "Data-node and name-node layout versions must be the same."
- + "Expected: "+ HdfsConstants.LAYOUT_VERSION
- + " actual "+ nsInfo.getLayoutVersion();
- return nsInfo;
- }
-
- void setupBP(Configuration conf)
- throws IOException {
- // get NN proxy
- DatanodeProtocol dnp =
- (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
- DatanodeProtocol.versionID, nnAddr, conf);
- setNameNode(dnp);
-
- // handshake with NN
- NamespaceInfo nsInfo = handshake();
- setNamespaceInfo(nsInfo);
- dn.initBlockPool(this, nsInfo);
-
- bpRegistration.setStorageID(dn.getStorageId());
- StorageInfo storageInfo = dn.storage.getBPStorage(blockPoolId);
- if (storageInfo == null) {
- // it's null in the case of SimulatedDataSet
- bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION;
- bpRegistration.setStorageInfo(nsInfo);
- } else {
- bpRegistration.setStorageInfo(storageInfo);
- }
- }
-
- /**
- * This methods arranges for the data node to send the block report at
- * the next heartbeat.
- */
- void scheduleBlockReport(long delay) {
- if (delay > 0) { // send BR after random delay
- lastBlockReport = System.currentTimeMillis()
- - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
- } else { // send at next heartbeat
- lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
- }
- resetBlockReportTime = true; // reset future BRs for randomness
- }
-
- private void reportBadBlocks(ExtendedBlock block) {
- DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
- LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) };
-
- try {
- bpNamenode.reportBadBlocks(blocks);
- } catch (IOException e){
- /* One common reason is that NameNode could be in safe mode.
- * Should we keep on retrying in that case?
- */
- LOG.warn("Failed to report bad block " + block + " to namenode : "
- + " Exception", e);
- }
-
- }
-
- /**
- * Report received blocks and delete hints to the Namenode
- *
- * @throws IOException
- */
- private void reportReceivedDeletedBlocks() throws IOException {
-
- // check if there are newly received blocks
- ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
- int currentReceivedRequestsCounter;
- synchronized (receivedAndDeletedBlockList) {
- currentReceivedRequestsCounter = pendingReceivedRequests;
- int numBlocks = receivedAndDeletedBlockList.size();
- if (numBlocks > 0) {
- //
- // Send newly-received and deleted blockids to namenode
- //
- receivedAndDeletedBlockArray = receivedAndDeletedBlockList
- .toArray(new ReceivedDeletedBlockInfo[numBlocks]);
- }
- }
- if (receivedAndDeletedBlockArray != null) {
- bpNamenode.blockReceivedAndDeleted(bpRegistration, blockPoolId,
- receivedAndDeletedBlockArray);
- synchronized (receivedAndDeletedBlockList) {
- for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
- receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
- }
- pendingReceivedRequests -= currentReceivedRequestsCounter;
- }
- }
- }
-
- /*
- * Informing the name node could take a long long time! Should we wait
- * till namenode is informed before responding with success to the
- * client? For now we don't.
- */
- void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
- if (block == null || delHint == null) {
- throw new IllegalArgumentException(block == null ? "Block is null"
- : "delHint is null");
- }
-
- if (!block.getBlockPoolId().equals(blockPoolId)) {
- LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
- + blockPoolId);
- return;
- }
-
- synchronized (receivedAndDeletedBlockList) {
- receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
- .getLocalBlock(), delHint));
- pendingReceivedRequests++;
- receivedAndDeletedBlockList.notifyAll();
- }
- }
-
- void notifyNamenodeDeletedBlock(ExtendedBlock block) {
- if (block == null) {
- throw new IllegalArgumentException("Block is null");
- }
-
- if (!block.getBlockPoolId().equals(blockPoolId)) {
- LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
- + blockPoolId);
- return;
- }
-
- synchronized (receivedAndDeletedBlockList) {
- receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
- .getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT));
- }
- }
-
-
- /**
- * Report the list blocks to the Namenode
- * @throws IOException
- */
- DatanodeCommand blockReport() throws IOException {
- // send block report if timer has expired.
- DatanodeCommand cmd = null;
- long startTime = now();
- if (startTime - lastBlockReport > dnConf.blockReportInterval) {
-
- // Create block report
- long brCreateStartTime = now();
- BlockListAsLongs bReport = dn.data.getBlockReport(blockPoolId);
-
- // Send block report
- long brSendStartTime = now();
- cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport
- .getBlockListAsLongs());
-
- // Log the block report processing stats from Datanode perspective
- long brSendCost = now() - brSendStartTime;
- long brCreateCost = brSendStartTime - brCreateStartTime;
- dn.metrics.addBlockReport(brSendCost);
- LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
- + " blocks took " + brCreateCost + " msec to generate and "
- + brSendCost + " msecs for RPC and NN processing");
-
- // If we have sent the first block report, then wait a random
- // time before we start the periodic block reports.
- if (resetBlockReportTime) {
- lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
- resetBlockReportTime = false;
- } else {
- /* say the last block report was at 8:20:14. The current report
- * should have started around 9:20:14 (default 1 hour interval).
- * If current time is :
- * 1) normal like 9:20:18, next report should be at 10:20:14
- * 2) unexpected like 11:35:43, next report should be at 12:20:14
- */
- lastBlockReport += (now() - lastBlockReport) /
- dnConf.blockReportInterval * dnConf.blockReportInterval;
- }
- LOG.info("sent block report, processed command:" + cmd);
- }
- return cmd;
- }
-
-
- DatanodeCommand [] sendHeartBeat() throws IOException {
- return bpNamenode.sendHeartbeat(bpRegistration,
- dn.data.getCapacity(),
- dn.data.getDfsUsed(),
- dn.data.getRemaining(),
- dn.data.getBlockPoolUsed(blockPoolId),
- dn.xmitsInProgress.get(),
- dn.getXceiverCount(), dn.data.getNumFailedVolumes());
- }
-
- //This must be called only by blockPoolManager
- void start() {
- if ((bpThread != null) && (bpThread.isAlive())) {
- //Thread is started already
- return;
- }
- bpThread = new Thread(this, dnThreadName);
- bpThread.setDaemon(true); // needed for JUnit testing
- bpThread.start();
- }
-
- //This must be called only by blockPoolManager.
- void stop() {
- shouldServiceRun = false;
- if (bpThread != null) {
- bpThread.interrupt();
- }
- }
-
- //This must be called only by blockPoolManager
- void join() {
- try {
- if (bpThread != null) {
- bpThread.join();
- }
- } catch (InterruptedException ie) { }
- }
-
- //Cleanup method to be called by current thread before exiting.
- private synchronized void cleanUp() {
-
- if(upgradeManager != null)
- upgradeManager.shutdownUpgrade();
- shouldServiceRun = false;
- RPC.stopProxy(bpNamenode);
- dn.shutdownBlockPool(this);
- }
-
- /**
- * Main loop for each BP thread. Run until shutdown,
- * forever calling remote NameNode functions.
- */
- private void offerService() throws Exception {
- LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
- + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
- + dnConf.blockReportInterval + "msec" + " Initial delay: "
- + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
- + dnConf.heartBeatInterval);
-
- //
- // Now loop for a long time....
- //
- while (dn.shouldRun && shouldServiceRun) {
- try {
- long startTime = now();
-
- //
- // Every so often, send heartbeat or block-report
- //
- if (startTime - lastHeartbeat > dnConf.heartBeatInterval) {
- //
- // All heartbeat messages include following info:
- // -- Datanode name
- // -- data transfer port
- // -- Total capacity
- // -- Bytes remaining
- //
- lastHeartbeat = startTime;
- if (!dn.heartbeatsDisabledForTests) {
- DatanodeCommand[] cmds = sendHeartBeat();
- dn.metrics.addHeartbeat(now() - startTime);
-
- long startProcessCommands = now();
- if (!processCommand(cmds))
- continue;
- long endProcessCommands = now();
- if (endProcessCommands - startProcessCommands > 2000) {
- LOG.info("Took " + (endProcessCommands - startProcessCommands) +
- "ms to process " + cmds.length + " commands from NN");
- }
- }
- }
- if (pendingReceivedRequests > 0
- || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
- reportReceivedDeletedBlocks();
- lastDeletedReport = startTime;
- }
-
- DatanodeCommand cmd = blockReport();
- processCommand(cmd);
-
- // Now safe to start scanning the block pool
- if (dn.blockScanner != null) {
- dn.blockScanner.addBlockPool(this.blockPoolId);
- }
-
- //
- // There is no work to do; sleep until hearbeat timer elapses,
- // or work arrives, and then iterate again.
- //
- long waitTime = dnConf.heartBeatInterval -
- (System.currentTimeMillis() - lastHeartbeat);
- synchronized(receivedAndDeletedBlockList) {
- if (waitTime > 0 && pendingReceivedRequests == 0) {
- try {
- receivedAndDeletedBlockList.wait(waitTime);
- } catch (InterruptedException ie) {
- LOG.warn("BPOfferService for block pool="
- + this.getBlockPoolId() + " received exception:" + ie);
- }
- }
- } // synchronized
- } catch(RemoteException re) {
- String reClass = re.getClassName();
- if (UnregisteredNodeException.class.getName().equals(reClass) ||
- DisallowedDatanodeException.class.getName().equals(reClass) ||
- IncorrectVersionException.class.getName().equals(reClass)) {
- LOG.warn("blockpool " + blockPoolId + " is shutting down", re);
- shouldServiceRun = false;
- return;
- }
- LOG.warn("RemoteException in offerService", re);
- try {
- long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
- Thread.sleep(sleepTime);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- } catch (IOException e) {
- LOG.warn("IOException in offerService", e);
- }
- } // while (shouldRun && shouldServiceRun)
- } // offerService
-
- /**
- * Register one bp with the corresponding NameNode
- * <p>
- * The bpDatanode needs to register with the namenode on startup in order
- * 1) to report which storage it is serving now and
- * 2) to receive a registrationID
- *
- * issued by the namenode to recognize registered datanodes.
- *
- * @see FSNamesystem#registerDatanode(DatanodeRegistration)
- * @throws IOException
- */
- void register() throws IOException {
- LOG.info("in register: sid=" + bpRegistration.getStorageID() + ";SI="
- + bpRegistration.storageInfo);
-
- // build and layout versions should match
- String nsBuildVer = bpNamenode.versionRequest().getBuildVersion();
- String stBuildVer = Storage.getBuildVersion();
-
- if (!nsBuildVer.equals(stBuildVer)) {
- LOG.warn("Data-node and name-node Build versions must be " +
- "the same. Namenode build version: " + nsBuildVer + "Datanode " +
- "build version: " + stBuildVer);
- throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
- }
-
- if (HdfsConstants.LAYOUT_VERSION != bpNSInfo.getLayoutVersion()) {
- LOG.warn("Data-node and name-node layout versions must be " +
- "the same. Expected: "+ HdfsConstants.LAYOUT_VERSION +
- " actual "+ bpNSInfo.getLayoutVersion());
- throw new IncorrectVersionException
- (bpNSInfo.getLayoutVersion(), "namenode");
- }
-
- while(dn.shouldRun && shouldServiceRun) {
- try {
- // Use returned registration from namenode with updated machine name.
- bpRegistration = bpNamenode.registerDatanode(bpRegistration);
-
- LOG.info("bpReg after =" + bpRegistration.storageInfo +
- ";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName());
-
- break;
- } catch(SocketTimeoutException e) { // namenode is busy
- LOG.info("Problem connecting to server: " + nnAddr);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {}
- }
- }
-
- dn.bpRegistrationSucceeded(bpRegistration, blockPoolId);
-
- LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo);
-
- // random short delay - helps scatter the BR from all DNs
- scheduleBlockReport(dnConf.initialBlockReportDelay);
- }
-
-
- /**
- * No matter what kind of exception we get, keep retrying to offerService().
- * That's the loop that connects to the NameNode and provides basic DataNode
- * functionality.
- *
- * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
- * happen either at shutdown or due to refreshNamenodes.
- */
- @Override
- public void run() {
- LOG.info(bpRegistration + "In BPOfferService.run, data = " + dn.data
- + ";bp=" + blockPoolId);
-
- try {
- // init stuff
- try {
- // setup storage
- setupBP(dn.conf);
- register();
- } catch (IOException ioe) {
- // Initial handshake, storage recovery or registration failed
- // End BPOfferService thread
- LOG.fatal(bpRegistration + " initialization failed for block pool "
- + blockPoolId, ioe);
- return;
- }
-
- initialized = true; // bp is initialized;
-
- while (dn.shouldRun && shouldServiceRun) {
- try {
- startDistributedUpgradeIfNeeded();
- offerService();
- } catch (Exception ex) {
- LOG.error("Exception in BPOfferService", ex);
- if (dn.shouldRun && shouldServiceRun) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException ie) {
- LOG.warn("Received exception", ie);
- }
- }
- }
- }
- } catch (Throwable ex) {
- LOG.warn("Unexpected exception", ex);
- } finally {
- LOG.warn(bpRegistration + " ending block pool service for: "
- + blockPoolId + " thread " + Thread.currentThread().getId());
- cleanUp();
- }
- }
-
- /**
- * Process an array of datanode commands
- *
- * @param cmds an array of datanode commands
- * @return true if further processing may be required or false otherwise.
- */
- private boolean processCommand(DatanodeCommand[] cmds) {
- if (cmds != null) {
- for (DatanodeCommand cmd : cmds) {
- try {
- if (processCommand(cmd) == false) {
- return false;
- }
- } catch (IOException ioe) {
- LOG.warn("Error processing datanode Command", ioe);
- }
- }
- }
- return true;
- }
-
- /**
- *
- * @param cmd
- * @return true if further processing may be required or false otherwise.
- * @throws IOException
- */
- private boolean processCommand(DatanodeCommand cmd) throws IOException {
- if (cmd == null)
- return true;
- final BlockCommand bcmd =
- cmd instanceof BlockCommand? (BlockCommand)cmd: null;
-
- switch(cmd.getAction()) {
- case DatanodeProtocol.DNA_TRANSFER:
- // Send a copy of a block to another datanode
- dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
- dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
- break;
- case DatanodeProtocol.DNA_INVALIDATE:
- //
- // Some local block(s) are obsolete and can be
- // safely garbage-collected.
- //
- Block toDelete[] = bcmd.getBlocks();
- try {
- if (dn.blockScanner != null) {
- dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
- }
- // using global fsdataset
- dn.data.invalidate(bcmd.getBlockPoolId(), toDelete);
- } catch(IOException e) {
- dn.checkDiskError();
- throw e;
- }
- dn.metrics.incrBlocksRemoved(toDelete.length);
- break;
- case DatanodeProtocol.DNA_SHUTDOWN:
- // shut down the data node
- shouldServiceRun = false;
- return false;
- case DatanodeProtocol.DNA_REGISTER:
- // namenode requested a registration - at start or if NN lost contact
- LOG.info("DatanodeCommand action: DNA_REGISTER");
- if (dn.shouldRun && shouldServiceRun) {
- register();
- }
- break;
- case DatanodeProtocol.DNA_FINALIZE:
- dn.storage.finalizeUpgrade(((FinalizeCommand) cmd)
- .getBlockPoolId());
- break;
- case UpgradeCommand.UC_ACTION_START_UPGRADE:
- // start distributed upgrade here
- processDistributedUpgradeCommand((UpgradeCommand)cmd);
- break;
- case DatanodeProtocol.DNA_RECOVERBLOCK:
- dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
- break;
- case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
- LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
- if (dn.isBlockTokenEnabled) {
- dn.blockPoolTokenSecretManager.setKeys(blockPoolId,
- ((KeyUpdateCommand) cmd).getExportedKeys());
- }
- break;
- case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
- LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
- long bandwidth =
- ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
- if (bandwidth > 0) {
- DataXceiverServer dxcs =
- (DataXceiverServer) dn.dataXceiverServer.getRunnable();
- dxcs.balanceThrottler.setBandwidth(bandwidth);
- }
- break;
- default:
- LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
- }
- return true;
- }
-
- private void processDistributedUpgradeCommand(UpgradeCommand comm)
- throws IOException {
- UpgradeManagerDatanode upgradeManager = getUpgradeManager();
- upgradeManager.processUpgradeCommand(comm);
- }
-
- synchronized UpgradeManagerDatanode getUpgradeManager() {
- if(upgradeManager == null)
- upgradeManager =
- new UpgradeManagerDatanode(dn, blockPoolId);
-
- return upgradeManager;
- }
-
- /**
- * Start distributed upgrade if it should be initiated by the data-node.
- */
- private void startDistributedUpgradeIfNeeded() throws IOException {
- UpgradeManagerDatanode um = getUpgradeManager();
-
- if(!um.getUpgradeState())
- return;
- um.setUpgradeState(false, um.getUpgradeVersion());
- um.startUpgrade();
- return;
- }
+ boolean areHeartbeatsDisabledForTests() {
+ return this.heartbeatsDisabledForTests;
}
-
+
/**
* This method starts the data node with the specified conf.
*
@@ -1407,12 +729,32 @@ public class DataNode extends Configured
}
/**
+ * Create a DatanodeRegistration for a specific block pool.
+ * @param nsInfo the namespace info from the first part of the NN handshake
+ */
+ DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) {
+ DatanodeRegistration bpRegistration = createUnknownBPRegistration();
+ String blockPoolId = nsInfo.getBlockPoolID();
+
+ bpRegistration.setStorageID(getStorageId());
+ StorageInfo storageInfo = storage.getBPStorage(blockPoolId);
+ if (storageInfo == null) {
+ // it's null in the case of SimulatedDataSet
+ bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION;
+ bpRegistration.setStorageInfo(nsInfo);
+ } else {
+ bpRegistration.setStorageInfo(storageInfo);
+ }
+ return bpRegistration;
+ }
+
+ /**
* Check that the registration returned from a NameNode is consistent
* with the information in the storage. If the storage is fresh/unformatted,
* sets the storage ID based on this registration.
* Also updates the block pool's state in the secret manager.
*/
- private synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
+ synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
String blockPoolId)
throws IOException {
hostName = bpRegistration.getHost();
@@ -1469,7 +811,7 @@ public class DataNode extends Configured
/**
* Remove the given block pool from the block scanner, dataset, and storage.
*/
- private void shutdownBlockPool(BPOfferService bpos) {
+ void shutdownBlockPool(BPOfferService bpos) {
blockPoolManager.remove(bpos);
String bpId = bpos.getBlockPoolId();
@@ -1486,11 +828,27 @@ public class DataNode extends Configured
}
}
- void initBlockPool(BPOfferService bpOfferService,
- NamespaceInfo nsInfo) throws IOException {
+ /**
+ * One of the Block Pools has successfully connected to its NN.
+ * This initializes the local storage for that block pool,
+ * checks consistency of the NN's cluster ID, etc.
+ *
+ * If this is the first block pool to register, this also initializes
+ * the datanode-scoped storage.
+ *
+ * @param nsInfo the handshake response from the NN.
+ * @throws IOException if the NN is inconsistent with the local storage.
+ */
+ void initBlockPool(BPOfferService bpos) throws IOException {
+ NamespaceInfo nsInfo = bpos.getNamespaceInfo();
+ Preconditions.checkState(nsInfo != null,
+ "Block pool " + bpos + " should have retrieved " +
+ "its namespace info before calling initBlockPool.");
+
String blockPoolId = nsInfo.getBlockPoolID();
- blockPoolManager.addBlockPool(bpOfferService);
+ // Register the new block pool with the BP manager.
+ blockPoolManager.addBlockPool(bpos);
synchronized (this) {
// we do not allow namenode from different cluster to register
@@ -1521,12 +879,21 @@ public class DataNode extends Configured
+ blockPoolId + ";lv=" + storage.getLayoutVersion() +
";nsInfo=" + nsInfo);
}
+
+ // In the case that this is the first block pool to connect, initialize
+ // the dataset, block scanners, etc.
initFsDataSet();
- initPeriodicScanners(conf);
- data.addBlockPool(nsInfo.getBlockPoolID(), conf);
+ initPeriodicScanners(conf);
+
+ data.addBlockPool(blockPoolId, conf);
}
- private DatanodeRegistration createRegistration() {
+ /**
+ * Create a DatanodeRegistration object with no valid StorageInfo.
+ * This is used when reporting an error during handshake - ie
+ * before we can load any specific block pool.
+ */
+ private DatanodeRegistration createUnknownBPRegistration() {
DatanodeRegistration reg = new DatanodeRegistration(getMachineName());
reg.setInfoPort(infoServer.getPort());
reg.setIpcPort(getIpcPort());
@@ -1717,6 +1084,68 @@ public class DataNode extends Configured
return "DS-" + rand + "-" + ip + "-" + port + "-"
+ System.currentTimeMillis();
}
+
+ /** Ensure the authentication method is kerberos */
+ private void checkKerberosAuthMethod(String msg) throws IOException {
+ // User invoking the call must be same as the datanode user
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+ if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() !=
+ AuthenticationMethod.KERBEROS) {
+ throw new AccessControlException("Error in " + msg
+ + "Only kerberos based authentication is allowed.");
+ }
+ }
+
+ private void checkBlockLocalPathAccess() throws IOException {
+ checkKerberosAuthMethod("getBlockLocalPathInfo()");
+ String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+ if (!currentUser.equals(this.userWithLocalPathAccess)) {
+ throw new AccessControlException(
+ "Can't continue with getBlockLocalPathInfo() "
+ + "authorization. The user " + currentUser
+ + " is not allowed to call getBlockLocalPathInfo");
+ }
+ }
+
+ @Override
+ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
+ Token<BlockTokenIdentifier> token) throws IOException {
+ checkBlockLocalPathAccess();
+ checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
+ BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
+ if (LOG.isDebugEnabled()) {
+ if (info != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("getBlockLocalPathInfo successful block=" + block
+ + " blockfile " + info.getBlockPath() + " metafile "
+ + info.getMetaPath());
+ }
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("getBlockLocalPathInfo for block=" + block
+ + " returning null");
+ }
+ }
+ }
+ metrics.incrBlocksGetLocalPathInfo();
+ return info;
+ }
+
+ private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
+ AccessMode accessMode) throws IOException {
+ if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
+ BlockTokenIdentifier id = new BlockTokenIdentifier();
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ id.readFields(in);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got: " + id.toString());
+ }
+ blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode);
+ }
+ }
/**
* Shut down this instance of the datanode.
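checkBlockToken above is the standard Writable idiom for recovering a typed identifier from a token's opaque bytes: wrap token.getIdentifier() in a DataInputStream and let the identifier re-read its own fields. The same idiom, isolated as a hedged helper:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    public class TokenIdReader {
      static BlockTokenIdentifier readId(Token<BlockTokenIdentifier> token)
          throws IOException {
        BlockTokenIdentifier id = new BlockTokenIdentifier();
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(token.getIdentifier()));
        id.readFields(in); // Writable contract: populate from the stream
        return id;
      }
    }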
@@ -1917,7 +1346,7 @@ public class DataNode extends Configured
}
}
- private void transferBlocks(String poolId, Block blocks[],
+ void transferBlocks(String poolId, Block blocks[],
DatanodeInfo xferTargets[][]) {
for (int i = 0; i < blocks.length; i++) {
try {
@@ -2034,7 +1463,7 @@ public class DataNode extends Configured
* entire target list, the block, and the data.
*/
DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
- final String clientname) throws IOException {
+ final String clientname) {
if (DataTransferProtocol.LOG.isDebugEnabled()) {
DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
+ b + " (numBytes=" + b.getNumBytes() + ")"
@@ -2209,9 +1638,7 @@ public class DataNode extends Configured
System.exit(-1);
}
Collection<URI> dataDirs = getStorageDirs(conf);
- dnThreadName = "DataNode: [" +
- StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "]";
- UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation.setConfiguration(conf);
SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
DFS_DATANODE_USER_NAME_KEY);
return makeInstance(dataDirs, conf, resources);
@@ -2554,16 +1981,6 @@ public class DataNode extends Configured
return bpos.bpNamenode;
}
- /**
- * To be used by tests only to set a mock namenode in BPOfferService
- */
- void setBPNamenode(String bpid, DatanodeProtocol namenode) {
- BPOfferService bp = blockPoolManager.get(bpid);
- if (bp != null) {
- bp.setNameNode(namenode);
- }
- }
-
/** Block synchronization */
void syncBlock(RecoveringBlock rBlock,
List<BlockRecord> syncList) throws IOException {
@@ -2753,6 +2170,14 @@ public class DataNode extends Configured
}
}
+ /**
+ * Finalize a pending upgrade in response to DNA_FINALIZE.
+ * @param blockPoolId the block pool to finalize
+ */
+ void finalizeUpgradeForPool(String blockPoolId) throws IOException {
+ storage.finalizeUpgrade(blockPoolId);
+ }
+
// Determine a Datanode's streaming address
public static InetSocketAddress getStreamingAddr(Configuration conf) {
return NetUtils.createSocketAddr(
@@ -2789,7 +2214,7 @@ public class DataNode extends Configured
final Map<String, String> info = new HashMap<String, String>();
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null && bpos.bpThread != null) {
- info.put(bpos.getNNSocketAddress().getHostName(), bpos.blockPoolId);
+ info.put(bpos.getNNSocketAddress().getHostName(), bpos.getBlockPoolId());
}
}
return JSON.toString(info);
@@ -2820,13 +2245,7 @@ public class DataNode extends Configured
}
public void refreshNamenodes(Configuration conf) throws IOException {
- try {
- blockPoolManager.refreshNamenodes(conf);
- } catch (InterruptedException ex) {
- IOException eio = new IOException();
- eio.initCause(ex);
- throw eio;
- }
+ blockPoolManager.refreshNamenodes(conf);
}
@Override //ClientDatanodeProtocol
@@ -2877,7 +2296,7 @@ public class DataNode extends Configured
*/
public boolean isDatanodeFullyStarted() {
for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) {
- if (!bp.initialized() || !bp.isAlive()) {
+ if (!bp.isInitialized() || !bp.isAlive()) {
return false;
}
}
@@ -2904,4 +2323,9 @@ public class DataNode extends Configured
DNConf getDnConf() {
return dnConf;
}
+
+ boolean shouldRun() {
+ return shouldRun;
+ }
+
}
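
For readers following the new getBlockLocalPathInfo() path above, here is a
minimal client-side sketch of how the RPC could be exercised for local reads.
It is illustrative only: how the ClientDatanodeProtocol proxy and the block
token are obtained is assumed, and the privileged-user key name is assumed to
be the dfs.block.local-path-access.user setting from HDFS-2246.

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    class LocalReadSketch {
      // Assumes the caller runs on the datanode's machine as the
      // configured local-path-access user, authenticated via Kerberos.
      static FileInputStream openLocalReplica(ClientDatanodeProtocol datanode,
          ExtendedBlock block, Token<BlockTokenIdentifier> token)
          throws IOException {
        // The datanode checks Kerberos auth, the privileged user, and
        // the block token before revealing local paths.
        BlockLocalPathInfo info = datanode.getBlockLocalPathInfo(block, token);
        // Read the replica straight off the local filesystem,
        // bypassing DataTransferProtocol.
        return new FileInputStream(new File(info.getBlockPath()));
      }
    }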
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Nov 22 03:18:47 2011
@@ -48,8 +48,6 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
@@ -128,7 +126,7 @@ class DataXceiver extends Receiver imple
public void run() {
int opsProcessed = 0;
Op op = null;
- dataXceiverServer.childSockets.put(s, s);
+ dataXceiverServer.childSockets.add(s);
try {
int stdTimeout = s.getSoTimeout();
@@ -165,14 +163,6 @@ class DataXceiver extends Receiver imple
s.setSoTimeout(stdTimeout);
}
- // Make sure the xceiver count is not exceeded
- int curXceiverCount = datanode.getXceiverCount();
- if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
- throw new IOException("xceiverCount " + curXceiverCount
- + " exceeds the limit of concurrent xcievers "
- + dataXceiverServer.maxXceiverCount);
- }
-
opStartTime = now();
processOp(op);
++opsProcessed;
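
The per-connection xceiver-count check removed here reappears in the acceptor
loop of DataXceiverServer (next file), so overload is now refused before a
worker thread is spawned. A self-contained sketch of that pattern, with
illustrative names only, not code from this patch:

    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.atomic.AtomicInteger;

    class BoundedAcceptor implements Runnable {
      private final ServerSocket ss;
      private final int maxWorkers;
      private final AtomicInteger active = new AtomicInteger();

      BoundedAcceptor(ServerSocket ss, int maxWorkers) {
        this.ss = ss;
        this.maxWorkers = maxWorkers;
      }

      @Override
      public void run() {
        while (!ss.isClosed()) {
          try {
            final Socket s = ss.accept();
            if (active.get() >= maxWorkers) {
              s.close(); // refuse before creating a per-connection thread
              continue;
            }
            active.incrementAndGet();
            new Thread(new Runnable() {
              @Override
              public void run() {
                try {
                  handle(s);
                } finally {
                  active.decrementAndGet();
                }
              }
            }).start();
          } catch (IOException e) {
            // log and keep accepting
          }
        }
      }

      void handle(Socket s) { /* per-connection work */ }
    }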
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Tue Nov 22 03:18:47 2011
@@ -23,9 +23,9 @@ import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.channels.AsynchronousCloseException;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
-import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
@@ -48,8 +48,8 @@ class DataXceiverServer implements Runna
ServerSocket ss;
DataNode datanode;
// Record all sockets opened for data transfer
- Map<Socket, Socket> childSockets = Collections.synchronizedMap(
- new HashMap<Socket, Socket>());
+ Set<Socket> childSockets = Collections.synchronizedSet(
+ new HashSet<Socket>());
/**
* Maximal number of concurrent xceivers per node.
@@ -135,6 +135,15 @@ class DataXceiverServer implements Runna
try {
s = ss.accept();
s.setTcpNoDelay(true);
+
+ // Make sure the xceiver count is not exceeded
+ int curXceiverCount = datanode.getXceiverCount();
+ if (curXceiverCount > maxXceiverCount) {
+ throw new IOException("Xceiver count " + curXceiverCount
+ + " exceeds the limit of concurrent xcievers: "
+ + maxXceiverCount);
+ }
+
new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this))
.start();
} catch (SocketTimeoutException ignored) {
@@ -184,7 +193,7 @@ class DataXceiverServer implements Runna
// close all the sockets that were accepted earlier
synchronized (childSockets) {
- for (Iterator<Socket> it = childSockets.values().iterator();
+ for (Iterator<Socket> it = childSockets.iterator();
it.hasNext();) {
Socket thissock = it.next();
try {
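
One subtlety in the HashMap-to-HashSet switch above: Collections.synchronizedSet()
only makes individual operations atomic, so the explicit synchronized block
around iteration (kept in the last hunk) remains mandatory. A minimal
illustration, assuming nothing beyond the JDK:

    import java.io.IOException;
    import java.net.Socket;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    class SyncSetIteration {
      static void closeAll(Set<Socket> childSockets) {
        // Without this monitor, a concurrent add()/remove() during
        // iteration can throw ConcurrentModificationException.
        synchronized (childSockets) {
          for (Socket s : childSockets) {
            try {
              s.close();
            } catch (IOException e) {
              // best-effort close
            }
          }
        }
      }

      public static void main(String[] args) {
        Set<Socket> sockets =
            Collections.synchronizedSet(new HashSet<Socket>());
        closeAll(sockets); // no-op on an empty set
      }
    }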
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Nov 22 03:18:47 2011
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -459,7 +460,7 @@ public class FSDataset implements FSData
long metaFileLen = metaFile.length();
int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
if (!blockFile.exists() || blockFileLen == 0 ||
- !metaFile.exists() || metaFileLen < (long)crcHeaderLen) {
+ !metaFile.exists() || metaFileLen < crcHeaderLen) {
return 0;
}
checksumIn = new DataInputStream(
@@ -578,7 +579,7 @@ public class FSDataset implements FSData
* reserved capacity.
* @return the unreserved number of bytes left in this filesystem. May be zero.
*/
- long getCapacity() throws IOException {
+ long getCapacity() {
long remaining = usage.getCapacity() - reserved;
return remaining > 0 ? remaining : 0;
}
@@ -818,7 +819,7 @@ public class FSDataset implements FSData
return dfsUsed;
}
- private long getCapacity() throws IOException {
+ private long getCapacity() {
long capacity = 0L;
for (FSVolume vol : volumes) {
capacity += vol.getCapacity();
@@ -1667,7 +1668,7 @@ public class FSDataset implements FSData
}
if (!oldmeta.renameTo(newmeta)) {
replicaInfo.setGenerationStamp(oldGS); // restore old GS
- throw new IOException("Block " + (Block)replicaInfo + " reopen failed. " +
+ throw new IOException("Block " + replicaInfo + " reopen failed. " +
" Unable to move meta file " + oldmeta +
" to " + newmeta);
}
@@ -2018,7 +2019,7 @@ public class FSDataset implements FSData
/**
* Find the file corresponding to the block and return it if it exists.
*/
- File validateBlockFile(String bpid, Block b) throws IOException {
+ File validateBlockFile(String bpid, Block b) {
//Should we check for metadata file too?
File f = getFile(bpid, b);
@@ -2327,7 +2328,7 @@ public class FSDataset implements FSData
if (datanode.blockScanner != null) {
datanode.blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
}
- DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo);
+ DataNode.LOG.warn("Added missing block to memory " + diskBlockInfo);
return;
}
/*
@@ -2600,7 +2601,7 @@ public class FSDataset implements FSData
* get list of all bpids
* @return list of bpids
*/
- public String [] getBPIdlist() throws IOException {
+ public String [] getBPIdlist() {
return volumeMap.getBlockPoolList();
}
@@ -2658,4 +2659,14 @@ public class FSDataset implements FSData
volume.deleteBPDirectories(bpid, force);
}
}
+
+ @Override // FSDatasetInterface
+ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
+ throws IOException {
+ File datafile = getBlockFile(block);
+ File metafile = getMetaFile(datafile, block.getGenerationStamp());
+ BlockLocalPathInfo info = new BlockLocalPathInfo(block,
+ datafile.getAbsolutePath(), metafile.getAbsolutePath());
+ return info;
+ }
}
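
The new FSDataset.getBlockLocalPathInfo() above simply pairs the block file
with its metadata file. As a reference point, a sketch of the
blk_<id> / blk_<id>_<genStamp>.meta naming convention that getMetaFile()
encodes (directory layout elided; illustrative, not code from this patch):

    import java.io.File;

    class BlockFileNames {
      // E.g. blockFile "blk_123" with genStamp 1001 yields
      // "blk_123_1001.meta" in the same directory.
      static File metaFileFor(File blockFile, long genStamp) {
        return new File(blockFile.getParent(),
            blockFile.getName() + "_" + genStamp + ".meta");
      }
    }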
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Tue Nov 22 03:18:47 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
import java.io.Closeable;
+import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
@@ -402,4 +404,9 @@ public interface FSDatasetInterface exte
* @throws IOException
*/
public void deleteBlockPool(String bpid, boolean force) throws IOException;
+
+ /**
+ * Get {@link BlockLocalPathInfo} for the given block.
+ */
+ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) throws IOException;
}
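
Every FSDatasetInterface implementation must now provide
getBlockLocalPathInfo(). For datasets with no on-disk files (e.g. a simulated
dataset used in tests), a stub along these lines would be a reasonable choice;
this is a hypothetical sketch, not code from this patch:

    @Override // FSDatasetInterface
    public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b)
        throws IOException {
      // No local file backs a simulated replica, so reject the call.
      throw new IOException("getBlockLocalPathInfo not supported");
    }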
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Tue Nov 22 03:18:47 2011
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Tue Nov 22 03:18:47 2011
@@ -60,6 +60,7 @@ public class DataNodeMetrics {
@Metric MutableCounterLong readsFromRemoteClient;
@Metric MutableCounterLong writesFromLocalClient;
@Metric MutableCounterLong writesFromRemoteClient;
+ @Metric MutableCounterLong blocksGetLocalPathInfo;
@Metric MutableCounterLong volumeFailures;
@@ -165,4 +166,9 @@ public class DataNodeMetrics {
public void incrVolumeFailures() {
volumeFailures.incr();
}
+
+ /** Increment for getBlockLocalPathInfo calls */
+ public void incrBlocksGetLocalPathInfo() {
+ blocksGetLocalPathInfo.incr();
+ }
}
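
A hedged test sketch for the new counter, assuming the MetricsAsserts helper
from hadoop-common's test support and the usual field-to-metric
capitalization (blocksGetLocalPathInfo is exported as
"BlocksGetLocalPathInfo"); the setup that produces a running DataNode is
assumed:

    import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

    import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;

    class MetricsCheckSketch {
      // 'metrics' would come from a running DataNode in a test, after
      // one successful getBlockLocalPathInfo() call.
      static void verifyOneLocalPathInfoCall(DataNodeMetrics metrics) {
        assertCounter("BlocksGetLocalPathInfo", 1L, getMetrics(metrics.name()));
      }
    }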
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Tue Nov 22 03:18:47 2011
@@ -17,14 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.zip.Checksum;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Tue Nov 22 03:18:47 2011
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.na
import java.io.IOException;
-import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
/**
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Tue Nov 22 03:18:47 2011
@@ -36,7 +36,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.Daemon;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java Tue Nov 22 03:18:47 2011
@@ -23,8 +23,6 @@ import java.net.InetSocketAddress;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Tue Nov 22 03:18:47 2011
@@ -26,10 +26,7 @@ import java.io.EOFException;
import java.io.DataInputStream;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-
import com.google.common.annotations.VisibleForTesting;
/**
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Nov 22 03:18:47 2011
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Tue Nov 22 03:18:47 2011
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.na
import static org.apache.hadoop.hdfs.server.common.Util.now;
-import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -34,7 +33,6 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream.LogHeaderCorruptException;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ClearNSQuotaOp;
@@ -57,8 +55,6 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.util.Holder;
-import org.apache.hadoop.io.IOUtils;
-
import com.google.common.base.Joiner;
public class FSEditLogLoader {
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java?rev=1204794&r1=1204793&r2=1204794&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java Tue Nov 22 03:18:47 2011
@@ -19,8 +19,6 @@ package org.apache.hadoop.hdfs.server.na
import java.io.File;
import java.io.IOException;
-import java.util.List;
-
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;