Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2014/07/25 22:33:22 UTC
svn commit: r1613514 [2/6] - in
/hadoop/common/branches/YARN-1051/hadoop-hdfs-project:
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/
hadoop-hdfs-nfs/src/test/java/org/apac...
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Fri Jul 25 20:33:09 2014
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -71,11 +72,20 @@ public interface DataTransferProtocol {
/**
* Write a block to a datanode pipeline.
- *
+ * The receiver datanode of this call is the next datanode in the pipeline.
+ * The other downstream datanodes are specified by the targets parameter.
+ * Note that the receiver {@link DatanodeInfo} is not required in the
+ * parameter list since the receiver datanode knows its info. However, the
+ * {@link StorageType} for storing the replica in the receiver datanode is a
+ * parameter since the receiver datanode may support multiple storage types.
+ *
* @param blk the block being written.
+ * @param storageType for storing the replica in the receiver datanode.
* @param blockToken security token for accessing the block.
* @param clientName client's name.
- * @param targets target datanodes in the pipeline.
+ * @param targets other downstream datanodes in the pipeline.
+ * @param targetStorageTypes target {@link StorageType}s corresponding
+ * to the target datanodes.
* @param source source datanode.
* @param stage pipeline stage.
* @param pipelineSize the size of the pipeline.
@@ -84,9 +94,11 @@ public interface DataTransferProtocol {
* @param latestGenerationStamp the latest generation stamp of the block.
*/
public void writeBlock(final ExtendedBlock blk,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes,
final DatanodeInfo source,
final BlockConstructionStage stage,
final int pipelineSize,
@@ -110,7 +122,8 @@ public interface DataTransferProtocol {
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
- final DatanodeInfo[] targets) throws IOException;
+ final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes) throws IOException;
/**
* Request short circuit access file descriptors from a DataNode.
@@ -148,11 +161,13 @@ public interface DataTransferProtocol {
* It is used for balancing purposes.
*
* @param blk the block being replaced.
+ * @param storageType the {@link StorageType} for storing the block.
* @param blockToken security token for accessing the block.
* @param delHint the hint for deleting the block in the original datanode.
* @param source the source datanode for receiving the block.
*/
public void replaceBlock(final ExtendedBlock blk,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo source) throws IOException;
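For illustration, a minimal self-contained sketch (not Hadoop code; the class and names below are hypothetical stand-ins) of the parallel-array convention the revised signatures rely on: targetStorageTypes[i] names the storage medium that should hold the replica on targets[i], while the receiver's own type travels in the separate storageType parameter:

    public class PipelineSketch {
      // Simplified stand-in for org.apache.hadoop.hdfs.StorageType.
      enum StorageType { DISK, SSD; static final StorageType DEFAULT = DISK; }

      static void describePipeline(String[] targets, StorageType[] targetStorageTypes) {
        // The two arrays are index-aligned; the receiver's own storage type
        // is not listed here because it is carried separately.
        for (int i = 0; i < targets.length; i++) {
          System.out.println(targets[i] + " -> " + targetStorageTypes[i]);
        }
      }

      public static void main(String[] args) {
        String[] targets = { "dn2:50010", "dn3:50010" };  // downstream datanodes
        StorageType[] types = { StorageType.SSD, StorageType.DEFAULT };
        describePipeline(targets, types);
      }
    }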
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java Fri Jul 25 20:33:09 2014
@@ -27,7 +27,7 @@ import java.nio.channels.ReadableByteCha
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.util.DirectBufferPool;
+import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions;
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Fri Jul 25 20:33:09 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
@@ -121,10 +122,13 @@ public abstract class Receiver implement
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
+ final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+ PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
- PBHelper.convert(proto.getTargetsList()),
+ targets,
+ PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
PBHelper.convert(proto.getSource()),
fromProto(proto.getStage()),
proto.getPipelineSize(),
@@ -140,10 +144,12 @@ public abstract class Receiver implement
private void opTransferBlock(DataInputStream in) throws IOException {
final OpTransferBlockProto proto =
OpTransferBlockProto.parseFrom(vintPrefixed(in));
+ final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
- PBHelper.convert(proto.getTargetsList()));
+ targets,
+ PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
}
/** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
@@ -176,6 +182,7 @@ public abstract class Receiver implement
private void opReplaceBlock(DataInputStream in) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
+ PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getToken()),
proto.getDelHint(),
PBHelper.convert(proto.getSource()));
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Fri Jul 25 20:33:09 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
@@ -111,9 +112,11 @@ public class Sender implements DataTrans
@Override
public void writeBlock(final ExtendedBlock blk,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes,
final DatanodeInfo source,
final BlockConstructionStage stage,
final int pipelineSize,
@@ -130,7 +133,9 @@ public class Sender implements DataTrans
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
.setHeader(header)
+ .setStorageType(PBHelper.convertStorageType(storageType))
.addAllTargets(PBHelper.convert(targets, 1))
+ .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
.setStage(toProto(stage))
.setPipelineSize(pipelineSize)
.setMinBytesRcvd(minBytesRcvd)
@@ -150,12 +155,14 @@ public class Sender implements DataTrans
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
- final DatanodeInfo[] targets) throws IOException {
+ final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes) throws IOException {
OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken))
.addAllTargets(PBHelper.convert(targets))
+ .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes))
.build();
send(out, Op.TRANSFER_BLOCK, proto);
@@ -196,11 +203,13 @@ public class Sender implements DataTrans
@Override
public void replaceBlock(final ExtendedBlock blk,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo source) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+ .setStorageType(PBHelper.convertStorageType(storageType))
.setDelHint(delHint)
.setSource(PBHelper.convertDatanodeInfo(source))
.build();
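The startIdx = 1 arguments above reflect that element 0 of each array describes the datanode receiving the request, which is already carried by the dedicated storageType field, so only downstream entries are serialized. A simplified, self-contained sketch of that skip-the-first-element conversion (mirroring PBHelper.convertStorageTypes, with stand-in types):

    import java.util.ArrayList;
    import java.util.List;

    public class StartIdxSketch {
      enum StorageType { DISK, SSD }

      static List<StorageType> convertStorageTypes(StorageType[] types, int startIdx) {
        if (types == null) {
          return null;
        }
        final List<StorageType> protos = new ArrayList<StorageType>(types.length);
        for (int i = startIdx; i < types.length; ++i) {
          protos.add(types[i]);  // skip the receiver's own entry at index 0
        }
        return protos;
      }

      public static void main(String[] args) {
        StorageType[] all = { StorageType.DISK, StorageType.SSD, StorageType.DISK };
        // Prints [SSD, DISK]; the leading DISK goes into the request's
        // dedicated storageType field instead.
        System.out.println(convertStorageTypes(all, 1));
      }
    }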
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java Fri Jul 25 20:33:09 2014
@@ -97,7 +97,7 @@ public class DatanodeProtocolClientSideT
RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
ProtobufRpcEngine.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi));
+ rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
}
private static DatanodeProtocolPB createNamenode(
@@ -109,33 +109,6 @@ public class DatanodeProtocolClientSideT
org.apache.hadoop.ipc.Client.getPingInterval(conf), null).getProxy();
}
- /** Create a {@link NameNode} proxy */
- static DatanodeProtocolPB createNamenodeWithRetry(
- DatanodeProtocolPB rpcNamenode) {
- RetryPolicy createPolicy = RetryPolicies
- .retryUpToMaximumCountWithFixedSleep(5,
- HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
-
- Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap =
- new HashMap<Class<? extends Exception>, RetryPolicy>();
- remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
- createPolicy);
-
- Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
- new HashMap<Class<? extends Exception>, RetryPolicy>();
- exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
- .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
- remoteExceptionToPolicyMap));
- RetryPolicy methodPolicy = RetryPolicies.retryByException(
- RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
- Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
-
- methodNameToPolicyMap.put("create", methodPolicy);
-
- return (DatanodeProtocolPB) RetryProxy.create(DatanodeProtocolPB.class,
- rpcNamenode, methodNameToPolicyMap);
- }
-
@Override
public void close() throws IOException {
RPC.stopProxy(rpcProxy);
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java Fri Jul 25 20:33:09 2014
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
@@ -61,7 +62,7 @@ import com.google.protobuf.ServiceExcept
@InterfaceAudience.Private
@InterfaceStability.Stable
public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
- ProtocolMetaInterface, Closeable {
+ ProtocolMetaInterface, Closeable, ProtocolTranslator {
/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
@@ -89,6 +90,11 @@ public class NamenodeProtocolTranslatorP
}
@Override
+ public Object getUnderlyingProxyObject() {
+ return rpcProxy;
+ }
+
+ @Override
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
throws IOException {
GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
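Implementing ProtocolTranslator lets generic RPC utilities reach through the translator wrapper to the underlying proxy object when they need the raw proxy. A minimal sketch of that unwrap pattern, with a simplified stand-in for the org.apache.hadoop.ipc interface:

    public class UnwrapSketch {
      // Stand-in for org.apache.hadoop.ipc.ProtocolTranslator.
      interface ProtocolTranslator { Object getUnderlyingProxyObject(); }

      static Object unwrap(Object proxy) {
        // Peel off translator layers until the raw proxy is reached.
        while (proxy instanceof ProtocolTranslator) {
          proxy = ((ProtocolTranslator) proxy).getUnderlyingProxyObject();
        }
        return proxy;
      }

      public static void main(String[] args) {
        final Object rpcProxy = new Object();
        ProtocolTranslator translator = new ProtocolTranslator() {
          @Override public Object getUnderlyingProxyObject() { return rpcProxy; }
        };
        System.out.println(unwrap(translator) == rpcProxy);  // true
      }
    }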
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Jul 25 20:33:09 2014
@@ -150,6 +150,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
@@ -674,14 +675,8 @@ public class PBHelper {
targets[i] = PBHelper.convert(locs.get(i));
}
- final int storageTypesCount = proto.getStorageTypesCount();
- final StorageType[] storageTypes;
- if (storageTypesCount == 0) {
- storageTypes = null;
- } else {
- Preconditions.checkState(storageTypesCount == locs.size());
- storageTypes = convertStorageTypeProtos(proto.getStorageTypesList());
- }
+ final StorageType[] storageTypes = convertStorageTypes(
+ proto.getStorageTypesList(), locs.size());
final int storageIDsCount = proto.getStorageIDsCount();
final String[] storageIDs;
@@ -969,6 +964,20 @@ public class PBHelper {
targets[i] = PBHelper.convert(targetList.get(i));
}
+ StorageType[][] targetStorageTypes = new StorageType[targetList.size()][];
+ List<StorageTypesProto> targetStorageTypesList = blkCmd.getTargetStorageTypesList();
+ if (targetStorageTypesList.isEmpty()) { // missing storage types
+ for(int i = 0; i < targetStorageTypes.length; i++) {
+ targetStorageTypes[i] = new StorageType[targets[i].length];
+ Arrays.fill(targetStorageTypes[i], StorageType.DEFAULT);
+ }
+ } else {
+ for(int i = 0; i < targetStorageTypes.length; i++) {
+ List<StorageTypeProto> p = targetStorageTypesList.get(i).getStorageTypesList();
+ targetStorageTypes[i] = p.toArray(new StorageType[p.size()]);
+ }
+ }
+
List<StorageUuidsProto> targetStorageUuidsList = blkCmd.getTargetStorageUuidsList();
String[][] targetStorageIDs = new String[targetStorageUuidsList.size()][];
for(int i = 0; i < targetStorageIDs.length; i++) {
@@ -991,7 +1000,7 @@ public class PBHelper {
throw new AssertionError("Unknown action type: " + blkCmd.getAction());
}
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets,
- targetStorageIDs);
+ targetStorageTypes, targetStorageIDs);
}
public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
@@ -1605,8 +1614,25 @@ public class PBHelper {
}
}
- private static StorageTypeProto convertStorageType(
- StorageType type) {
+ public static List<StorageTypeProto> convertStorageTypes(
+ StorageType[] types) {
+ return convertStorageTypes(types, 0);
+ }
+
+ public static List<StorageTypeProto> convertStorageTypes(
+ StorageType[] types, int startIdx) {
+ if (types == null) {
+ return null;
+ }
+ final List<StorageTypeProto> protos = new ArrayList<StorageTypeProto>(
+ types.length);
+ for (int i = startIdx; i < types.length; ++i) {
+ protos.add(convertStorageType(types[i]));
+ }
+ return protos;
+ }
+
+ public static StorageTypeProto convertStorageType(StorageType type) {
switch(type) {
case DISK:
return StorageTypeProto.DISK;
@@ -1621,7 +1647,7 @@ public class PBHelper {
public static DatanodeStorage convert(DatanodeStorageProto s) {
return new DatanodeStorage(s.getStorageUuid(),
PBHelper.convertState(s.getState()),
- PBHelper.convertType(s.getStorageType()));
+ PBHelper.convertStorageType(s.getStorageType()));
}
private static State convertState(StorageState state) {
@@ -1634,7 +1660,7 @@ public class PBHelper {
}
}
- private static StorageType convertType(StorageTypeProto type) {
+ public static StorageType convertStorageType(StorageTypeProto type) {
switch(type) {
case DISK:
return StorageType.DISK;
@@ -1646,11 +1672,16 @@ public class PBHelper {
}
}
- private static StorageType[] convertStorageTypeProtos(
- List<StorageTypeProto> storageTypesList) {
- final StorageType[] storageTypes = new StorageType[storageTypesList.size()];
- for (int i = 0; i < storageTypes.length; ++i) {
- storageTypes[i] = PBHelper.convertType(storageTypesList.get(i));
+ public static StorageType[] convertStorageTypes(
+ List<StorageTypeProto> storageTypesList, int expectedSize) {
+ final StorageType[] storageTypes = new StorageType[expectedSize];
+ if (storageTypesList.size() != expectedSize) { // missing storage types
+ Preconditions.checkState(storageTypesList.isEmpty());
+ Arrays.fill(storageTypes, StorageType.DEFAULT);
+ } else {
+ for (int i = 0; i < storageTypes.length; ++i) {
+ storageTypes[i] = convertStorageType(storageTypesList.get(i));
+ }
}
return storageTypes;
}
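The new convertStorageTypes(list, expectedSize) overload above keeps decoding backward compatible: an older sender omits storage types entirely, so the decoder fills every slot with the default type, while a partially populated list is rejected. A self-contained sketch of that behavior with stand-in types:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class DecodeDefaultsSketch {
      enum StorageType { DISK, SSD; static final StorageType DEFAULT = DISK; }

      static StorageType[] convertStorageTypes(List<StorageType> list, int expectedSize) {
        final StorageType[] storageTypes = new StorageType[expectedSize];
        if (list.size() != expectedSize) {   // missing storage types
          if (!list.isEmpty()) {
            // A partial list is a protocol error, not a legacy message.
            throw new IllegalStateException("partial storage type list");
          }
          Arrays.fill(storageTypes, StorageType.DEFAULT);
        } else {
          for (int i = 0; i < storageTypes.length; ++i) {
            storageTypes[i] = list.get(i);
          }
        }
        return storageTypes;
      }

      public static void main(String[] args) {
        // Old sender: empty list, three targets -> [DISK, DISK, DISK].
        System.out.println(Arrays.toString(
            convertStorageTypes(Collections.<StorageType>emptyList(), 3)));
      }
    }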
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Jul 25 20:33:09 2014
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.balancer;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.BufferedInputStream;
@@ -57,14 +59,17 @@ import org.apache.hadoop.conf.Configured
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -202,6 +207,7 @@ public class Balancer {
private final NameNodeConnector nnc;
private final BalancingPolicy policy;
+ private final SaslDataTransferClient saslClient;
private final double threshold;
// all data node lists
@@ -352,19 +358,18 @@ public class Balancer {
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
- if (nnc.getDataEncryptionKey() != null) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufOut, unbufIn, nnc.getDataEncryptionKey());
- unbufOut = encryptedStreams.out;
- unbufIn = encryptedStreams.in;
- }
+ ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
+ Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+ IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+ unbufIn, nnc, accessToken, target.datanode);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.IO_FILE_BUFFER_SIZE));
in = new DataInputStream(new BufferedInputStream(unbufIn,
HdfsConstants.IO_FILE_BUFFER_SIZE));
- sendRequest(out);
+ sendRequest(out, eb, StorageType.DEFAULT, accessToken);
receiveResponse(in);
bytesMoved.addAndGet(block.getNumBytes());
LOG.info("Successfully moved " + this);
@@ -395,10 +400,10 @@ public class Balancer {
}
/* Send a block replace request to the output stream*/
- private void sendRequest(DataOutputStream out) throws IOException {
- final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
- final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
- new Sender(out).replaceBlock(eb, accessToken,
+ private void sendRequest(DataOutputStream out, ExtendedBlock eb,
+ StorageType storageType,
+ Token<BlockTokenIdentifier> accessToken) throws IOException {
+ new Sender(out).replaceBlock(eb, storageType, accessToken,
source.getStorageID(), proxySource.getDatanode());
}
@@ -876,6 +881,12 @@ public class Balancer {
this.maxConcurrentMovesPerNode =
conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+ this.saslClient = new SaslDataTransferClient(
+ DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+ TrustedChannelResolver.getInstance(conf),
+ conf.getBoolean(
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
}
/* Given a data node set, build a network topology and decide
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Fri Jul 25 20:33:09 2014
@@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.NameNodePr
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -50,7 +50,7 @@ import org.apache.hadoop.util.Daemon;
* The class provides utilities for {@link Balancer} to access a NameNode
*/
@InterfaceAudience.Private
-class NameNodeConnector {
+class NameNodeConnector implements DataEncryptionKeyFactory {
private static final Log LOG = Balancer.LOG;
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
@@ -72,7 +72,6 @@ class NameNodeConnector {
private BlockTokenSecretManager blockTokenSecretManager;
private Daemon keyupdaterthread; // AccessKeyUpdater thread
private DataEncryptionKey encryptionKey;
- private final TrustedChannelResolver trustedChannelResolver;
NameNodeConnector(URI nameNodeUri,
Configuration conf) throws IOException {
@@ -122,7 +121,6 @@ class NameNodeConnector {
if (out == null) {
throw new IOException("Another balancer is running");
}
- this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
}
boolean shouldContinue(long dispatchBlockMoveBytes) {
@@ -154,10 +152,10 @@ class NameNodeConnector {
BlockTokenSecretManager.AccessMode.COPY));
}
}
-
- DataEncryptionKey getDataEncryptionKey()
- throws IOException {
- if (encryptDataTransfer && !this.trustedChannelResolver.isTrusted()) {
+
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() {
+ if (encryptDataTransfer) {
synchronized (this) {
if (encryptionKey == null) {
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jul 25 20:33:09 2014
@@ -725,7 +725,6 @@ public class BlockManager {
final List<DatanodeStorageInfo> locations
= new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
- final String storageID = storage.getStorageID();
// filter invalidate replicas
if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
locations.add(storage);
@@ -2637,7 +2636,7 @@ public class BlockManager {
if (addedNode == delNodeHint) {
delNodeHint = null;
}
- Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
+ Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
.getNodes(block);
for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
@@ -2657,7 +2656,7 @@ public class BlockManager {
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
// exclude corrupt replicas
if (corruptNodes == null || !corruptNodes.contains(cur)) {
- nonExcess.add(cur);
+ nonExcess.add(storage);
}
}
}
@@ -2681,7 +2680,7 @@ public class BlockManager {
* If no such node is available,
* then pick a node with the least free space
*/
- private void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess,
+ private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess,
Block b, short replication,
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint,
@@ -2689,28 +2688,33 @@ public class BlockManager {
assert namesystem.hasWriteLock();
// first form a rack to datanodes map
BlockCollection bc = getBlockCollection(b);
- final Map<String, List<DatanodeDescriptor>> rackMap
- = new HashMap<String, List<DatanodeDescriptor>>();
- final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
- final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+
+ final Map<String, List<DatanodeStorageInfo>> rackMap
+ = new HashMap<String, List<DatanodeStorageInfo>>();
+ final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
+ final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
// split nodes into two sets
// moreThanOne contains nodes on rack with more than one replica
// exactlyOne contains the remaining nodes
- replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne,
- exactlyOne);
+ replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
// pick one node to delete that favors the delete hint
// otherwise pick one with least space from priSet if it is not empty
// otherwise one node with least space from remains
boolean firstOne = true;
+ final DatanodeStorageInfo delNodeHintStorage
+ = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
+ final DatanodeStorageInfo addedNodeStorage
+ = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
while (nonExcess.size() - replication > 0) {
// check if we can delete delNodeHint
- final DatanodeInfo cur;
- if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
- && (moreThanOne.contains(delNodeHint)
- || (addedNode != null && !moreThanOne.contains(addedNode))) ) {
- cur = delNodeHint;
+ final DatanodeStorageInfo cur;
+ if (firstOne && delNodeHintStorage != null
+ && (moreThanOne.contains(delNodeHintStorage)
+ || (addedNodeStorage != null
+ && !moreThanOne.contains(addedNodeStorage)))) {
+ cur = delNodeHintStorage;
} else { // regular excessive replica removal
cur = replicator.chooseReplicaToDelete(bc, b, replication,
moreThanOne, exactlyOne);
@@ -2722,7 +2726,7 @@ public class BlockManager {
exactlyOne, cur);
nonExcess.remove(cur);
- addToExcessReplicate(cur, b);
+ addToExcessReplicate(cur.getDatanodeDescriptor(), b);
//
// The 'excessblocks' tracks blocks until we get confirmation
@@ -2733,7 +2737,7 @@ public class BlockManager {
// should be deleted. Items are removed from the invalidate list
// upon giving instructions to the namenode.
//
- addToInvalidates(b, cur);
+ addToInvalidates(b, cur.getDatanodeDescriptor());
blockLog.info("BLOCK* chooseExcessReplicates: "
+"("+cur+", "+b+") is added to invalidated blocks set");
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Fri Jul 25 20:33:09 2014
@@ -124,11 +124,12 @@ public abstract class BlockPlacementPoli
listed in the previous parameter.
* @return the replica that is the best candidate for deletion
*/
- abstract public DatanodeDescriptor chooseReplicaToDelete(BlockCollection srcBC,
- Block block,
- short replicationFactor,
- Collection<DatanodeDescriptor> existingReplicas,
- Collection<DatanodeDescriptor> moreExistingReplicas);
+ abstract public DatanodeStorageInfo chooseReplicaToDelete(
+ BlockCollection srcBC,
+ Block block,
+ short replicationFactor,
+ Collection<DatanodeStorageInfo> existingReplicas,
+ Collection<DatanodeStorageInfo> moreExistingReplicas);
/**
* Used to setup a BlockPlacementPolicy object. This should be defined by
@@ -175,21 +176,23 @@ public abstract class BlockPlacementPoli
* @param exactlyOne The List of replica nodes on rack with only one replica
* @param cur current replica to remove
*/
- public void adjustSetsWithChosenReplica(final Map<String,
- List<DatanodeDescriptor>> rackMap,
- final List<DatanodeDescriptor> moreThanOne,
- final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) {
+ public void adjustSetsWithChosenReplica(
+ final Map<String, List<DatanodeStorageInfo>> rackMap,
+ final List<DatanodeStorageInfo> moreThanOne,
+ final List<DatanodeStorageInfo> exactlyOne,
+ final DatanodeStorageInfo cur) {
- String rack = getRack(cur);
- final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
- datanodes.remove(cur);
- if (datanodes.isEmpty()) {
+ final String rack = getRack(cur.getDatanodeDescriptor());
+ final List<DatanodeStorageInfo> storages = rackMap.get(rack);
+ storages.remove(cur);
+ if (storages.isEmpty()) {
rackMap.remove(rack);
}
if (moreThanOne.remove(cur)) {
- if (datanodes.size() == 1) {
- moreThanOne.remove(datanodes.get(0));
- exactlyOne.add(datanodes.get(0));
+ if (storages.size() == 1) {
+ final DatanodeStorageInfo remaining = storages.get(0);
+ moreThanOne.remove(remaining);
+ exactlyOne.add(remaining);
}
} else {
exactlyOne.remove(cur);
@@ -214,28 +217,28 @@ public abstract class BlockPlacementPoli
* @param exactlyOne contains the remaining nodes
*/
public void splitNodesWithRack(
- Collection<DatanodeDescriptor> dataNodes,
- final Map<String, List<DatanodeDescriptor>> rackMap,
- final List<DatanodeDescriptor> moreThanOne,
- final List<DatanodeDescriptor> exactlyOne) {
- for(DatanodeDescriptor node : dataNodes) {
- final String rackName = getRack(node);
- List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
- if (datanodeList == null) {
- datanodeList = new ArrayList<DatanodeDescriptor>();
- rackMap.put(rackName, datanodeList);
+ final Iterable<DatanodeStorageInfo> storages,
+ final Map<String, List<DatanodeStorageInfo>> rackMap,
+ final List<DatanodeStorageInfo> moreThanOne,
+ final List<DatanodeStorageInfo> exactlyOne) {
+ for(DatanodeStorageInfo s: storages) {
+ final String rackName = getRack(s.getDatanodeDescriptor());
+ List<DatanodeStorageInfo> storageList = rackMap.get(rackName);
+ if (storageList == null) {
+ storageList = new ArrayList<DatanodeStorageInfo>();
+ rackMap.put(rackName, storageList);
}
- datanodeList.add(node);
+ storageList.add(s);
}
// split nodes into two sets
- for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
- if (datanodeList.size() == 1) {
+ for(List<DatanodeStorageInfo> storageList : rackMap.values()) {
+ if (storageList.size() == 1) {
// exactlyOne contains nodes on rack with only one replica
- exactlyOne.add(datanodeList.get(0));
+ exactlyOne.add(storageList.get(0));
} else {
// moreThanOne contains nodes on rack with more than one replica
- moreThanOne.addAll(datanodeList);
+ moreThanOne.addAll(storageList);
}
}
}
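A worked example (hypothetical rack and storage names) of what splitNodesWithRack now computes over DatanodeStorageInfo: storages are grouped by rack, racks holding several replicas feed moreThanOne, and single-replica racks feed exactlyOne, so excess-replica deletion can prefer storages whose removal keeps the number of distinct racks unchanged:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SplitByRackSketch {
      public static void main(String[] args) {
        // storage -> rack, standing in for DatanodeStorageInfo lookups.
        Map<String, String> storageToRack = new HashMap<String, String>();
        storageToRack.put("s1", "/rack1");
        storageToRack.put("s2", "/rack1");
        storageToRack.put("s3", "/rack2");

        // Group storages by rack.
        Map<String, List<String>> rackMap = new HashMap<String, List<String>>();
        for (Map.Entry<String, String> e : storageToRack.entrySet()) {
          List<String> list = rackMap.get(e.getValue());
          if (list == null) {
            list = new ArrayList<String>();
            rackMap.put(e.getValue(), list);
          }
          list.add(e.getKey());
        }

        // Split into the two sets.
        List<String> moreThanOne = new ArrayList<String>();
        List<String> exactlyOne = new ArrayList<String>();
        for (List<String> list : rackMap.values()) {
          if (list.size() == 1) {
            exactlyOne.add(list.get(0));   // lone replica on its rack
          } else {
            moreThanOne.addAll(list);      // rack already has redundancy
          }
        }
        // moreThanOne = [s1, s2], exactlyOne = [s3].
        System.out.println(moreThanOne + " " + exactlyOne);
      }
    }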
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Fri Jul 25 20:33:09 2014
@@ -145,14 +145,14 @@ public class BlockPlacementPolicyDefault
List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
boolean avoidStaleNodes = stats != null
&& stats.isAvoidingStaleDataNodesForWrite();
- for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
+ for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; i++) {
DatanodeDescriptor favoredNode = favoredNodes.get(i);
// Choose a single node which is local to favoredNode.
// 'results' is updated within chooseLocalNode
final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
favoriteAndExcludedNodes, blocksize,
getMaxNodesPerRack(results.size(), numOfReplicas)[1],
- results, avoidStaleNodes, storageType);
+ results, avoidStaleNodes, storageType, false);
if (target == null) {
LOG.warn("Could not find a target for file " + src
+ " with favored node " + favoredNode);
@@ -271,7 +271,7 @@ public class BlockPlacementPolicyDefault
try {
if (numOfResults == 0) {
writer = chooseLocalStorage(writer, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes, storageType)
+ maxNodesPerRack, results, avoidStaleNodes, storageType, true)
.getDatanodeDescriptor();
if (--numOfReplicas == 0) {
return writer;
@@ -345,12 +345,14 @@ public class BlockPlacementPolicyDefault
int maxNodesPerRack,
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
- StorageType storageType)
+ StorageType storageType,
+ boolean fallbackToLocalRack)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
- if (localMachine == null)
+ if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageType);
+ }
if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
// otherwise try local machine first
@@ -363,7 +365,11 @@ public class BlockPlacementPolicyDefault
}
}
}
- }
+ }
+
+ if (!fallbackToLocalRack) {
+ return null;
+ }
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageType);
@@ -636,15 +642,11 @@ public class BlockPlacementPolicyDefault
// check the communication traffic of the target machine
if (considerLoad) {
- double avgLoad = 0;
- if (stats != null) {
- int size = stats.getNumDatanodesInService();
- if (size != 0) {
- avgLoad = (double)stats.getTotalLoad()/size;
- }
- }
- if (node.getXceiverCount() > (2.0 * avgLoad)) {
- logNodeIsNotChosen(storage, "the node is too busy ");
+ final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
+ final int nodeLoad = node.getXceiverCount();
+ if (nodeLoad > maxLoad) {
+ logNodeIsNotChosen(storage,
+ "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
return false;
}
}
@@ -727,31 +729,34 @@ public class BlockPlacementPolicyDefault
}
@Override
- public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc,
+ public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc,
Block block, short replicationFactor,
- Collection<DatanodeDescriptor> first,
- Collection<DatanodeDescriptor> second) {
+ Collection<DatanodeStorageInfo> first,
+ Collection<DatanodeStorageInfo> second) {
long oldestHeartbeat =
now() - heartbeatInterval * tolerateHeartbeatMultiplier;
- DatanodeDescriptor oldestHeartbeatNode = null;
+ DatanodeStorageInfo oldestHeartbeatStorage = null;
long minSpace = Long.MAX_VALUE;
- DatanodeDescriptor minSpaceNode = null;
+ DatanodeStorageInfo minSpaceStorage = null;
// Pick the node with the oldest heartbeat or with the least free space,
// if all heartbeats are within the tolerable heartbeat interval
- for(DatanodeDescriptor node : pickupReplicaSet(first, second)) {
+ for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
+ final DatanodeDescriptor node = storage.getDatanodeDescriptor();
long free = node.getRemaining();
long lastHeartbeat = node.getLastUpdate();
if(lastHeartbeat < oldestHeartbeat) {
oldestHeartbeat = lastHeartbeat;
- oldestHeartbeatNode = node;
+ oldestHeartbeatStorage = storage;
}
if (minSpace > free) {
minSpace = free;
- minSpaceNode = node;
+ minSpaceStorage = storage;
}
}
- return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
+
+ return oldestHeartbeatStorage != null? oldestHeartbeatStorage
+ : minSpaceStorage;
}
/**
@@ -760,9 +765,9 @@ public class BlockPlacementPolicyDefault
* replica while second set contains remaining replica nodes.
* So pick up first set if not empty. If first is empty, then pick second.
*/
- protected Collection<DatanodeDescriptor> pickupReplicaSet(
- Collection<DatanodeDescriptor> first,
- Collection<DatanodeDescriptor> second) {
+ protected Collection<DatanodeStorageInfo> pickupReplicaSet(
+ Collection<DatanodeStorageInfo> first,
+ Collection<DatanodeStorageInfo> second) {
return first.isEmpty() ? second : first;
}
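The rewritten load check above compares a candidate against twice the average xceiver count of in-service nodes only, so decommission(ing|ed) nodes no longer skew the average. Worked numbers (hypothetical) for that check:

    public class LoadCheckSketch {
      public static void main(String[] args) {
        // From DatanodeStatistics: total in-service xceivers and node count.
        int inServiceXceiverCount = 10 + 20 + 30;
        int numDatanodesInService = 3;
        double avg = (double) inServiceXceiverCount / numDatanodesInService;  // 20.0
        double maxLoad = 2.0 * avg;                                           // 40.0
        int nodeLoad = 45;
        // A node at 45 xceivers is rejected; one at 35 would be accepted.
        System.out.println(nodeLoad > maxLoad
            ? "the node is too busy (load:" + nodeLoad + " > " + maxLoad + ")"
            : "node accepted");
      }
    }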
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Fri Jul 25 20:33:09 2014
@@ -70,7 +70,8 @@ public class BlockPlacementPolicyWithNod
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
- StorageType storageType) throws NotEnoughReplicasException {
+ StorageType storageType, boolean fallbackToLocalRack
+ ) throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
@@ -97,6 +98,10 @@ public class BlockPlacementPolicyWithNod
if (chosenStorage != null) {
return chosenStorage;
}
+
+ if (!fallbackToLocalRack) {
+ return null;
+ }
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
@@ -286,9 +291,9 @@ public class BlockPlacementPolicyWithNod
* If first is empty, then pick second.
*/
@Override
- public Collection<DatanodeDescriptor> pickupReplicaSet(
- Collection<DatanodeDescriptor> first,
- Collection<DatanodeDescriptor> second) {
+ public Collection<DatanodeStorageInfo> pickupReplicaSet(
+ Collection<DatanodeStorageInfo> first,
+ Collection<DatanodeStorageInfo> second) {
// If no replica within same rack, return directly.
if (first.isEmpty()) {
return second;
@@ -296,25 +301,24 @@ public class BlockPlacementPolicyWithNod
// Split data nodes in the first set into two sets,
// moreThanOne contains nodes on nodegroup with more than one replica
// exactlyOne contains the remaining nodes
- Map<String, List<DatanodeDescriptor>> nodeGroupMap =
- new HashMap<String, List<DatanodeDescriptor>>();
+ Map<String, List<DatanodeStorageInfo>> nodeGroupMap =
+ new HashMap<String, List<DatanodeStorageInfo>>();
- for(DatanodeDescriptor node : first) {
- final String nodeGroupName =
- NetworkTopology.getLastHalf(node.getNetworkLocation());
- List<DatanodeDescriptor> datanodeList =
- nodeGroupMap.get(nodeGroupName);
- if (datanodeList == null) {
- datanodeList = new ArrayList<DatanodeDescriptor>();
- nodeGroupMap.put(nodeGroupName, datanodeList);
+ for(DatanodeStorageInfo storage : first) {
+ final String nodeGroupName = NetworkTopology.getLastHalf(
+ storage.getDatanodeDescriptor().getNetworkLocation());
+ List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName);
+ if (storageList == null) {
+ storageList = new ArrayList<DatanodeStorageInfo>();
+ nodeGroupMap.put(nodeGroupName, storageList);
}
- datanodeList.add(node);
+ storageList.add(storage);
}
- final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
- final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+ final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
+ final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
// split nodes into two sets
- for(List<DatanodeDescriptor> datanodeList : nodeGroupMap.values()) {
+ for(List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) {
if (datanodeList.size() == 1 ) {
// exactlyOne contains nodes on nodegroup with exactly one replica
exactlyOne.add(datanodeList.get(0));
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Jul 25 20:33:09 2014
@@ -345,7 +345,8 @@ public class DatanodeManager {
/** Sort the located blocks by the distance to the target host. */
public void sortLocatedBlocks(final String targethost,
- final List<LocatedBlock> locatedblocks) {
+ final List<LocatedBlock> locatedblocks,
+ boolean randomizeBlockLocationsPerBlock) {
//sort the blocks
// As it is possible for the separation of node manager and datanode,
// here we should get node but not datanode only .
@@ -372,8 +373,8 @@ public class DatanodeManager {
--lastActiveIndex;
}
int activeLen = lastActiveIndex + 1;
- networktopology.sortByDistance(client, b.getLocations(), activeLen,
- b.getBlock().getBlockId());
+ networktopology.sortByDistance(client, b.getLocations(), activeLen, b
+ .getBlock().getBlockId(), randomizeBlockLocationsPerBlock);
}
}
@@ -820,7 +821,9 @@ public class DatanodeManager {
}
/** Start decommissioning the specified datanode. */
- private void startDecommission(DatanodeDescriptor node) {
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ public void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
LOG.info("Start Decommissioning " + node + " " + storage
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Fri Jul 25 20:33:09 2014
@@ -52,6 +52,12 @@ public interface DatanodeStatistics {
/** @return the xceiver count */
public int getXceiverCount();
/** @return the total xceiver count for non-decommission(ing|ed) nodes */
+ public int getInServiceXceiverCount();
+
+ /** @return number of non-decommission(ing|ed) nodes */
+ public int getNumDatanodesInService();
+
/**
* @return the total used space by data nodes for non-DFS purposes
* such as storing temporary files on the local file system
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Fri Jul 25 20:33:09 2014
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -290,4 +291,21 @@ public class DatanodeStorageInfo {
public String toString() {
return "[" + storageType + "]" + storageID + ":" + state;
}
+
+ /** @return the first {@link DatanodeStorageInfo} corresponding to
+ * the given datanode
+ */
+ static DatanodeStorageInfo getDatanodeStorageInfo(
+ final Iterable<DatanodeStorageInfo> infos,
+ final DatanodeDescriptor datanode) {
+ if (datanode == null) {
+ return null;
+ }
+ for(DatanodeStorageInfo storage : infos) {
+ if (storage.getDatanodeDescriptor() == datanode) {
+ return storage;
+ }
+ }
+ return null;
+ }
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Fri Jul 25 20:33:09 2014
@@ -151,6 +151,16 @@ class HeartbeatManager implements Datano
}
@Override
+ public synchronized int getInServiceXceiverCount() {
+ return stats.nodesInServiceXceiverCount;
+ }
+
+ @Override
+ public synchronized int getNumDatanodesInService() {
+ return stats.nodesInService;
+ }
+
+ @Override
public synchronized long getCacheCapacity() {
return stats.cacheCapacity;
}
@@ -178,7 +188,7 @@ class HeartbeatManager implements Datano
}
synchronized void register(final DatanodeDescriptor d) {
- if (!datanodes.contains(d)) {
+ if (!d.isAlive) {
addDatanode(d);
//update its timestamp
@@ -191,6 +201,8 @@ class HeartbeatManager implements Datano
}
synchronized void addDatanode(final DatanodeDescriptor d) {
+ // update in-service node count
+ stats.add(d);
datanodes.add(d);
d.isAlive = true;
}
@@ -323,6 +335,9 @@ class HeartbeatManager implements Datano
private long cacheCapacity = 0L;
private long cacheUsed = 0L;
+ private int nodesInService = 0;
+ private int nodesInServiceXceiverCount = 0;
+
private int expiredHeartbeats = 0;
private void add(final DatanodeDescriptor node) {
@@ -330,6 +345,8 @@ class HeartbeatManager implements Datano
blockPoolUsed += node.getBlockPoolUsed();
xceiverCount += node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ nodesInService++;
+ nodesInServiceXceiverCount += node.getXceiverCount();
capacityTotal += node.getCapacity();
capacityRemaining += node.getRemaining();
} else {
@@ -344,6 +361,8 @@ class HeartbeatManager implements Datano
blockPoolUsed -= node.getBlockPoolUsed();
xceiverCount -= node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ nodesInService--;
+ nodesInServiceXceiverCount -= node.getXceiverCount();
capacityTotal -= node.getCapacity();
capacityRemaining -= node.getRemaining();
} else {
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Fri Jul 25 20:33:09 2014
@@ -93,7 +93,8 @@ public final class HdfsServerConstants {
FORCE("-force"),
NONINTERACTIVE("-nonInteractive"),
RENAMERESERVED("-renameReserved"),
- METADATAVERSION("-metadataVersion");
+ METADATAVERSION("-metadataVersion"),
+ UPGRADEONLY("-upgradeOnly");
private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile(
"(\\w+)\\((\\w+)\\)");
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Fri Jul 25 20:33:09 2014
@@ -575,7 +575,8 @@ class BPOfferService {
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER:
// Send a copy of a block to another datanode
- dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
+ dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
+ bcmd.getTargets(), bcmd.getTargetStorageTypes());
dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
break;
case DatanodeProtocol.DNA_INVALIDATE:
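
With this change the DNA_TRANSFER command carries a StorageType per target, so targets and targetStorageTypes travel as parallel arrays matched by index. A minimal sketch of that invariant, using simplified stand-ins for DatanodeInfo and StorageType:

  class TransferCommandSketch {
    // Stand-ins for DatanodeInfo and StorageType; the pairing is what matters.
    enum Storage { DISK, SSD }

    static void transfer(String[] targets, Storage[] targetStorageTypes) {
      // Invariant: one storage type per target datanode, matched by index.
      if (targets.length != targetStorageTypes.length) {
        throw new IllegalArgumentException(
            "each target datanode needs a corresponding storage type");
      }
      for (int i = 0; i < targets.length; i++) {
        System.out.println("replica -> " + targets[i] + " stored as "
            + targetStorageTypes[i]);
      }
    }

    public static void main(String[] args) {
      transfer(new String[] { "dn1:50010", "dn2:50010" },
               new Storage[] { Storage.DISK, Storage.SSD });
    }
  }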
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Fri Jul 25 20:33:09 2014
@@ -84,6 +84,10 @@ class BlockPoolSliceScanner {
private final SortedSet<BlockScanInfo> blockInfoSet
= new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
+
+ private final SortedSet<BlockScanInfo> newBlockInfoSet =
+ new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
+
private final GSet<Block, BlockScanInfo> blockMap
= new LightWeightGSet<Block, BlockScanInfo>(
LightWeightGSet.computeCapacity(0.5, "BlockMap"));
@@ -195,7 +199,7 @@ class BlockPoolSliceScanner {
BlockScanInfo info = new BlockScanInfo( block );
info.lastScanTime = scanTime--;
//still keep 'info.lastScanType' to NONE.
- addBlockInfo(info);
+ addBlockInfo(info, false);
}
RollingLogs rollingLogs = null;
@@ -221,25 +225,42 @@ class BlockPoolSliceScanner {
// Should we change throttler bandwidth every time bytesLeft changes?
// not really required.
}
-
- private synchronized void addBlockInfo(BlockScanInfo info) {
- boolean added = blockInfoSet.add(info);
+
+ /**
+ * Adds the BlockScanInfo to the sorted set of BlockScanInfo.
+ * @param info the BlockScanInfo to be added
+ * @param isNewBlock true if the block is newly added; false if an existing
+ * BlockScanInfo is being updated with a new scan time
+ */
+ private synchronized void addBlockInfo(BlockScanInfo info,
+ boolean isNewBlock) {
+ boolean added = false;
+ if (isNewBlock) {
+ // check whether the block is already present
+ boolean exists = blockInfoSet.contains(info);
+ added = !exists && newBlockInfoSet.add(info);
+ } else {
+ added = blockInfoSet.add(info);
+ }
blockMap.put(info);
if (added) {
updateBytesToScan(info.getNumBytes(), info.lastScanTime);
}
}
-
+
private synchronized void delBlockInfo(BlockScanInfo info) {
boolean exists = blockInfoSet.remove(info);
+ if (!exists) {
+ exists = newBlockInfoSet.remove(info);
+ }
blockMap.remove(info);
if (exists) {
updateBytesToScan(-info.getNumBytes(), info.lastScanTime);
}
}
-
+
/** Update blockMap by the given LogEntry */
private synchronized void updateBlockInfo(LogEntry e) {
BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
@@ -249,7 +270,7 @@ class BlockPoolSliceScanner {
delBlockInfo(info);
info.lastScanTime = e.verificationTime;
info.lastScanType = ScanType.VERIFICATION_SCAN;
- addBlockInfo(info);
+ addBlockInfo(info, false);
}
}
@@ -275,14 +296,14 @@ class BlockPoolSliceScanner {
info = new BlockScanInfo(block.getLocalBlock());
info.lastScanTime = getNewBlockScanTime();
- addBlockInfo(info);
+ addBlockInfo(info, true);
adjustThrottler();
}
/** Deletes the block from internal structures */
synchronized void deleteBlock(Block block) {
BlockScanInfo info = blockMap.get(block);
- if ( info != null ) {
+ if (info != null) {
delBlockInfo(info);
}
}
@@ -310,23 +331,16 @@ class BlockPoolSliceScanner {
}
}
- private synchronized void updateScanStatus(Block block,
+ private synchronized void updateScanStatus(BlockScanInfo info,
ScanType type,
boolean scanOk) {
- BlockScanInfo info = blockMap.get(block);
-
- if ( info != null ) {
- delBlockInfo(info);
- } else {
- // It might already be removed. Thats ok, it will be caught next time.
- info = new BlockScanInfo(block);
- }
-
+ delBlockInfo(info);
+
long now = Time.monotonicNow();
info.lastScanType = type;
info.lastScanTime = now;
info.lastScanOk = scanOk;
- addBlockInfo(info);
+ addBlockInfo(info, false);
// Don't update meta data if the verification failed.
if (!scanOk) {
@@ -334,8 +348,8 @@ class BlockPoolSliceScanner {
}
if (verificationLog != null) {
- verificationLog.append(now, block.getGenerationStamp(),
- block.getBlockId());
+ verificationLog.append(now, info.getGenerationStamp(),
+ info.getBlockId());
}
}
@@ -434,11 +448,13 @@ class BlockPoolSliceScanner {
totalTransientErrors++;
}
- updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, true);
+ updateScanStatus((BlockScanInfo)block.getLocalBlock(),
+ ScanType.VERIFICATION_SCAN, true);
return;
} catch (IOException e) {
- updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, false);
+ updateScanStatus((BlockScanInfo)block.getLocalBlock(),
+ ScanType.VERIFICATION_SCAN, false);
// If the block does not exist anymore, then it's not an error
if (!dataset.contains(block)) {
@@ -497,7 +513,7 @@ class BlockPoolSliceScanner {
// Picks one block and verifies it
private void verifyFirstBlock() {
- Block block = null;
+ BlockScanInfo block = null;
synchronized (this) {
if (!blockInfoSet.isEmpty()) {
block = blockInfoSet.first();
@@ -583,7 +599,7 @@ class BlockPoolSliceScanner {
delBlockInfo(info);
info.lastScanTime = lastScanTime;
lastScanTime += verifyInterval;
- addBlockInfo(info);
+ addBlockInfo(info, false);
}
}
}
@@ -679,12 +695,21 @@ class BlockPoolSliceScanner {
throw e;
} finally {
rollVerificationLogs();
+ rollNewBlocksInfo();
if (LOG.isDebugEnabled()) {
LOG.debug("Done scanning block pool: " + blockPoolId);
}
}
}
-
+
+ // add new blocks collected during this pass so they are scanned in the next iteration
+ private synchronized void rollNewBlocksInfo() {
+ for (BlockScanInfo newBlock : newBlockInfoSet) {
+ blockInfoSet.add(newBlock);
+ }
+ newBlockInfoSet.clear();
+ }
+
private synchronized void rollVerificationLogs() {
if (verificationLog != null) {
try {
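
The newBlockInfoSet introduced above buffers blocks added while a scan pass is running; rollNewBlocksInfo() then folds them into blockInfoSet so they are only picked up by the next iteration. The same scheme in a self-contained sketch, with a plain String payload standing in for BlockScanInfo:

  import java.util.TreeSet;

  class DeferredAddSetSketch {
    private final TreeSet<String> current = new TreeSet<String>();
    private final TreeSet<String> pending = new TreeSet<String>();

    synchronized void add(String item, boolean isNew) {
      if (isNew) {
        // Buffer new items so the ongoing scan pass does not see them.
        if (!current.contains(item)) {
          pending.add(item);
        }
      } else {
        current.add(item);
      }
    }

    // Mirrors rollNewBlocksInfo(): called once a pass completes.
    synchronized void roll() {
      current.addAll(pending);
      pending.clear();
    }

    synchronized String first() {
      return current.isEmpty() ? null : current.first();
    }

    public static void main(String[] args) {
      DeferredAddSetSketch s = new DeferredAddSetSketch();
      s.add("b1", false);
      s.add("b2", true);             // invisible until roll()
      System.out.println(s.first()); // b1
      s.roll();
      System.out.println(s.first()); // b1; b2 now eligible too
    }
  }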
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Jul 25 20:33:09 2014
@@ -37,6 +37,7 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -122,7 +123,8 @@ class BlockReceiver implements Closeable
private boolean syncOnClose;
private long restartBudget;
- BlockReceiver(final ExtendedBlock block, final DataInputStream in,
+ BlockReceiver(final ExtendedBlock block, final StorageType storageType,
+ final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
@@ -162,11 +164,11 @@ class BlockReceiver implements Closeable
// Open local disk out
//
if (isDatanode) { //replication or move
- replicaInfo = datanode.data.createTemporary(block);
+ replicaInfo = datanode.data.createTemporary(storageType, block);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
- replicaInfo = datanode.data.createRbw(block);
+ replicaInfo = datanode.data.createRbw(storageType, block);
datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
break;
@@ -198,7 +200,7 @@ class BlockReceiver implements Closeable
case TRANSFER_RBW:
case TRANSFER_FINALIZED:
// this is a transfer destination
- replicaInfo = datanode.data.createTemporary(block);
+ replicaInfo = datanode.data.createTemporary(storageType, block);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Fri Jul 25 20:33:09 2014
@@ -52,7 +52,9 @@ import static org.apache.hadoop.hdfs.DFS
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.security.SaslPropertiesResolver;
/**
* Simple class encapsulating all of the configuration that the DataNode
@@ -86,6 +88,7 @@ public class DNConf {
final String minimumNameNodeVersion;
final String encryptionAlgorithm;
+ final SaslPropertiesResolver saslPropsResolver;
final TrustedChannelResolver trustedChannelResolver;
final long xceiverStopTimeout;
@@ -168,6 +171,8 @@ public class DNConf {
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
+ this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
+ conf);
this.xceiverStopTimeout = conf.getLong(
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
@@ -186,7 +191,26 @@ public class DNConf {
String getMinimumNameNodeVersion() {
return this.minimumNameNodeVersion;
}
-
+
+ /**
+ * Returns true if encryption is enabled for DataTransferProtocol.
+ *
+ * @return boolean true if encryption is enabled for DataTransferProtocol
+ */
+ public boolean getEncryptDataTransfer() {
+ return encryptDataTransfer;
+ }
+
+ /**
+ * Returns the encryption algorithm configured for DataTransferProtocol, or
+ * null if not configured.
+ *
+ * @return the encryption algorithm configured for DataTransferProtocol
+ */
+ public String getEncryptionAlgorithm() {
+ return encryptionAlgorithm;
+ }
+
public long getXceiverStopTimeout() {
return xceiverStopTimeout;
}
@@ -194,4 +218,24 @@ public class DNConf {
public long getMaxLockedMemory() {
return maxLockedMemory;
}
+
+ /**
+ * Returns the SaslPropertiesResolver configured for use with
+ * DataTransferProtocol, or null if not configured.
+ *
+ * @return SaslPropertiesResolver configured for use with DataTransferProtocol
+ */
+ public SaslPropertiesResolver getSaslPropsResolver() {
+ return saslPropsResolver;
+ }
+
+ /**
+ * Returns the TrustedChannelResolver configured for use with
+ * DataTransferProtocol, or null if not configured.
+ *
+ * @return TrustedChannelResolver configured for use with DataTransferProtocol
+ */
+ public TrustedChannelResolver getTrustedChannelResolver() {
+ return trustedChannelResolver;
+ }
}
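
The new accessors expose the DataTransferProtocol protection knobs to callers outside the package. A hypothetical consumer of those getters, sketched with plain parameters standing in for the DNConf lookups (the real SASL negotiation is more involved than this decision tree):

  class TransferProtectionSketch {
    // Decide, coarsely, how a connection would be protected.
    static String describe(boolean encrypt, String algorithm,
        boolean hasSaslPropsResolver) {
      if (encrypt) {
        // dfs.encrypt.data.transfer is set; the algorithm may still be
        // null, in which case a negotiated default applies.
        return "encrypted ("
            + (algorithm != null ? algorithm : "default") + ")";
      }
      if (hasSaslPropsResolver) {
        return "SASL-protected per resolved properties";
      }
      return "plain transfer";
    }

    public static void main(String[] args) {
      System.out.println(describe(true, "3des", false));
      System.out.println(describe(false, null, true));
      System.out.println(describe(false, null, false));
    }
  }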