You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2012/02/07 17:59:49 UTC
svn commit: r1241519 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/protocolPB/
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/jav...
Author: suresh
Date: Tue Feb 7 16:59:48 2012
New Revision: 1241519
URL: http://svn.apache.org/viewvc?rev=1241519&view=rev
Log:
HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple storages. Contributed by Suresh Srinivas.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1241519&r1=1241518&r2=1241519&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Feb 7 16:59:48 2012
@@ -45,9 +45,12 @@ Trunk (unreleased changes)
HDFS-2697. Move RefreshAuthPolicy, RefreshUserMappings, GetUserMappings
protocol to protocol buffers. (jitendra)
- HDFS-2880. Protobuf chagnes in DatanodeProtocol to add multiple storages.
+ HDFS-2880. Protobuf changes in DatanodeProtocol to add multiple storages.
(suresh)
+ HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple
+ storages. (suresh)
+
IMPROVEMENTS
HADOOP-7524 Change RPC to allow multiple protocols including multuple
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java?rev=1241519&r1=1241518&r2=1241519&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java Tue Feb 7 16:59:48 2012
@@ -48,15 +48,18 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
@@ -153,13 +156,17 @@ public class DatanodeProtocolClientSideT
}
@Override
- public DatanodeRegistration registerDatanode(DatanodeRegistration registration)
- throws IOException {
- RegisterDatanodeRequestProto req = RegisterDatanodeRequestProto
- .newBuilder().setRegistration(PBHelper.convert(registration)).build();
+ public DatanodeRegistration registerDatanode(DatanodeRegistration registration,
+ DatanodeStorage[] storages) throws IOException {
+ RegisterDatanodeRequestProto.Builder builder = RegisterDatanodeRequestProto
+ .newBuilder().setRegistration(PBHelper.convert(registration));
+ for (DatanodeStorage s : storages) {
+ builder.addStorages(PBHelper.convert(s));
+ }
+
RegisterDatanodeResponseProto resp;
try {
- resp = rpcProxy.registerDatanode(NULL_CONTROLLER, req);
+ resp = rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build());
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
@@ -168,22 +175,19 @@ public class DatanodeProtocolClientSideT
@Override
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
- long capacity, long dfsUsed, long remaining, long blockPoolUsed,
- int xmitsInProgress, int xceiverCount, int failedVolumes)
- throws IOException {
- StorageReportProto report = StorageReportProto.newBuilder()
- .setBlockPoolUsed(blockPoolUsed).setCapacity(capacity)
- .setDfsUsed(dfsUsed).setRemaining(remaining)
- .setStorageID(registration.getStorageID()).build();
-
- HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
- .setRegistration(PBHelper.convert(registration)).addReports(report)
+ StorageReport[] reports, int xmitsInProgress, int xceiverCount,
+ int failedVolumes) throws IOException {
+ HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
+ .setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
- .setFailedVolumes(failedVolumes)
- .build();
+ .setFailedVolumes(failedVolumes);
+ for (StorageReport r : reports) {
+ builder.addReports(PBHelper.convert(r));
+ }
+
HeartbeatResponseProto resp;
try {
- resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, req);
+ resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
@@ -198,21 +202,23 @@ public class DatanodeProtocolClientSideT
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
- String poolId, long[] blocks) throws IOException {
- StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
- .newBuilder().setStorageID(registration.getStorageID());
+ String poolId, StorageBlockReport[] reports) throws IOException {
+ BlockReportRequestProto.Builder builder = BlockReportRequestProto
+ .newBuilder().setRegistration(PBHelper.convert(registration))
+ .setBlockPoolId(poolId);
- if (blocks != null) {
+ for (StorageBlockReport r : reports) {
+ StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
+ .newBuilder().setStorageID(r.getStorageID());
+ long[] blocks = r.getBlocks();
for (int i = 0; i < blocks.length; i++) {
reportBuilder.addBlocks(blocks[i]);
}
+ builder.addReports(reportBuilder.build());
}
- BlockReportRequestProto req = BlockReportRequestProto
- .newBuilder().setRegistration(PBHelper.convert(registration))
- .setBlockPoolId(poolId).addReports(reportBuilder.build()).build();
BlockReportResponseProto resp;
try {
- resp = rpcProxy.blockReport(NULL_CONTROLLER, req);
+ resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build());
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
@@ -220,23 +226,24 @@ public class DatanodeProtocolClientSideT
}
@Override
- public void blockReceivedAndDeleted(DatanodeRegistration reg,
- String poolId, ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks)
+ public void blockReceivedAndDeleted(DatanodeRegistration registration,
+ String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
throws IOException {
- StorageReceivedDeletedBlocksProto.Builder builder =
- StorageReceivedDeletedBlocksProto.newBuilder()
- .setStorageID(reg.getStorageID());
- if (receivedAndDeletedBlocks != null) {
- for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
- builder.addBlocks(PBHelper.convert(receivedAndDeletedBlocks[i]));
+ BlockReceivedAndDeletedRequestProto.Builder builder =
+ BlockReceivedAndDeletedRequestProto.newBuilder()
+ .setRegistration(PBHelper.convert(registration))
+ .setBlockPoolId(poolId);
+ for (StorageReceivedDeletedBlocks storageBlock : receivedAndDeletedBlocks) {
+ StorageReceivedDeletedBlocksProto.Builder repBuilder =
+ StorageReceivedDeletedBlocksProto.newBuilder();
+ repBuilder.setStorageID(storageBlock.getStorageID());
+ for (ReceivedDeletedBlockInfo rdBlock : storageBlock.getBlocks()) {
+ repBuilder.addBlocks(PBHelper.convert(rdBlock));
}
+ builder.addBlocks(repBuilder.build());
}
- BlockReceivedAndDeletedRequestProto req =
- BlockReceivedAndDeletedRequestProto.newBuilder()
- .setRegistration(PBHelper.convert(reg))
- .setBlockPoolId(poolId).addBlocks(builder.build()).build();
try {
- rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, req);
+ rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, builder.build());
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java?rev=1241519&r1=1241518&r2=1241519&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java Tue Feb 7 16:59:48 2012
@@ -40,6 +40,8 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
@@ -49,8 +51,12 @@ import org.apache.hadoop.hdfs.protocolR2
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
@@ -84,8 +90,12 @@ public class DatanodeProtocolServerSideT
DatanodeRegistration registration = PBHelper.convert(request
.getRegistration());
DatanodeRegistration registrationResp;
+ DatanodeStorage[] storages = new DatanodeStorage[request.getStoragesCount()];
+ for (int i = 0; i < request.getStoragesCount(); i++) {
+ storages[i] = PBHelper.convert(request.getStorages(i));
+ }
try {
- registrationResp = impl.registerDatanode(registration);
+ registrationResp = impl.registerDatanode(registration, storages);
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -98,11 +108,17 @@ public class DatanodeProtocolServerSideT
HeartbeatRequestProto request) throws ServiceException {
DatanodeCommand[] cmds = null;
try {
- StorageReportProto report = request.getReports(0);
+ List<StorageReportProto> list = request.getReportsList();
+ StorageReport[] report = new StorageReport[list.size()];
+ int i = 0;
+ for (StorageReportProto p : list) {
+ report[i++] = new StorageReport(p.getStorageID(), p.getFailed(),
+ p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
+ p.getBlockPoolUsed());
+ }
cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
- report.getCapacity(), report.getDfsUsed(), report.getRemaining(),
- report.getBlockPoolUsed(), request.getXmitsInProgress(),
- request.getXceiverCount(), request.getFailedVolumes());
+ report, request.getXmitsInProgress(), request.getXceiverCount(),
+ request.getFailedVolumes());
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -122,14 +138,21 @@ public class DatanodeProtocolServerSideT
public BlockReportResponseProto blockReport(RpcController controller,
BlockReportRequestProto request) throws ServiceException {
DatanodeCommand cmd = null;
- List<Long> blockIds = request.getReports(0).getBlocksList();
- long[] blocks = new long[blockIds.size()];
- for (int i = 0; i < blockIds.size(); i++) {
- blocks[i] = blockIds.get(i);
+ StorageBlockReport[] report =
+ new StorageBlockReport[request.getReportsCount()];
+
+ int index = 0;
+ for (StorageBlockReportProto s : request.getReportsList()) {
+ List<Long> blockIds = s.getBlocksList();
+ long[] blocks = new long[blockIds.size()];
+ for (int i = 0; i < blockIds.size(); i++) {
+ blocks[i] = blockIds.get(i);
+ }
+ report[index++] = new StorageBlockReport(s.getStorageID(), blocks);
}
try {
cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
- request.getBlockPoolId(), blocks);
+ request.getBlockPoolId(), report);
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -145,12 +168,18 @@ public class DatanodeProtocolServerSideT
public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
RpcController controller, BlockReceivedAndDeletedRequestProto request)
throws ServiceException {
- List<ReceivedDeletedBlockInfoProto> rdbip = request.getBlocks(0)
- .getBlocksList();
- ReceivedDeletedBlockInfo[] info =
- new ReceivedDeletedBlockInfo[rdbip.size()];
- for (int i = 0; i < rdbip.size(); i++) {
- info[i] = PBHelper.convert(rdbip.get(i));
+ List<StorageReceivedDeletedBlocksProto> sBlocks = request.getBlocksList();
+ StorageReceivedDeletedBlocks[] info =
+ new StorageReceivedDeletedBlocks[sBlocks.size()];
+ for (int i = 0; i < sBlocks.size(); i++) {
+ StorageReceivedDeletedBlocksProto sBlock = sBlocks.get(i);
+ List<ReceivedDeletedBlockInfoProto> list = sBlock.getBlocksList();
+ ReceivedDeletedBlockInfo[] rdBlocks =
+ new ReceivedDeletedBlockInfo[list.size()];
+ for (int j = 0; j < list.size(); j++) {
+ rdBlocks[j] = PBHelper.convert(list.get(j));
+ }
+ info[i] = new StorageReceivedDeletedBlocks(sBlock.getStorageID(), rdBlocks);
}
try {
impl.blockReceivedAndDeleted(PBHelper.convert(request.getRegistration()),
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1241519&r1=1241518&r2=1241519&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Tue Feb 7 16:59:48 2012
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.ContentSumma
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@@ -52,10 +53,13 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto.StorageState;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -108,6 +112,8 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -118,6 +124,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
@@ -1236,4 +1243,41 @@ public class PBHelper {
setSpaceQuota(cs.getSpaceQuota()).
build();
}
+
+ public static DatanodeStorageProto convert(DatanodeStorage s) {
+ return DatanodeStorageProto.newBuilder()
+ .setState(PBHelper.convert(s.getState()))
+ .setStorageID(s.getStorageID()).build();
+ }
+
+ private static StorageState convert(State state) {
+ switch(state) {
+ case READ_ONLY:
+ return StorageState.READ_ONLY;
+ case NORMAL:
+ default:
+ return StorageState.NORMAL;
+ }
+ }
+
+ public static DatanodeStorage convert(DatanodeStorageProto s) {
+ return new DatanodeStorage(s.getStorageID(), PBHelper.convert(s.getState()));
+ }
+
+ private static State convert(StorageState state) {
+ switch(state) {
+ case READ_ONLY:
+ return DatanodeStorage.State.READ_ONLY;
+ case NORMAL:
+ default:
+ return DatanodeStorage.State.NORMAL;
+ }
+ }
+
+ public static StorageReportProto convert(StorageReport r) {
+ return StorageReportProto.newBuilder()
+ .setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())
+ .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
+ .setStorageID(r.getStorageID()).build();
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1241519&r1=1241518&r2=1241519&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Tue Feb 7 16:59:48 2012
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -47,14 +46,17 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
@@ -295,8 +297,10 @@ class BPOfferService implements Runnable
}
}
if (receivedAndDeletedBlockArray != null) {
+ StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
+ bpRegistration.getStorageID(), receivedAndDeletedBlockArray) };
bpNamenode.blockReceivedAndDeleted(bpRegistration, getBlockPoolId(),
- receivedAndDeletedBlockArray);
+ report);
synchronized (receivedAndDeletedBlockList) {
for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
@@ -365,8 +369,9 @@ class BPOfferService implements Runnable
// Send block report
long brSendStartTime = now();
- cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport
- .getBlockListAsLongs());
+ StorageBlockReport[] report = { new StorageBlockReport(
+ bpRegistration.getStorageID(), bReport.getBlockListAsLongs()) };
+ cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), report);
// Log the block report processing stats from Datanode perspective
long brSendCost = now() - brSendStartTime;
@@ -398,11 +403,11 @@ class BPOfferService implements Runnable
DatanodeCommand [] sendHeartBeat() throws IOException {
- return bpNamenode.sendHeartbeat(bpRegistration,
- dn.data.getCapacity(),
- dn.data.getDfsUsed(),
- dn.data.getRemaining(),
- dn.data.getBlockPoolUsed(getBlockPoolId()),
+ // build a single storage utilization report; the number of failed volumes is passed separately
+ StorageReport[] report = { new StorageReport(bpRegistration.getStorageID(),
+ false, dn.data.getCapacity(), dn.data.getDfsUsed(),
+ dn.data.getRemaining(), dn.data.getBlockPoolUsed(getBlockPoolId())) };
+ return bpNamenode.sendHeartbeat(bpRegistration, report,
dn.xmitsInProgress.get(),
dn.getXceiverCount(), dn.data.getNumFailedVolumes());
}
@@ -572,7 +577,8 @@ class BPOfferService implements Runnable
while (shouldRun()) {
try {
// Use returned registration from namenode with updated machine name.
- bpRegistration = bpNamenode.registerDatanode(bpRegistration);
+ bpRegistration = bpNamenode.registerDatanode(bpRegistration,
+ new DatanodeStorage[0]);
break;
} catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + nnAddr);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1241519&r1=1241518&r2=1241519&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Tue Feb 7 16:59:48 2012
@@ -87,6 +87,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -94,8 +95,10 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
@@ -841,8 +844,8 @@ class NameNodeRpcServer implements Namen
@Override // DatanodeProtocol
- public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
- throws IOException {
+ public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg,
+ DatanodeStorage[] storages) throws IOException {
verifyVersion(nodeReg.getVersion());
namesystem.registerDatanode(nodeReg);
@@ -851,19 +854,20 @@ class NameNodeRpcServer implements Namen
@Override // DatanodeProtocol
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
- long capacity, long dfsUsed, long remaining, long blockPoolUsed,
- int xmitsInProgress, int xceiverCount, int failedVolumes)
- throws IOException {
+ StorageReport[] report, int xmitsInProgress, int xceiverCount,
+ int failedVolumes) throws IOException {
verifyRequest(nodeReg);
- return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining,
- blockPoolUsed, xceiverCount, xmitsInProgress, failedVolumes);
+ return namesystem.handleHeartbeat(nodeReg, report[0].getCapacity(),
+ report[0].getDfsUsed(), report[0].getRemaining(),
+ report[0].getBlockPoolUsed(), xceiverCount, xmitsInProgress,
+ failedVolumes);
}
@Override // DatanodeProtocol
public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
- String poolId, long[] blocks) throws IOException {
+ String poolId, StorageBlockReport[] reports) throws IOException {
verifyRequest(nodeReg);
- BlockListAsLongs blist = new BlockListAsLongs(blocks);
+ BlockListAsLongs blist = new BlockListAsLongs(reports[0].getBlocks());
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+ "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
@@ -878,7 +882,7 @@ class NameNodeRpcServer implements Namen
@Override // DatanodeProtocol
public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
- ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException {
+ StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
verifyRequest(nodeReg);
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
@@ -886,7 +890,7 @@ class NameNodeRpcServer implements Namen
+" blocks.");
}
namesystem.getBlockManager().blockReceivedAndDeleted(
- nodeReg, poolId, receivedAndDeletedBlocks);
+ nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks());
}
@Override // DatanodeProtocol
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1241519&r1=1241518&r2=1241519&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Tue Feb 7 16:59:48 2012
@@ -80,13 +80,16 @@ public interface DatanodeProtocol extend
* Register Datanode.
*
* @see org.apache.hadoop.hdfs.server.namenode.FSNamesystem#registerDatanode(DatanodeRegistration)
- *
+ * @param registration datanode registration information
+ * @param storages list of storages on the datanode
* @return updated {@link org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration}, which contains
* new storageID if the datanode did not have one and
* registration ID for further communication.
*/
- public DatanodeRegistration registerDatanode(DatanodeRegistration registration
- ) throws IOException;
+ public DatanodeRegistration registerDatanode(
+ DatanodeRegistration registration, DatanodeStorage[] storages)
+ throws IOException;
+
/**
* sendHeartbeat() tells the NameNode that the DataNode is still
* alive and well. Includes some status info, too.
@@ -95,19 +98,14 @@ public interface DatanodeProtocol extend
* A DatanodeCommand tells the DataNode to invalidate local block(s),
* or to copy them to other DataNodes, etc.
* @param registration datanode registration information
- * @param capacity total storage capacity available at the datanode
- * @param dfsUsed storage used by HDFS
- * @param remaining remaining storage available for HDFS
- * @param blockPoolUsed storage used by the block pool
+ * @param reports utilization report per storage
* @param xmitsInProgress number of transfers from this datanode to others
* @param xceiverCount number of active transceiver threads
* @param failedVolumes number of failed volumes
* @throws IOException on error
*/
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
- long capacity,
- long dfsUsed, long remaining,
- long blockPoolUsed,
+ StorageReport[] reports,
int xmitsInProgress,
int xceiverCount,
int failedVolumes) throws IOException;
@@ -120,7 +118,7 @@ public interface DatanodeProtocol extend
* infrequently afterwards.
* @param registration
* @param poolId - the block pool ID for the blocks
- * @param blocks - the block list as an array of longs.
+ * @param reports - report of blocks per storage
* Each block is represented as 2 longs.
* This is done instead of Block[] to reduce memory used by block reports.
*
@@ -128,8 +126,7 @@ public interface DatanodeProtocol extend
* @throws IOException
*/
public DatanodeCommand blockReport(DatanodeRegistration registration,
- String poolId,
- long[] blocks) throws IOException;
+ String poolId, StorageBlockReport[] reports) throws IOException;
/**
* blockReceivedAndDeleted() allows the DataNode to tell the NameNode about
@@ -143,7 +140,7 @@ public interface DatanodeProtocol extend
*/
public void blockReceivedAndDeleted(DatanodeRegistration registration,
String poolId,
- ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks)
+ StorageReceivedDeletedBlocks[] rcvdAndDeletedBlocks)
throws IOException;
/**
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java?rev=1241519&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java Tue Feb 7 16:59:48 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+/**
+ * Class captures information about a storage in a Datanode
+ */
+public class DatanodeStorage {
+ public enum State {
+ NORMAL,
+ READ_ONLY
+ }
+
+ private final String storageID;
+ private final State state;
+
+ public DatanodeStorage(String sid, State s) {
+ storageID = sid;
+ state = s;
+ }
+
+ public String getStorageID() {
+ return storageID;
+ }
+
+ public State getState() {
+ return state;
+ }
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java?rev=1241519&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java Tue Feb 7 16:59:48 2012
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+/**
+ * Block report for a Datanode storage
+ */
+public class StorageBlockReport {
+ private final String storageID;
+ private final long[] blocks;
+
+ public StorageBlockReport(String sid, long[] blocks) {
+ this.storageID = sid;
+ this.blocks = blocks;
+ }
+
+ public String getStorageID() {
+ return storageID;
+ }
+
+ public long[] getBlocks() {
+ return blocks;
+ }
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java?rev=1241519&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java Tue Feb 7 16:59:48 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+/**
+ * Report of blocks received and deleted per Datanode
+ * storage.
+ */
+public class StorageReceivedDeletedBlocks {
+ private final String storageID;
+ private final ReceivedDeletedBlockInfo[] blocks;
+
+ public String getStorageID() {
+ return storageID;
+ }
+
+ public ReceivedDeletedBlockInfo[] getBlocks() {
+ return blocks;
+ }
+
+ public StorageReceivedDeletedBlocks(final String storageID,
+ final ReceivedDeletedBlockInfo[] blocks) {
+ this.storageID = storageID;
+ this.blocks = blocks;
+ }
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java?rev=1241519&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java Tue Feb 7 16:59:48 2012
@@ -0,0 +1,47 @@
+package org.apache.hadoop.hdfs.server.protocol;
+
+/**
+ * Utilization report for a Datanode storage
+ */
+public class StorageReport {
+ private final String storageID;
+ private final boolean failed;
+ private final long capacity;
+ private final long dfsUsed;
+ private final long remaining;
+ private final long blockPoolUsed;
+
+ public StorageReport(String sid, boolean failed, long capacity, long dfsUsed,
+ long remaining, long bpUsed) {
+ this.storageID = sid;
+ this.failed = failed;
+ this.capacity = capacity;
+ this.dfsUsed = dfsUsed;
+ this.remaining = remaining;
+ this.blockPoolUsed = bpUsed;
+ }
+
+ public String getStorageID() {
+ return storageID;
+ }
+
+ public boolean isFailed() {
+ return failed;
+ }
+
+ public long getCapacity() {
+ return capacity;
+ }
+
+ public long getDfsUsed() {
+ return dfsUsed;
+ }
+
+ public long getRemaining() {
+ return remaining;
+ }
+
+ public long getBlockPoolUsed() {
+ return blockPoolUsed;
+ }
+}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1241519&r1=1241518&r2=1241519&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Tue Feb 7 16:59:48 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.log4j.Level;
@@ -146,8 +147,9 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N0);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- cluster.getNameNodeRpc().blockReport(dnR, poolId,
- new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+ StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
+ cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
List<LocatedBlock> blocksAfterReport =
DFSTestUtil.getAllBlocks(fs.open(filePath));
@@ -180,7 +182,7 @@ public class TestBlockReport {
Path filePath = new Path("/" + METHOD_NAME + ".dat");
DFSTestUtil.createFile(fs, filePath,
- (long) FILE_SIZE, REPL_FACTOR, rand.nextLong());
+ FILE_SIZE, REPL_FACTOR, rand.nextLong());
// mock around with newly created blocks and delete some
File dataDir = new File(cluster.getDataDirectory());
@@ -226,8 +228,9 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N0);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- cluster.getNameNodeRpc().blockReport(dnR, poolId,
- new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+ StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
+ cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
.getBlockManager());
@@ -266,9 +269,10 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N0);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+ StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
DatanodeCommand dnCmd =
- cluster.getNameNodeRpc().blockReport(dnR, poolId,
- new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+ cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
if(LOG.isDebugEnabled()) {
LOG.debug("Got the command: " + dnCmd);
}
@@ -284,9 +288,8 @@ public class TestBlockReport {
* This test isn't a representative case for BlockReport
* The empty method is going to be left here to keep the naming
* of the test plan in synch with the actual implementation
- * @throws IOException in case of errors
*/
- public void blockReport_04() throws IOException {
+ public void blockReport_04() {
}
// Client requests new block from NN. The test corrupts this very block
@@ -295,7 +298,7 @@ public class TestBlockReport {
// BlockScanner which is out of scope of this test
// Keeping the name to be in synch with the test plan
//
- public void blockReport_05() throws IOException {
+ public void blockReport_05() {
}
/**
@@ -319,8 +322,9 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- cluster.getNameNodeRpc().blockReport(dnR, poolId,
- new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+ StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
+ cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
printStats();
assertEquals("Wrong number of PendingReplication Blocks",
0, cluster.getNamesystem().getUnderReplicatedBlocks());
@@ -368,8 +372,9 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- cluster.getNameNodeRpc().blockReport(dnR, poolId,
- new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+ StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
+ cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
printStats();
assertEquals("Wrong number of Corrupted blocks",
1, cluster.getNamesystem().getCorruptReplicaBlocks() +
@@ -390,8 +395,9 @@ public class TestBlockReport {
LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName());
}
- cluster.getNameNodeRpc().blockReport(dnR, poolId,
+ report[0] = new StorageBlockReport(dnR.getStorageID(),
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+ cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
printStats();
assertEquals("Wrong number of Corrupted blocks",
@@ -440,8 +446,9 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- cluster.getNameNodeRpc().blockReport(dnR, poolId,
- new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+ StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
+ cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
printStats();
assertEquals("Wrong number of PendingReplication blocks",
blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());
@@ -486,8 +493,9 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
- cluster.getNameNodeRpc().blockReport(dnR, poolId,
- new BlockListAsLongs(blocks, null).getBlockListAsLongs());
+ StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
+ cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
printStats();
assertEquals("Wrong number of PendingReplication blocks",
2, cluster.getNamesystem().getPendingReplicationBlocks());
@@ -550,7 +558,7 @@ public class TestBlockReport {
.when(spy).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.anyString(),
- Mockito.<long[]>anyObject());
+ Mockito.<StorageBlockReport[]>anyObject());
// Force a block report to be generated. The block report will have
// an RBW replica in it. Wait for the RPC to be sent, but block
@@ -638,8 +646,7 @@ public class TestBlockReport {
// Write file and start second data node.
private ArrayList<Block> writeFile(final String METHOD_NAME,
final long fileSize,
- Path filePath)
- throws IOException {
+ Path filePath) {
ArrayList<Block> blocks = null;
try {
REPL_FACTOR = 2;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1241519&r1=1241518&r2=1241519&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Tue Feb 7 16:59:48 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetUtils;
@@ -144,8 +145,9 @@ public class TestDataNodeVolumeFailure {
DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
String bpid = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
- long[] bReport = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs();
- cluster.getNameNodeRpc().blockReport(dnR, bpid, bReport);
+ StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
+ dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs()) };
+ cluster.getNameNodeRpc().blockReport(dnR, bpid, report);
// verify number of blocks and files...
verify(filename, filesize);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1241519&r1=1241518&r2=1241519&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Tue Feb 7 16:59:48 2012
@@ -25,8 +25,6 @@ import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
-import javax.security.auth.login.LoginException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@@ -46,9 +44,13 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.EnumSetWritable;
@@ -104,7 +106,7 @@ public class NNThroughputBenchmark {
static NameNode nameNode;
static NamenodeProtocols nameNodeProto;
- NNThroughputBenchmark(Configuration conf) throws IOException, LoginException {
+ NNThroughputBenchmark(Configuration conf) throws IOException {
config = conf;
// We do not need many handlers, since each thread simulates a handler
// by calling name-node methods directly
@@ -125,7 +127,7 @@ public class NNThroughputBenchmark {
nameNodeProto = nameNode.getRpcServer();
}
- void close() throws IOException {
+ void close() {
nameNode.stop();
}
@@ -795,7 +797,10 @@ public class NNThroughputBenchmark {
dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
DataNode.setNewStorageID(dnRegistration);
// register datanode
- dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
+
+ DatanodeStorage[] storages = { new DatanodeStorage(
+ dnRegistration.getStorageID(), DatanodeStorage.State.NORMAL) };
+ dnRegistration = nameNodeProto.registerDatanode(dnRegistration, storages);
}
/**
@@ -805,8 +810,10 @@ public class NNThroughputBenchmark {
void sendHeartbeat() throws IOException {
// register datanode
// TODO:FEDERATION currently a single block pool is supported
+ StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(),
+ false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
- DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0);
+ rep, 0, 0, 0);
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
if(LOG.isDebugEnabled()) {
@@ -849,9 +856,10 @@ public class NNThroughputBenchmark {
@SuppressWarnings("unused") // keep it for future blockReceived benchmark
int replicateBlocks() throws IOException {
// register datanode
- // TODO:FEDERATION currently a single block pool is supported
+ StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(),
+ false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
- DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0);
+ rep, 0, 0, 0);
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
@@ -881,10 +889,12 @@ public class NNThroughputBenchmark {
receivedDNReg.setStorageInfo(
new DataStorage(nsInfo, dnInfo.getStorageID()));
receivedDNReg.setInfoPort(dnInfo.getInfoPort());
+ ReceivedDeletedBlockInfo[] rdBlocks = { new ReceivedDeletedBlockInfo(
+ blocks[i], DataNode.EMPTY_DEL_HINT) };
+ StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
+ receivedDNReg.getStorageID(), rdBlocks) };
nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
- .getNamesystem().getBlockPoolId(),
- new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
- blocks[i], DataNode.EMPTY_DEL_HINT) });
+ .getNamesystem().getBlockPoolId(), report);
}
}
return blocks.length;
@@ -916,7 +926,7 @@ public class NNThroughputBenchmark {
config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3 * 60);
parseArguments(args);
// adjust replication to the number of data-nodes
- this.replication = (short)Math.min((int)replication, getNumDatanodes());
+ this.replication = (short)Math.min(replication, getNumDatanodes());
}
/**
@@ -996,10 +1006,12 @@ public class NNThroughputBenchmark {
for(DatanodeInfo dnInfo : loc.getLocations()) {
int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock());
+ ReceivedDeletedBlockInfo[] rdBlocks = { new ReceivedDeletedBlockInfo(
+ loc.getBlock().getLocalBlock(), "") };
+ StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
+ datanodes[dnIdx].dnRegistration.getStorageID(), rdBlocks) };
nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
- .getBlock().getBlockPoolId(),
- new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(loc
- .getBlock().getLocalBlock(), "") });
+ .getBlock().getBlockPoolId(), report);
}
}
return prevBlock;
@@ -1016,8 +1028,10 @@ public class NNThroughputBenchmark {
assert daemonId < numThreads : "Wrong daemonId.";
TinyDatanode dn = datanodes[daemonId];
long start = System.currentTimeMillis();
+ StorageBlockReport[] report = { new StorageBlockReport(
+ dn.dnRegistration.getStorageID(), dn.getBlockReportList()) };
nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
- .getBlockPoolId(), dn.getBlockReportList());
+ .getBlockPoolId(), report);
long end = System.currentTimeMillis();
return end-start;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java?rev=1241519&r1=1241518&r2=1241519&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java Tue Feb 7 16:59:48 2012
@@ -38,6 +38,9 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.junit.After;
import org.junit.Test;
@@ -108,19 +111,22 @@ public class TestDeadDatanode {
ReceivedDeletedBlockInfo[] blocks = { new ReceivedDeletedBlockInfo(
new Block(0), "") };
+ StorageReceivedDeletedBlocks[] storageBlocks = {
+ new StorageReceivedDeletedBlocks(reg.getStorageID(), blocks) };
// Ensure blockReceived call from dead datanode is rejected with IOException
try {
- dnp.blockReceivedAndDeleted(reg, poolId, blocks);
+ dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
Assert.fail("Expected IOException is not thrown");
} catch (IOException ex) {
// Expected
}
// Ensure blockReport from dead datanode is rejected with IOException
- long[] blockReport = new long[] { 0L, 0L, 0L };
+ StorageBlockReport[] report = { new StorageBlockReport(reg.getStorageID(),
+ new long[] { 0L, 0L, 0L }) };
try {
- dnp.blockReport(reg, poolId, blockReport);
+ dnp.blockReport(reg, poolId, report);
Assert.fail("Expected IOException is not thrown");
} catch (IOException ex) {
// Expected
@@ -128,7 +134,9 @@ public class TestDeadDatanode {
// Ensure heartbeat from dead datanode is rejected with a command
// that asks datanode to register again
- DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, 0, 0, 0, 0, 0, 0, 0);
+ StorageReport[] rep = { new StorageReport(reg.getStorageID(), false, 0, 0,
+ 0, 0) };
+ DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0, 0, 0);
Assert.assertEquals(1, cmd.length);
Assert.assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
.getAction());