You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2014/07/22 09:41:25 UTC
svn commit: r1612493 [1/2] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/
src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/or...
Author: szetszwo
Date: Tue Jul 22 07:41:24 2014
New Revision: 1612493
URL: http://svn.apache.org/r1612493
Log:
HDFS-6702. Change DFSClient to pass the StorageType from the namenode to datanodes and change datanode to write block replicas using the specified storage type.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Jul 22 07:41:24 2014
@@ -307,6 +307,10 @@ Release 2.6.0 - UNRELEASED
HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it
will not redirect retries to the same datanode. (zhaoyunjiong via szetszwo)
+ HDFS-6702. Change DFSClient to pass the StorageType from the namenode to
+ datanodes and change datanode to write block replicas using the specified
+ storage type. (szetszwo)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Jul 22 07:41:24 2014
@@ -313,6 +313,7 @@ public class DFSOutputStream extends FSO
private DataInputStream blockReplyStream;
private ResponseProcessor response = null;
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+ private volatile StorageType[] storageTypes = null;
private volatile String[] storageIDs = null;
private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
CacheBuilder.newBuilder()
@@ -417,10 +418,12 @@ public class DFSOutputStream extends FSO
}
private void setPipeline(LocatedBlock lb) {
- setPipeline(lb.getLocations(), lb.getStorageIDs());
+ setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
}
- private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) {
+ private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
+ String[] storageIDs) {
this.nodes = nodes;
+ this.storageTypes = storageTypes;
this.storageIDs = storageIDs;
}
@@ -446,7 +449,7 @@ public class DFSOutputStream extends FSO
this.setName("DataStreamer for file " + src);
closeResponder();
closeStream();
- setPipeline(null, null);
+ setPipeline(null, null, null);
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
@@ -1031,10 +1034,12 @@ public class DFSOutputStream extends FSO
//transfer replica
final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
final DatanodeInfo[] targets = {nodes[d]};
- transfer(src, targets, lb.getBlockToken());
+ final StorageType[] targetStorageTypes = {storageTypes[d]};
+ transfer(src, targets, targetStorageTypes, lb.getBlockToken());
}
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
//transfer replica to the new datanode
Socket sock = null;
@@ -1056,7 +1061,7 @@ public class DFSOutputStream extends FSO
//send the TRANSFER_BLOCK request
new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
- targets);
+ targets, targetStorageTypes);
out.flush();
//ack
@@ -1135,16 +1140,15 @@ public class DFSOutputStream extends FSO
failed.add(nodes[errorIndex]);
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
- System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
- System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
- newnodes.length-errorIndex);
+ arraycopy(nodes, newnodes, errorIndex);
+
+ final StorageType[] newStorageTypes = new StorageType[newnodes.length];
+ arraycopy(storageTypes, newStorageTypes, errorIndex);
final String[] newStorageIDs = new String[newnodes.length];
- System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex);
- System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex,
- newStorageIDs.length-errorIndex);
+ arraycopy(storageIDs, newStorageIDs, errorIndex);
- setPipeline(newnodes, newStorageIDs);
+ setPipeline(newnodes, newStorageTypes, newStorageIDs);
// Just took care of a node error while waiting for a node restart
if (restartingNodeIndex >= 0) {
@@ -1181,7 +1185,7 @@ public class DFSOutputStream extends FSO
// set up the pipeline again with the remaining nodes
if (failPacket) { // for testing
- success = createBlockOutputStream(nodes, newGS, isRecovery);
+ success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
failPacket = false;
try {
// Give DNs time to send in bad reports. In real situations,
@@ -1190,7 +1194,7 @@ public class DFSOutputStream extends FSO
Thread.sleep(2000);
} catch (InterruptedException ie) {}
} else {
- success = createBlockOutputStream(nodes, newGS, isRecovery);
+ success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
}
if (restartingNodeIndex >= 0) {
@@ -1242,6 +1246,7 @@ public class DFSOutputStream extends FSO
private LocatedBlock nextBlockOutputStream() throws IOException {
LocatedBlock lb = null;
DatanodeInfo[] nodes = null;
+ StorageType[] storageTypes = null;
int count = dfsClient.getConf().nBlockWriteRetry;
boolean success = false;
ExtendedBlock oldBlock = block;
@@ -1264,11 +1269,12 @@ public class DFSOutputStream extends FSO
bytesSent = 0;
accessToken = lb.getBlockToken();
nodes = lb.getLocations();
+ storageTypes = lb.getStorageTypes();
//
// Connect to first DataNode in the list.
//
- success = createBlockOutputStream(nodes, 0L, false);
+ success = createBlockOutputStream(nodes, storageTypes, 0L, false);
if (!success) {
DFSClient.LOG.info("Abandoning " + block);
@@ -1289,8 +1295,8 @@ public class DFSOutputStream extends FSO
// connects to the first datanode in the pipeline
// Returns true if success, otherwise return failure.
//
- private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
- boolean recoveryFlag) {
+ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
+ StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
if (nodes.length == 0) {
DFSClient.LOG.info("nodes are empty for write pipeline of block "
+ block);
@@ -1332,9 +1338,10 @@ public class DFSOutputStream extends FSO
// Xmit header info to datanode
//
+ BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
// send the request
- new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
- nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
+ new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken,
+ dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
cachingStrategy.get());
@@ -2197,4 +2204,9 @@ public class DFSOutputStream extends FSO
public long getFileId() {
return fileId;
}
+
+ private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
+ System.arraycopy(srcs, 0, dsts, 0, skipIndex);
+ System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Tue Jul 22 07:41:24 2014
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -71,11 +72,20 @@ public interface DataTransferProtocol {
/**
* Write a block to a datanode pipeline.
- *
+ * The receiver datanode of this call is the next datanode in the pipeline.
+ * The other downstream datanodes are specified by the targets parameter.
+ * Note that the receiver {@link DatanodeInfo} is not required in the
+ * parameter list since the receiver datanode knows its info. However, the
+ * {@link StorageType} for storing the replica in the receiver datanode is a
+ * parameter since the receiver datanode may support multiple storage types.
+ *
* @param blk the block being written.
+ * @param storageType for storing the replica in the receiver datanode.
* @param blockToken security token for accessing the block.
* @param clientName client's name.
- * @param targets target datanodes in the pipeline.
+ * @param targets other downstream datanodes in the pipeline.
+ * @param targetStorageTypes target {@link StorageType}s corresponding
+ * to the target datanodes.
* @param source source datanode.
* @param stage pipeline stage.
* @param pipelineSize the size of the pipeline.
@@ -84,9 +94,11 @@ public interface DataTransferProtocol {
* @param latestGenerationStamp the latest generation stamp of the block.
*/
public void writeBlock(final ExtendedBlock blk,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes,
final DatanodeInfo source,
final BlockConstructionStage stage,
final int pipelineSize,
@@ -110,7 +122,8 @@ public interface DataTransferProtocol {
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
- final DatanodeInfo[] targets) throws IOException;
+ final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes) throws IOException;
/**
* Request short circuit access file descriptors from a DataNode.
@@ -148,11 +161,13 @@ public interface DataTransferProtocol {
* It is used for balancing purpose.
*
* @param blk the block being replaced.
+ * @param storageType the {@link StorageType} for storing the block.
* @param blockToken security token for accessing the block.
* @param delHint the hint for deleting the block in the original datanode.
* @param source the source datanode for receiving the block.
*/
public void replaceBlock(final ExtendedBlock blk,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo source) throws IOException;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Tue Jul 22 07:41:24 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
@@ -121,10 +122,13 @@ public abstract class Receiver implement
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
+ final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+ PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
- PBHelper.convert(proto.getTargetsList()),
+ targets,
+ PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
PBHelper.convert(proto.getSource()),
fromProto(proto.getStage()),
proto.getPipelineSize(),
@@ -140,10 +144,12 @@ public abstract class Receiver implement
private void opTransferBlock(DataInputStream in) throws IOException {
final OpTransferBlockProto proto =
OpTransferBlockProto.parseFrom(vintPrefixed(in));
+ final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
- PBHelper.convert(proto.getTargetsList()));
+ targets,
+ PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
}
/** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
@@ -176,6 +182,7 @@ public abstract class Receiver implement
private void opReplaceBlock(DataInputStream in) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
+ PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getToken()),
proto.getDelHint(),
PBHelper.convert(proto.getSource()));
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Tue Jul 22 07:41:24 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
@@ -111,9 +112,11 @@ public class Sender implements DataTrans
@Override
public void writeBlock(final ExtendedBlock blk,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes,
final DatanodeInfo source,
final BlockConstructionStage stage,
final int pipelineSize,
@@ -130,7 +133,9 @@ public class Sender implements DataTrans
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
.setHeader(header)
+ .setStorageType(PBHelper.convertStorageType(storageType))
.addAllTargets(PBHelper.convert(targets, 1))
+ .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
.setStage(toProto(stage))
.setPipelineSize(pipelineSize)
.setMinBytesRcvd(minBytesRcvd)
@@ -150,12 +155,14 @@ public class Sender implements DataTrans
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
- final DatanodeInfo[] targets) throws IOException {
+ final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes) throws IOException {
OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken))
.addAllTargets(PBHelper.convert(targets))
+ .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes))
.build();
send(out, Op.TRANSFER_BLOCK, proto);
@@ -196,11 +203,13 @@ public class Sender implements DataTrans
@Override
public void replaceBlock(final ExtendedBlock blk,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo source) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+ .setStorageType(PBHelper.convertStorageType(storageType))
.setDelHint(delHint)
.setSource(PBHelper.convertDatanodeInfo(source))
.build();
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Tue Jul 22 07:41:24 2014
@@ -150,6 +150,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
@@ -674,14 +675,8 @@ public class PBHelper {
targets[i] = PBHelper.convert(locs.get(i));
}
- final int storageTypesCount = proto.getStorageTypesCount();
- final StorageType[] storageTypes;
- if (storageTypesCount == 0) {
- storageTypes = null;
- } else {
- Preconditions.checkState(storageTypesCount == locs.size());
- storageTypes = convertStorageTypeProtos(proto.getStorageTypesList());
- }
+ final StorageType[] storageTypes = convertStorageTypes(
+ proto.getStorageTypesList(), locs.size());
final int storageIDsCount = proto.getStorageIDsCount();
final String[] storageIDs;
@@ -969,6 +964,20 @@ public class PBHelper {
targets[i] = PBHelper.convert(targetList.get(i));
}
+ StorageType[][] targetStorageTypes = new StorageType[targetList.size()][];
+ List<StorageTypesProto> targetStorageTypesList = blkCmd.getTargetStorageTypesList();
+ if (targetStorageTypesList.isEmpty()) { // missing storage types
+ for(int i = 0; i < targetStorageTypes.length; i++) {
+ targetStorageTypes[i] = new StorageType[targets[i].length];
+ Arrays.fill(targetStorageTypes[i], StorageType.DEFAULT);
+ }
+ } else {
+ for(int i = 0; i < targetStorageTypes.length; i++) {
+ List<StorageTypeProto> p = targetStorageTypesList.get(i).getStorageTypesList();
+ targetStorageTypes[i] = p.toArray(new StorageType[p.size()]);
+ }
+ }
+
List<StorageUuidsProto> targetStorageUuidsList = blkCmd.getTargetStorageUuidsList();
String[][] targetStorageIDs = new String[targetStorageUuidsList.size()][];
for(int i = 0; i < targetStorageIDs.length; i++) {
@@ -991,7 +1000,7 @@ public class PBHelper {
throw new AssertionError("Unknown action type: " + blkCmd.getAction());
}
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets,
- targetStorageIDs);
+ targetStorageTypes, targetStorageIDs);
}
public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
@@ -1605,8 +1614,25 @@ public class PBHelper {
}
}
- private static StorageTypeProto convertStorageType(
- StorageType type) {
+ public static List<StorageTypeProto> convertStorageTypes(
+ StorageType[] types) {
+ return convertStorageTypes(types, 0);
+ }
+
+ public static List<StorageTypeProto> convertStorageTypes(
+ StorageType[] types, int startIdx) {
+ if (types == null) {
+ return null;
+ }
+ final List<StorageTypeProto> protos = new ArrayList<StorageTypeProto>(
+ types.length);
+ for (int i = startIdx; i < types.length; ++i) {
+ protos.add(convertStorageType(types[i]));
+ }
+ return protos;
+ }
+
+ public static StorageTypeProto convertStorageType(StorageType type) {
switch(type) {
case DISK:
return StorageTypeProto.DISK;
@@ -1621,7 +1647,7 @@ public class PBHelper {
public static DatanodeStorage convert(DatanodeStorageProto s) {
return new DatanodeStorage(s.getStorageUuid(),
PBHelper.convertState(s.getState()),
- PBHelper.convertType(s.getStorageType()));
+ PBHelper.convertStorageType(s.getStorageType()));
}
private static State convertState(StorageState state) {
@@ -1634,7 +1660,7 @@ public class PBHelper {
}
}
- private static StorageType convertType(StorageTypeProto type) {
+ public static StorageType convertStorageType(StorageTypeProto type) {
switch(type) {
case DISK:
return StorageType.DISK;
@@ -1646,11 +1672,16 @@ public class PBHelper {
}
}
- private static StorageType[] convertStorageTypeProtos(
- List<StorageTypeProto> storageTypesList) {
- final StorageType[] storageTypes = new StorageType[storageTypesList.size()];
- for (int i = 0; i < storageTypes.length; ++i) {
- storageTypes[i] = PBHelper.convertType(storageTypesList.get(i));
+ public static StorageType[] convertStorageTypes(
+ List<StorageTypeProto> storageTypesList, int expectedSize) {
+ final StorageType[] storageTypes = new StorageType[expectedSize];
+ if (storageTypesList.size() != expectedSize) { // missing storage types
+ Preconditions.checkState(storageTypesList.isEmpty());
+ Arrays.fill(storageTypes, StorageType.DEFAULT);
+ } else {
+ for (int i = 0; i < storageTypes.length; ++i) {
+ storageTypes[i] = convertStorageType(storageTypesList.get(i));
+ }
}
return storageTypes;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Jul 22 07:41:24 2014
@@ -59,6 +59,7 @@ import org.apache.hadoop.conf.Configured
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -368,7 +369,7 @@ public class Balancer {
in = new DataInputStream(new BufferedInputStream(unbufIn,
HdfsConstants.IO_FILE_BUFFER_SIZE));
- sendRequest(out, eb, accessToken);
+ sendRequest(out, eb, StorageType.DEFAULT, accessToken);
receiveResponse(in);
bytesMoved.addAndGet(block.getNumBytes());
LOG.info("Successfully moved " + this);
@@ -400,8 +401,9 @@ public class Balancer {
/* Send a block replace request to the output stream*/
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
+ StorageType storageType,
Token<BlockTokenIdentifier> accessToken) throws IOException {
- new Sender(out).replaceBlock(eb, accessToken,
+ new Sender(out).replaceBlock(eb, storageType, accessToken,
source.getStorageID(), proxySource.getDatanode());
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Tue Jul 22 07:41:24 2014
@@ -575,7 +575,8 @@ class BPOfferService {
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER:
// Send a copy of a block to another datanode
- dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
+ dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
+ bcmd.getTargets(), bcmd.getTargetStorageTypes());
dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
break;
case DatanodeProtocol.DNA_INVALIDATE:
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Jul 22 07:41:24 2014
@@ -37,6 +37,7 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -122,7 +123,8 @@ class BlockReceiver implements Closeable
private boolean syncOnClose;
private long restartBudget;
- BlockReceiver(final ExtendedBlock block, final DataInputStream in,
+ BlockReceiver(final ExtendedBlock block, final StorageType storageType,
+ final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
@@ -162,11 +164,11 @@ class BlockReceiver implements Closeable
// Open local disk out
//
if (isDatanode) { //replication or move
- replicaInfo = datanode.data.createTemporary(block);
+ replicaInfo = datanode.data.createTemporary(storageType, block);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
- replicaInfo = datanode.data.createRbw(block);
+ replicaInfo = datanode.data.createRbw(storageType, block);
datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
break;
@@ -198,7 +200,7 @@ class BlockReceiver implements Closeable
case TRANSFER_RBW:
case TRANSFER_FINALIZED:
// this is a transfer destination
- replicaInfo = datanode.data.createTemporary(block);
+ replicaInfo = datanode.data.createTemporary(storageType, block);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Jul 22 07:41:24 2014
@@ -19,11 +19,66 @@ package org.apache.hadoop.hdfs.server.da
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
+import static org.apache.hadoop.util.ExitUtil.terminate;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.BlockingService;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,10 +94,23 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.net.DomainPeerServer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
@@ -50,9 +118,20 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
-import org.apache.hadoop.hdfs.protocolPB.*;
-import org.apache.hadoop.hdfs.security.token.block.*;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -65,7 +144,11 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpConfig;
@@ -88,22 +171,21 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.hadoop.util.ServicePlugin;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
-import javax.management.ObjectName;
-
-import java.io.*;
-import java.lang.management.ManagementFactory;
-import java.net.*;
-import java.nio.channels.SocketChannel;
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.apache.hadoop.util.ExitUtil.terminate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
/**********************************************************
* DataNode is a class (and program) that stores a set of
@@ -1475,8 +1557,8 @@ public class DataNode extends Configured
return xmitsInProgress.get();
}
- private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
- throws IOException {
+ private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
+ StorageType[] xferTargetStorageTypes) throws IOException {
BPOfferService bpos = getBPOSForBlock(block);
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
@@ -1512,16 +1594,17 @@ public class DataNode extends Configured
LOG.info(bpReg + " Starting thread to transfer " +
block + " to " + xfersBuilder);
- new Daemon(new DataTransfer(xferTargets, block,
+ new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
}
}
void transferBlocks(String poolId, Block blocks[],
- DatanodeInfo xferTargets[][]) {
+ DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) {
for (int i = 0; i < blocks.length; i++) {
try {
- transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]);
+ transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i],
+ xferTargetStorageTypes[i]);
} catch (IOException ie) {
LOG.warn("Failed to transfer block " + blocks[i], ie);
}
@@ -1624,6 +1707,7 @@ public class DataNode extends Configured
*/
private class DataTransfer implements Runnable {
final DatanodeInfo[] targets;
+ final StorageType[] targetStorageTypes;
final ExtendedBlock b;
final BlockConstructionStage stage;
final private DatanodeRegistration bpReg;
@@ -1634,7 +1718,8 @@ public class DataNode extends Configured
* Connect to the first item in the target list. Pass along the
* entire target list, the block, and the data.
*/
- DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
+ DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
+ ExtendedBlock b, BlockConstructionStage stage,
final String clientname) {
if (DataTransferProtocol.LOG.isDebugEnabled()) {
DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
@@ -1644,6 +1729,7 @@ public class DataNode extends Configured
+ ", targests=" + Arrays.asList(targets));
}
this.targets = targets;
+ this.targetStorageTypes = targetStorageTypes;
this.b = b;
this.stage = stage;
BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
@@ -1702,7 +1788,8 @@ public class DataNode extends Configured
false, false, true, DataNode.this, null, cachingStrategy);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
- new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
+ new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
+ clientname, targets, targetStorageTypes, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
// send data & checksum
@@ -2403,7 +2490,8 @@ public class DataNode extends Configured
* @param client client name
*/
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
- final DatanodeInfo[] targets, final String client) throws IOException {
+ final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
+ final String client) throws IOException {
final long storedGS;
final long visible;
final BlockConstructionStage stage;
@@ -2436,7 +2524,7 @@ public class DataNode extends Configured
b.setNumBytes(visible);
if (targets.length > 0) {
- new DataTransfer(targets, b, stage, client).run();
+ new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Jul 22 07:41:24 2014
@@ -45,6 +45,7 @@ import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -524,9 +525,11 @@ class DataXceiver extends Receiver imple
@Override
public void writeBlock(final ExtendedBlock block,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
@@ -590,12 +593,13 @@ class DataXceiver extends Receiver imple
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
- blockReceiver = new BlockReceiver(block, in,
+ blockReceiver = new BlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy);
+
storageUuid = blockReceiver.getStorageUuid();
} else {
storageUuid = datanode.data.recoverClose(
@@ -636,10 +640,10 @@ class DataXceiver extends Receiver imple
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn);
- new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
- clientname, targets, srcDataNode, stage, pipelineSize,
- minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
- cachingStrategy);
+ new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+ blockToken, clientname, targets, targetStorageTypes, srcDataNode,
+ stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+ latestGenerationStamp, requestedChecksum, cachingStrategy);
mirrorOut.flush();
@@ -754,7 +758,8 @@ class DataXceiver extends Receiver imple
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
- final DatanodeInfo[] targets) throws IOException {
+ final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes) throws IOException {
checkAccess(socketOut, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
previousOpClientName = clientName;
@@ -763,7 +768,8 @@ class DataXceiver extends Receiver imple
final DataOutputStream out = new DataOutputStream(
getOutputStream());
try {
- datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
+ datanode.transferReplicaForPipelineRecovery(blk, targets,
+ targetStorageTypes, clientName);
writeResponse(Status.SUCCESS, null, out);
} finally {
IOUtils.closeStream(out);
@@ -941,6 +947,7 @@ class DataXceiver extends Receiver imple
@Override
public void replaceBlock(final ExtendedBlock block,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo proxySource) throws IOException {
@@ -1026,8 +1033,8 @@ class DataXceiver extends Receiver imple
DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
checksumInfo.getChecksum());
// open a block receiver and check if the block does not exist
- blockReceiver = new BlockReceiver(
- block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
+ blockReceiver = new BlockReceiver(block, storageType,
+ proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
CachingStrategy.newDropBehind());
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Tue Jul 22 07:41:24 2014
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -176,8 +177,8 @@ public interface FsDatasetSpi<V extends
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public ReplicaInPipelineInterface createTemporary(ExtendedBlock b
- ) throws IOException;
+ public ReplicaInPipelineInterface createTemporary(StorageType storageType,
+ ExtendedBlock b) throws IOException;
/**
* Creates a RBW replica and returns the meta info of the replica
@@ -186,8 +187,8 @@ public interface FsDatasetSpi<V extends
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public ReplicaInPipelineInterface createRbw(ExtendedBlock b
- ) throws IOException;
+ public ReplicaInPipelineInterface createRbw(StorageType storageType,
+ ExtendedBlock b) throws IOException;
/**
* Recovers a RBW replica and returns the meta info of the replica
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Tue Jul 22 07:41:24 2014
@@ -17,6 +17,28 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -24,12 +46,37 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.datanode.*;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -43,15 +90,6 @@ import org.apache.hadoop.util.DiskChecke
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.*;
-import java.util.concurrent.Executor;
-
/**************************************************
* FSDataset manages a set of data blocks. Each block
* has a unique name and an extent on disk.
@@ -736,8 +774,8 @@ class FsDatasetImpl implements FsDataset
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
- throws IOException {
+ public synchronized ReplicaInPipeline createRbw(StorageType storageType,
+ ExtendedBlock b) throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
@@ -746,7 +784,7 @@ class FsDatasetImpl implements FsDataset
" and thus cannot be created.");
}
// create a new block
- FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
+ FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
// create a rbw file to hold block in the designated volume
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
@@ -874,8 +912,8 @@ class FsDatasetImpl implements FsDataset
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b)
- throws IOException {
+ public synchronized ReplicaInPipeline createTemporary(StorageType storageType,
+ ExtendedBlock b) throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (replicaInfo != null) {
throw new ReplicaAlreadyExistsException("Block " + b +
@@ -883,7 +921,7 @@ class FsDatasetImpl implements FsDataset
" and thus cannot be created.");
}
- FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
+ FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
// create a temporary file to hold block in the designated volume
File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Tue Jul 22 07:41:24 2014
@@ -18,13 +18,17 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
-import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Time;
class FsVolumeList {
/**
@@ -52,11 +56,18 @@ class FsVolumeList {
* by a single thread and next volume is chosen with no concurrent
* update to {@link #volumes}.
* @param blockSize free space needed on the volume
+ * @param storageType the desired {@link StorageType}
* @return next volume to store the block in.
*/
- // TODO should choose volume with storage type
- synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
- return blockChooser.chooseVolume(volumes, blockSize);
+ synchronized FsVolumeImpl getNextVolume(StorageType storageType,
+ long blockSize) throws IOException {
+ final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
+ for(FsVolumeImpl v : volumes) {
+ if (v.getStorageType() == storageType) {
+ list.add(v);
+ }
+ }
+ return blockChooser.chooseVolume(list, blockSize);
}
long getDfsUsed() throws IOException {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java Tue Jul 22 07:41:24 2014
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
@@ -50,6 +51,7 @@ public class BlockCommand extends Datano
final String poolId;
final Block[] blocks;
final DatanodeInfo[][] targets;
+ final StorageType[][] targetStorageTypes;
final String[][] targetStorageIDs;
/**
@@ -62,17 +64,20 @@ public class BlockCommand extends Datano
this.poolId = poolId;
blocks = new Block[blocktargetlist.size()];
targets = new DatanodeInfo[blocks.length][];
+ targetStorageTypes = new StorageType[blocks.length][];
targetStorageIDs = new String[blocks.length][];
for(int i = 0; i < blocks.length; i++) {
BlockTargetPair p = blocktargetlist.get(i);
blocks[i] = p.block;
targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
+ targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets);
targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
}
}
private static final DatanodeInfo[][] EMPTY_TARGET_DATANODES = {};
+ private static final StorageType[][] EMPTY_TARGET_STORAGE_TYPES = {};
private static final String[][] EMPTY_TARGET_STORAGEIDS = {};
/**
@@ -81,7 +86,7 @@ public class BlockCommand extends Datano
*/
public BlockCommand(int action, String poolId, Block blocks[]) {
this(action, poolId, blocks, EMPTY_TARGET_DATANODES,
- EMPTY_TARGET_STORAGEIDS);
+ EMPTY_TARGET_STORAGE_TYPES, EMPTY_TARGET_STORAGEIDS);
}
/**
@@ -89,11 +94,13 @@ public class BlockCommand extends Datano
* @param blocks blocks related to the action
*/
public BlockCommand(int action, String poolId, Block[] blocks,
- DatanodeInfo[][] targets, String[][] targetStorageIDs) {
+ DatanodeInfo[][] targets, StorageType[][] targetStorageTypes,
+ String[][] targetStorageIDs) {
super(action);
this.poolId = poolId;
this.blocks = blocks;
this.targets = targets;
+ this.targetStorageTypes = targetStorageTypes;
this.targetStorageIDs = targetStorageIDs;
}
@@ -109,6 +116,10 @@ public class BlockCommand extends Datano
return targets;
}
+ public StorageType[][] getTargetStorageTypes() {
+ return targetStorageTypes;
+ }
+
public String[][] getTargetStorageIDs() {
return targetStorageIDs;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto Tue Jul 22 07:41:24 2014
@@ -113,6 +113,7 @@ message BlockCommandProto {
repeated BlockProto blocks = 3;
repeated DatanodeInfosProto targets = 4;
repeated StorageUuidsProto targetStorageUuids = 5;
+ repeated StorageTypesProto targetStorageTypes = 6;
}
/**
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Tue Jul 22 07:41:24 2014
@@ -107,17 +107,21 @@ message OpWriteBlockProto {
*/
required ChecksumProto requestedChecksum = 9;
optional CachingStrategyProto cachingStrategy = 10;
+ optional StorageTypeProto storageType = 11 [default = DISK];
+ repeated StorageTypeProto targetStorageTypes = 12;
}
message OpTransferBlockProto {
required ClientOperationHeaderProto header = 1;
repeated DatanodeInfoProto targets = 2;
+ repeated StorageTypeProto targetStorageTypes = 3;
}
message OpReplaceBlockProto {
required BaseHeaderProto header = 1;
required string delHint = 2;
required DatanodeInfoProto source = 3;
+ optional StorageTypeProto storageType = 4 [default = DISK];
}
message OpCopyBlockProto {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Tue Jul 22 07:41:24 2014
@@ -137,6 +137,13 @@ enum StorageTypeProto {
}
/**
+ * A list of storage types.
+ */
+message StorageTypesProto {
+ repeated StorageTypeProto storageTypes = 1;
+}
+
+/**
* A list of storage IDs.
*/
message StorageUuidsProto {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Tue Jul 22 07:41:24 2014
@@ -380,7 +380,7 @@ public class DFSTestUtil {
*/
public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
int racks, int replicas, int neededReplicas)
- throws IOException, TimeoutException, InterruptedException {
+ throws TimeoutException, InterruptedException {
int curRacks = 0;
int curReplicas = 0;
int curNeededReplicas = 0;
@@ -414,7 +414,7 @@ public class DFSTestUtil {
*/
public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
Path file, ExtendedBlock b, int corruptRepls)
- throws IOException, TimeoutException, InterruptedException {
+ throws TimeoutException, InterruptedException {
int count = 0;
final int ATTEMPTS = 50;
int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
@@ -839,7 +839,8 @@ public class DFSTestUtil {
// send the request
new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
- dfsClient.clientName, new DatanodeInfo[]{datanodes[1]});
+ dfsClient.clientName, new DatanodeInfo[]{datanodes[1]},
+ new StorageType[]{StorageType.DEFAULT});
out.flush();
return BlockOpResponseProto.parseDelimitedFrom(in);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Jul 22 07:41:24 2014
@@ -125,17 +125,16 @@ public class TestDataTransferProtocol {
throw eof;
}
- LOG.info("Received: " +new String(retBuf));
- LOG.info("Expected: " + StringUtils.byteToHexString(recvBuf.toByteArray()));
+ String received = StringUtils.byteToHexString(retBuf);
+ String expected = StringUtils.byteToHexString(recvBuf.toByteArray());
+ LOG.info("Received: " + received);
+ LOG.info("Expected: " + expected);
if (eofExpected) {
throw new IOException("Did not recieve IOException when an exception " +
"is expected while reading from " + datanode);
}
-
- byte[] needed = recvBuf.toByteArray();
- assertEquals(StringUtils.byteToHexString(needed),
- StringUtils.byteToHexString(retBuf));
+ assertEquals(expected, received);
} finally {
IOUtils.closeSocket(sock);
}
@@ -184,10 +183,7 @@ public class TestDataTransferProtocol {
String description, Boolean eofExcepted) throws IOException {
sendBuf.reset();
recvBuf.reset();
- sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
- new DatanodeInfo[1], null, stage,
- 0, block.getNumBytes(), block.getNumBytes(), newGS,
- DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
+ writeBlock(block, stage, newGS, DEFAULT_CHECKSUM);
if (eofExcepted) {
sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData(description, true);
@@ -343,10 +339,7 @@ public class TestDataTransferProtocol {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
try {
cluster.waitActive();
- DFSClient dfsClient = new DFSClient(
- new InetSocketAddress("localhost", cluster.getNameNodePort()),
- conf);
- datanode = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0];
+ datanode = cluster.getFileSystem().getDataNodeStats(DatanodeReportType.LIVE)[0];
dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
FileSystem fileSys = cluster.getFileSystem();
@@ -381,23 +374,14 @@ public class TestDataTransferProtocol {
DataChecksum badChecksum = Mockito.spy(DEFAULT_CHECKSUM);
Mockito.doReturn(-1).when(badChecksum).getBytesPerChecksum();
- sender.writeBlock(new ExtendedBlock(poolId, newBlockId),
- BlockTokenSecretManager.DUMMY_TOKEN, "cl",
- new DatanodeInfo[1], null,
- BlockConstructionStage.PIPELINE_SETUP_CREATE,
- 0, 0L, 0L, 0L,
- badChecksum, CachingStrategy.newDefaultStrategy());
+ writeBlock(poolId, newBlockId, badChecksum);
recvBuf.reset();
sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData("wrong bytesPerChecksum while writing", true);
sendBuf.reset();
recvBuf.reset();
- sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
- BlockTokenSecretManager.DUMMY_TOKEN, "cl",
- new DatanodeInfo[1], null,
- BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
- DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
+ writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM);
PacketHeader hdr = new PacketHeader(
4, // size of packet
@@ -416,11 +400,7 @@ public class TestDataTransferProtocol {
// test for writing a valid zero size block
sendBuf.reset();
recvBuf.reset();
- sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
- BlockTokenSecretManager.DUMMY_TOKEN, "cl",
- new DatanodeInfo[1], null,
- BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
- DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
+ writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM);
hdr = new PacketHeader(
8, // size of packet
@@ -532,4 +512,18 @@ public class TestDataTransferProtocol {
assertTrue(hdr.sanityCheck(99));
assertFalse(hdr.sanityCheck(100));
}
+
+ void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
+ writeBlock(new ExtendedBlock(poolId, blockId),
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum);
+ }
+
+ void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
+ long newGS, DataChecksum checksum) throws IOException {
+ sender.writeBlock(block, StorageType.DEFAULT,
+ BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+ new DatanodeInfo[1], new StorageType[1], null, stage,
+ 0, block.getNumBytes(), block.getNumBytes(), newGS,
+ checksum, CachingStrategy.newDefaultStrategy());
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java Tue Jul 22 07:41:24 2014
@@ -550,8 +550,10 @@ public class TestPBHelper {
dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo();
dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo();
String[][] storageIDs = {{"s00"}, {"s10", "s11"}};
+ StorageType[][] storageTypes = {{StorageType.DEFAULT},
+ {StorageType.DEFAULT, StorageType.DEFAULT}};
BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1",
- blocks, dnInfos, storageIDs);
+ blocks, dnInfos, storageTypes, storageIDs);
BlockCommandProto bcProto = PBHelper.convert(bc);
BlockCommand bc2 = PBHelper.convert(bcProto);
assertEquals(bc.getAction(), bc2.getAction());
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java Tue Jul 22 07:41:24 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -324,7 +325,7 @@ public abstract class BlockReportTestBas
public void blockReport_03() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
- ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
+ writeFile(METHOD_NAME, FILE_SIZE, filePath);
// all blocks belong to the same file, hence same BP
DataNode dn = cluster.getDataNodes().get(DN_N0);
@@ -363,7 +364,7 @@ public abstract class BlockReportTestBas
// Create a bogus new block which will not be present on the namenode.
ExtendedBlock b = new ExtendedBlock(
poolId, rand.nextLong(), 1024L, rand.nextLong());
- dn.getFSDataset().createRbw(b);
+ dn.getFSDataset().createRbw(StorageType.DEFAULT, b);
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Tue Jul 22 07:41:24 2014
@@ -744,14 +744,14 @@ public class SimulatedFSDataset implemen
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
- throws IOException {
- return createTemporary(b);
+ public synchronized ReplicaInPipelineInterface createRbw(
+ StorageType storageType, ExtendedBlock b) throws IOException {
+ return createTemporary(storageType, b);
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
- throws IOException {
+ public synchronized ReplicaInPipelineInterface createTemporary(
+ StorageType storageType, ExtendedBlock b) throws IOException {
if (isValidBlock(b)) {
throw new ReplicaAlreadyExistsException("Block " + b +
" is valid, and cannot be written to.");
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java Tue Jul 22 07:41:24 2014
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.Distribute
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -531,7 +532,7 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
- dn.data.createRbw(block);
+ dn.data.createRbw(StorageType.DEFAULT, block);
try {
dn.syncBlock(rBlock, initBlockRecords(dn));
fail("Sync should fail");
@@ -554,7 +555,8 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
- ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block);
+ ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
+ StorageType.DEFAULT, block);
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Tue Jul 22 07:41:24 2014
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -264,7 +265,8 @@ public class TestBlockReplacement {
sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
- new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN,
+ new Sender(out).replaceBlock(block, StorageType.DEFAULT,
+ BlockTokenSecretManager.DUMMY_TOKEN,
source.getDatanodeUuid(), sourceProxy);
out.flush();
// receiveResponse
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Tue Jul 22 07:41:24 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -147,9 +148,9 @@ public class TestDiskError {
DataChecksum checksum = DataChecksum.newDataChecksum(
DataChecksum.Type.CRC32, 512);
- new Sender(out).writeBlock(block.getBlock(),
+ new Sender(out).writeBlock(block.getBlock(), StorageType.DEFAULT,
BlockTokenSecretManager.DUMMY_TOKEN, "",
- new DatanodeInfo[0], null,
+ new DatanodeInfo[0], new StorageType[0], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
checksum, CachingStrategy.newDefaultStrategy());
out.flush();
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=1612493&r1=1612492&r2=1612493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java Tue Jul 22 07:41:24 2014
@@ -29,6 +29,7 @@ import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -65,7 +66,8 @@ public class TestSimulatedFSDataset {
ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0);
// we pass expected len as zero, - fsdataset should use the sizeof actual
// data written
- ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
+ ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
+ StorageType.DEFAULT, b);
ReplicaOutputStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
try {