You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/05/29 19:49:09 UTC
[1/3] hadoop git commit: HDDS-81. Moving ContainerReport inside
Datanode heartbeat. Contributed by Nanda Kumar.
Repository: hadoop
Updated Branches:
refs/heads/trunk 4827e9a90 -> 201440b98
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java
deleted file mode 100644
index 50fd18f..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.testutils;
-
-import com.google.common.primitives.Longs;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.NodePoolManager;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerInfo;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
- .HEALTHY;
-
-/**
- * This class manages the state of datanode
- * in conjunction with the node pool and node managers.
- */
-public class ReplicationDatanodeStateManager {
- private final NodeManager nodeManager;
- private final NodePoolManager poolManager;
- private final Random r;
-
- /**
- * The datanode state Manager.
- *
- * @param nodeManager
- * @param poolManager
- */
- public ReplicationDatanodeStateManager(NodeManager nodeManager,
- NodePoolManager poolManager) {
- this.nodeManager = nodeManager;
- this.poolManager = poolManager;
- r = new Random();
- }
-
- /**
- * Get Container Report as if it is from a datanode in the cluster.
- * @param containerID - Container ID.
- * @param poolName - Pool Name.
- * @param dataNodeCount - Datanode Count.
- * @return List of Container Reports.
- */
- public List<ContainerReportsRequestProto> getContainerReport(
- long containerID, String poolName, int dataNodeCount) {
- List<ContainerReportsRequestProto> containerList = new LinkedList<>();
- List<DatanodeDetails> nodesInPool = poolManager.getNodes(poolName);
-
- if (nodesInPool == null) {
- return containerList;
- }
-
- if (nodesInPool.size() < dataNodeCount) {
- throw new IllegalStateException("Not enough datanodes to create " +
- "required container reports");
- }
-
- while (containerList.size() < dataNodeCount && nodesInPool.size() > 0) {
- DatanodeDetails id = nodesInPool.get(r.nextInt(nodesInPool.size()));
- nodesInPool.remove(id);
- containerID++;
- // We return container reports only for nodes that are healthy.
- if (nodeManager.getNodeState(id) == HEALTHY) {
- ContainerInfo info = ContainerInfo.newBuilder()
- .setContainerID(containerID)
- .setFinalhash(DigestUtils.sha256Hex(
- Longs.toByteArray(containerID)))
- .setContainerID(containerID)
- .build();
- ContainerReportsRequestProto containerReport =
- ContainerReportsRequestProto.newBuilder().addReports(info)
- .setDatanodeDetails(id.getProtoBufMessage())
- .setType(ContainerReportsRequestProto.reportType.fullReport)
- .build();
- containerList.add(containerReport);
- }
- }
- return containerList;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 3f814d0..072d821 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -24,13 +24,13 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodePoolManager;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.mockito.Mockito;
@@ -277,12 +277,12 @@ public class ReplicationNodeManagerMock implements NodeManager {
* Register the node if the node finds that it is not registered with any SCM.
*
* @param dd DatanodeDetailsProto
- * @param nodeReport SCMNodeReport
+ * @param nodeReport NodeReportProto
* @return SCMHeartbeatResponseProto
*/
@Override
- public SCMCommand register(HddsProtos.DatanodeDetailsProto dd,
- SCMNodeReport nodeReport) {
+ public RegisteredCommand register(DatanodeDetails dd,
+ NodeReportProto nodeReport) {
return null;
}
@@ -294,8 +294,8 @@ public class ReplicationNodeManagerMock implements NodeManager {
* @return SCMheartbeat response list
*/
@Override
- public List<SCMCommand> sendHeartbeat(HddsProtos.DatanodeDetailsProto dd,
- SCMNodeReport nodeReport) {
+ public List<SCMCommand> sendHeartbeat(DatanodeDetails dd,
+ NodeReportProto nodeReport) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index a0d41a8..0c1d8f2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
@@ -302,12 +304,11 @@ public class TestStorageContainerManager {
NodeManager nodeManager = cluster.getStorageContainerManager()
.getScmNodeManager();
List<SCMCommand> commands = nodeManager.sendHeartbeat(
- nodeManager.getNodes(NodeState.HEALTHY).get(0).getProtoBufMessage(),
- null);
+ nodeManager.getNodes(NodeState.HEALTHY).get(0), null);
if (commands != null) {
for (SCMCommand cmd : commands) {
- if (cmd.getType() == SCMCmdType.deleteBlocksCommand) {
+ if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
List<DeletedBlocksTransaction> deletedTXs =
((DeleteBlocksCommand) cmd).blocksTobeDeleted();
return deletedTXs != null && deletedTXs.size() == limitSize;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
index 1d19bb3..1dbe760 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
@@ -75,11 +77,11 @@ public class TestSCMMetrics {
ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes,
writeBytes, readCount, writeCount);
StorageContainerManager scmManager = cluster.getStorageContainerManager();
-
- ContainerReportsRequestProto request = createContainerReport(numReport,
- stat, null);
- String fstDatanodeUuid = request.getDatanodeDetails().getUuid();
- scmManager.getDatanodeProtocolServer().sendContainerReport(request);
+ DatanodeDetails fstDatanodeDetails = TestUtils.getDatanodeDetails();
+ ContainerReportsProto request = createContainerReport(numReport, stat);
+ String fstDatanodeUuid = fstDatanodeDetails.getUuidString();
+ scmManager.getDatanodeProtocolServer().processContainerReports(
+ fstDatanodeDetails, request);
// verify container stat metrics
MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
@@ -100,9 +102,11 @@ public class TestSCMMetrics {
getLongGauge("LastContainerReportWriteCount", scmMetrics));
// add one new report
- request = createContainerReport(1, stat, null);
- String sndDatanodeUuid = request.getDatanodeDetails().getUuid();
- scmManager.getDatanodeProtocolServer().sendContainerReport(request);
+ DatanodeDetails sndDatanodeDetails = TestUtils.getDatanodeDetails();
+ request = createContainerReport(1, stat);
+ String sndDatanodeUuid = sndDatanodeDetails.getUuidString();
+ scmManager.getDatanodeProtocolServer().processContainerReports(
+ sndDatanodeDetails, request);
scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
assertEquals(size * (numReport + 1),
@@ -124,12 +128,12 @@ public class TestSCMMetrics {
// Re-send reports but with different value for validating
// the aggregation.
stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6);
- scmManager.getDatanodeProtocolServer().sendContainerReport(
- createContainerReport(1, stat, fstDatanodeUuid));
+ scmManager.getDatanodeProtocolServer().processContainerReports(
+ fstDatanodeDetails, createContainerReport(1, stat));
stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1);
- scmManager.getDatanodeProtocolServer().sendContainerReport(
- createContainerReport(1, stat, sndDatanodeUuid));
+ scmManager.getDatanodeProtocolServer().processContainerReports(
+ sndDatanodeDetails, createContainerReport(1, stat));
// the global container metrics value should be updated
scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
@@ -170,11 +174,11 @@ public class TestSCMMetrics {
writeBytes, readCount, writeCount);
StorageContainerManager scmManager = cluster.getStorageContainerManager();
- String datanodeUuid = cluster.getHddsDatanodes().get(0)
- .getDatanodeDetails().getUuidString();
- ContainerReportsRequestProto request = createContainerReport(numReport,
- stat, datanodeUuid);
- scmManager.getDatanodeProtocolServer().sendContainerReport(request);
+ DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0)
+ .getDatanodeDetails();
+ ContainerReportsProto request = createContainerReport(numReport, stat);
+ scmManager.getDatanodeProtocolServer().processContainerReports(
+ datanodeDetails, request);
MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
assertEquals(size * numReport,
@@ -216,11 +220,11 @@ public class TestSCMMetrics {
}
}
- private ContainerReportsRequestProto createContainerReport(int numReport,
- ContainerStat stat, String datanodeUuid) {
- StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder
+ private ContainerReportsProto createContainerReport(int numReport,
+ ContainerStat stat) {
+ StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder
reportsBuilder = StorageContainerDatanodeProtocolProtos
- .ContainerReportsRequestProto.newBuilder();
+ .ContainerReportsProto.newBuilder();
for (int i = 0; i < numReport; i++) {
ContainerReport report = new ContainerReport(
@@ -234,24 +238,6 @@ public class TestSCMMetrics {
report.setWriteBytes(stat.getWriteBytes().get());
reportsBuilder.addReports(report.getProtoBufMessage());
}
-
- DatanodeDetails datanodeDetails;
- if (datanodeUuid == null) {
- datanodeDetails = TestUtils.getDatanodeDetails();
- } else {
- datanodeDetails = DatanodeDetails.newBuilder()
- .setUuid(datanodeUuid)
- .setIpAddress("127.0.0.1")
- .setHostName("localhost")
- .setContainerPort(0)
- .setRatisPort(0)
- .setOzoneRestPort(0)
- .build();
- }
-
- reportsBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage());
- reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
- .ContainerReportsRequestProto.reportType.fullReport);
return reportsBuilder.build();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[3/3] hadoop git commit: HDDS-81. Moving ContainerReport inside
Datanode heartbeat. Contributed by Nanda Kumar.
Posted by ae...@apache.org.
HDDS-81. Moving ContainerReport inside Datanode heartbeat.
Contributed by Nanda Kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/201440b9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/201440b9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/201440b9
Branch: refs/heads/trunk
Commit: 201440b987d5ef3910c2045b2411c213ed6eec1f
Parents: 4827e9a
Author: Anu Engineer <ae...@apache.org>
Authored: Tue May 29 12:40:27 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue May 29 12:48:50 2018 -0700
----------------------------------------------------------------------
.../common/impl/ContainerManagerImpl.java | 22 +-
.../common/impl/StorageLocationReport.java | 8 +-
.../common/interfaces/ContainerManager.java | 8 +-
.../statemachine/DatanodeStateMachine.java | 7 +-
.../common/statemachine/StateContext.java | 16 +-
.../CloseContainerCommandHandler.java | 113 ++++++++
.../commandhandler/CloseContainerHandler.java | 113 --------
.../commandhandler/CommandDispatcher.java | 5 +-
.../commandhandler/CommandHandler.java | 8 +-
.../DeleteBlocksCommandHandler.java | 12 +-
.../states/endpoint/HeartbeatEndpointTask.java | 30 +-
.../states/endpoint/RegisterEndpointTask.java | 12 +-
.../container/ozoneimpl/OzoneContainer.java | 10 +-
.../StorageContainerDatanodeProtocol.java | 30 +-
.../protocol/StorageContainerNodeProtocol.java | 15 +-
.../commands/CloseContainerCommand.java | 18 +-
.../protocol/commands/DeleteBlocksCommand.java | 18 +-
.../protocol/commands/RegisteredCommand.java | 26 +-
.../protocol/commands/ReregisterCommand.java | 16 +-
.../ozone/protocol/commands/SCMCommand.java | 4 +-
...rDatanodeProtocolClientSideTranslatorPB.java | 50 +---
...rDatanodeProtocolServerSideTranslatorPB.java | 53 ++--
.../StorageContainerDatanodeProtocol.proto | 256 ++++++++---------
.../ozone/container/common/ScmTestMock.java | 78 ++----
.../hdds/scm/container/ContainerMapping.java | 10 +-
.../hadoop/hdds/scm/container/Mapping.java | 6 +-
.../replication/ContainerSupervisor.java | 13 +-
.../container/replication/InProgressPool.java | 15 +-
.../hdds/scm/node/HeartbeatQueueItem.java | 14 +-
.../hadoop/hdds/scm/node/SCMNodeManager.java | 58 ++--
.../hdds/scm/node/SCMNodeStorageStatMap.java | 14 +-
.../scm/server/SCMDatanodeProtocolServer.java | 195 +++++++------
.../org/apache/hadoop/hdds/scm/TestUtils.java | 19 +-
.../hdds/scm/container/MockNodeManager.java | 26 +-
.../scm/container/TestContainerMapping.java | 24 +-
.../container/closer/TestContainerCloser.java | 12 +-
.../hdds/scm/node/TestContainerPlacement.java | 6 +-
.../hadoop/hdds/scm/node/TestNodeManager.java | 83 +++---
.../scm/node/TestSCMNodeStorageStatMap.java | 16 +-
.../ozone/container/common/TestEndPoint.java | 113 ++------
.../replication/TestContainerSupervisor.java | 275 -------------------
.../ReplicationDatanodeStateManager.java | 101 -------
.../testutils/ReplicationNodeManagerMock.java | 14 +-
.../ozone/TestStorageContainerManager.java | 11 +-
.../apache/hadoop/ozone/scm/TestSCMMetrics.java | 68 ++---
45 files changed, 706 insertions(+), 1315 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
index 9355364..af47015 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -35,11 +35,11 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+ .StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -854,11 +854,11 @@ public class ContainerManagerImpl implements ContainerManager {
* @return node report.
*/
@Override
- public SCMNodeReport getNodeReport() throws IOException {
+ public NodeReportProto getNodeReport() throws IOException {
StorageLocationReport[] reports = locationManager.getLocationReport();
- SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+ NodeReportProto.Builder nrb = NodeReportProto.newBuilder();
for (int i = 0; i < reports.length; i++) {
- SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+ StorageReportProto.Builder srb = StorageReportProto.newBuilder();
nrb.addStorageReport(reports[i].getProtoBufMessage());
}
return nrb.build();
@@ -891,7 +891,7 @@ public class ContainerManagerImpl implements ContainerManager {
* @throws IOException
*/
@Override
- public ContainerReportsRequestProto getContainerReport() throws IOException {
+ public ContainerReportsProto getContainerReport() throws IOException {
LOG.debug("Starting container report iteration.");
// No need for locking since containerMap is a ConcurrentSkipListMap
// And we can never get the exact state since close might happen
@@ -899,12 +899,8 @@ public class ContainerManagerImpl implements ContainerManager {
List<ContainerData> containers = containerMap.values().stream()
.collect(Collectors.toList());
- ContainerReportsRequestProto.Builder crBuilder =
- ContainerReportsRequestProto.newBuilder();
-
- // TODO: support delta based container report
- crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
- .setType(ContainerReportsRequestProto.reportType.fullReport);
+ ContainerReportsProto.Builder crBuilder =
+ ContainerReportsProto.newBuilder();
for (ContainerData container: containers) {
long containerId = container.getContainerID();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
index a5ad6c2..87b9656 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.container.common.impl;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdds.protocol.proto.
- StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+ StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.StorageTypeProto;
@@ -137,8 +137,8 @@ public class StorageLocationReport {
* @return SCMStorageReport
* @throws IOException In case, the storage type specified is invalid.
*/
- public SCMStorageReport getProtoBufMessage() throws IOException{
- SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+ public StorageReportProto getProtoBufMessage() throws IOException{
+ StorageReportProto.Builder srb = StorageReportProto.newBuilder();
return srb.setStorageUuid(getId())
.setCapacity(getCapacity())
.setScmUsed(getScmUsed())
@@ -156,7 +156,7 @@ public class StorageLocationReport {
* @throws IOException in case of invalid storage type
*/
- public static StorageLocationReport getFromProtobuf(SCMStorageReport report)
+ public static StorageLocationReport getFromProtobuf(StorageReportProto report)
throws IOException {
StorageLocationReport.Builder builder = StorageLocationReport.newBuilder();
builder.setId(report.getStorageUuid())
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
index ba70953..49b68dc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
@@ -27,9 +27,9 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import java.io.IOException;
@@ -171,14 +171,14 @@ public interface ContainerManager extends RwLock {
* Get the Node Report of container storage usage.
* @return node report.
*/
- SCMNodeReport getNodeReport() throws IOException;
+ NodeReportProto getNodeReport() throws IOException;
/**
* Gets container report.
* @return container report.
* @throws IOException
*/
- ContainerReportsRequestProto getContainerReport() throws IOException;
+ ContainerReportsProto getContainerReport() throws IOException;
/**
* Gets container reports.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index a8fe494..d0a4217 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -21,8 +21,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
- .CloseContainerHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CommandDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
@@ -86,7 +85,7 @@ public class DatanodeStateMachine implements Closeable {
// When we add new handlers just adding a new handler here should do the
// trick.
commandDispatcher = CommandDispatcher.newBuilder()
- .addHandler(new CloseContainerHandler())
+ .addHandler(new CloseContainerCommandHandler())
.addHandler(new DeleteBlocksCommandHandler(
container.getContainerManager(), conf))
.setConnectionManager(connectionManager)
@@ -131,7 +130,7 @@ public class DatanodeStateMachine implements Closeable {
try {
LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
nextHB.set(Time.monotonicNow() + heartbeatFrequency);
- context.setReportState(container.getNodeReport());
+ context.setNodeReport(container.getNodeReport());
context.execute(executorService, heartbeatFrequency,
TimeUnit.MILLISECONDS);
now = Time.monotonicNow();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 27eb57e..4e3c610 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.container.common.statemachine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.InitDatanodeState;
@@ -52,7 +52,7 @@ public class StateContext {
private final AtomicLong stateExecutionCount;
private final Configuration conf;
private DatanodeStateMachine.DatanodeStates state;
- private SCMNodeReport nrState;
+ private NodeReportProto dnReport;
/**
* Constructs a StateContext.
@@ -69,7 +69,7 @@ public class StateContext {
commandQueue = new LinkedList<>();
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
- nrState = SCMNodeReport.getDefaultInstance();
+ dnReport = NodeReportProto.getDefaultInstance();
}
/**
@@ -144,16 +144,16 @@ public class StateContext {
* Returns the node report of the datanode state context.
* @return the node report.
*/
- public SCMNodeReport getNodeReport() {
- return nrState;
+ public NodeReportProto getNodeReport() {
+ return dnReport;
}
/**
* Sets the storage location report of the datanode state context.
- * @param nrReport - node report
+ * @param nodeReport node report
*/
- public void setReportState(SCMNodeReport nrReport) {
- this.nrState = nrReport;
+ public void setNodeReport(NodeReportProto nodeReport) {
+ this.dnReport = nodeReport;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
new file mode 100644
index 0000000..e8c602d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handler for close container command received from SCM.
+ */
+public class CloseContainerCommandHandler implements CommandHandler {
+ static final Logger LOG =
+ LoggerFactory.getLogger(CloseContainerCommandHandler.class);
+ private int invocationCount;
+ private long totalTime;
+
+ /**
+ * Constructs a CloseContainerCommandHandler.
+ */
+ public CloseContainerCommandHandler() {
+ }
+
+ /**
+ * Handles a given SCM command.
+ *
+ * @param command - SCM Command
+ * @param container - Ozone Container.
+ * @param context - Current Context.
+ * @param connectionManager - The SCMs that we are talking to.
+ */
+ @Override
+ public void handle(SCMCommand command, OzoneContainer container,
+ StateContext context, SCMConnectionManager connectionManager) {
+ LOG.debug("Processing Close Container command.");
+ invocationCount++;
+ long startTime = Time.monotonicNow();
+ // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
+ long containerID = -1;
+ try {
+
+ CloseContainerCommandProto
+ closeContainerProto =
+ CloseContainerCommandProto
+ .parseFrom(command.getProtoBufMessage());
+ containerID = closeContainerProto.getContainerID();
+
+ container.getContainerManager().closeContainer(containerID);
+
+ } catch (Exception e) {
+ LOG.error("Can't close container " + containerID, e);
+ } finally {
+ long endTime = Time.monotonicNow();
+ totalTime += endTime - startTime;
+ }
+ }
+
+ /**
+ * Returns the command type that this command handler handles.
+ *
+ * @return Type
+ */
+ @Override
+ public SCMCommandProto.Type getCommandType() {
+ return SCMCommandProto.Type.closeContainerCommand;
+ }
+
+ /**
+ * Returns number of times this handler has been invoked.
+ *
+ * @return int
+ */
+ @Override
+ public int getInvocationCount() {
+ return invocationCount;
+ }
+
+ /**
+ * Returns the average time this function takes to run.
+ *
+ * @return long
+ */
+ @Override
+ public long getAverageRunTime() {
+ if (invocationCount > 0) {
+ return totalTime / invocationCount;
+ }
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
deleted file mode 100644
index d8adc7d..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCmdType;
-import org.apache.hadoop.ozone.container.common.statemachine
- .SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Container Report handler.
- */
-public class CloseContainerHandler implements CommandHandler {
- static final Logger LOG =
- LoggerFactory.getLogger(CloseContainerHandler.class);
- private int invocationCount;
- private long totalTime;
-
- /**
- * Constructs a ContainerReport handler.
- */
- public CloseContainerHandler() {
- }
-
- /**
- * Handles a given SCM command.
- *
- * @param command - SCM Command
- * @param container - Ozone Container.
- * @param context - Current Context.
- * @param connectionManager - The SCMs that we are talking to.
- */
- @Override
- public void handle(SCMCommand command, OzoneContainer container,
- StateContext context, SCMConnectionManager connectionManager) {
- LOG.debug("Processing Close Container command.");
- invocationCount++;
- long startTime = Time.monotonicNow();
- // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
- long containerID = -1;
- try {
-
- SCMCloseContainerCmdResponseProto
- closeContainerProto =
- SCMCloseContainerCmdResponseProto
- .parseFrom(command.getProtoBufMessage());
- containerID = closeContainerProto.getContainerID();
-
- container.getContainerManager().closeContainer(containerID);
-
- } catch (Exception e) {
- LOG.error("Can't close container " + containerID, e);
- } finally {
- long endTime = Time.monotonicNow();
- totalTime += endTime - startTime;
- }
- }
-
- /**
- * Returns the command type that this command handler handles.
- *
- * @return Type
- */
- @Override
- public SCMCmdType getCommandType() {
- return SCMCmdType.closeContainerCommand;
- }
-
- /**
- * Returns number of times this handler has been invoked.
- *
- * @return int
- */
- @Override
- public int getInvocationCount() {
- return invocationCount;
- }
-
- /**
- * Returns the average time this function takes to run.
- *
- * @return long
- */
- @Override
- public long getAverageRunTime() {
- if (invocationCount > 0) {
- return totalTime / invocationCount;
- }
- return 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
index 40feca3..aedd78f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -18,7 +18,8 @@
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -38,7 +39,7 @@ public final class CommandDispatcher {
static final Logger LOG =
LoggerFactory.getLogger(CommandDispatcher.class);
private final StateContext context;
- private final Map<SCMCmdType, CommandHandler> handlerMap;
+ private final Map<Type, CommandHandler> handlerMap;
private final OzoneContainer container;
private final SCMConnectionManager connectionManager;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 13d9f72..60e2dc4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -17,8 +17,10 @@
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
-import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -42,7 +44,7 @@ public interface CommandHandler {
* Returns the command type that this command handler handles.
* @return Type
*/
- SCMCmdType getCommandType();
+ SCMCommandProto.Type getCommandType();
/**
* Returns number of times this handler has been invoked.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 5231660..ab69bdc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
@@ -26,8 +28,6 @@ import org.apache.hadoop.hdds.protocol.proto
.DeleteBlockTransactionResult;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCmdType;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers
@@ -73,10 +73,10 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
@Override
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
- if (command.getType() != SCMCmdType.deleteBlocksCommand) {
+ if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
LOG.warn("Skipping handling command, expected command "
+ "type {} but found {}",
- SCMCmdType.deleteBlocksCommand, command.getType());
+ SCMCommandProto.Type.deleteBlocksCommand, command.getType());
return;
}
LOG.debug("Processing block deletion command.");
@@ -193,8 +193,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
}
@Override
- public SCMCmdType getCommandType() {
- return SCMCmdType.deleteBlocksCommand;
+ public SCMCommandProto.Type getCommandType() {
+ return SCMCommandProto.Type.deleteBlocksCommand;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 01b4c72..337cdfb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -23,7 +23,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.container.common.helpers
@@ -97,8 +99,13 @@ public class HeartbeatEndpointTask
try {
Preconditions.checkState(this.datanodeDetailsProto != null);
+ SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder()
+ .setDatanodeDetails(datanodeDetailsProto)
+ .setNodeReport(context.getNodeReport())
+ .build();
+
SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
- .sendHeartbeat(datanodeDetailsProto, this.context.getNodeReport());
+ .sendHeartbeat(request);
processResponse(reponse, datanodeDetailsProto);
rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
rpcEndpoint.zeroMissedCount();
@@ -125,13 +132,13 @@ public class HeartbeatEndpointTask
*/
private void processResponse(SCMHeartbeatResponseProto response,
final DatanodeDetailsProto datanodeDetails) {
- for (SCMCommandResponseProto commandResponseProto : response
+ // Verify the response is indeed for this datanode.
+ Preconditions.checkState(response.getDatanodeUUID()
+ .equalsIgnoreCase(datanodeDetails.getUuid()),
+ "Unexpected datanode ID in the response.");
+ for (SCMCommandProto commandResponseProto : response
.getCommandsList()) {
- // Verify the response is indeed for this datanode.
- Preconditions.checkState(commandResponseProto.getDatanodeUUID()
- .equalsIgnoreCase(datanodeDetails.getUuid()),
- "Unexpected datanode ID in the response.");
- switch (commandResponseProto.getCmdType()) {
+ switch (commandResponseProto.getCommandType()) {
case reregisterCommand:
if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
if (LOG.isDebugEnabled()) {
@@ -148,7 +155,8 @@ public class HeartbeatEndpointTask
break;
case deleteBlocksCommand:
DeleteBlocksCommand db = DeleteBlocksCommand
- .getFromProtobuf(commandResponseProto.getDeleteBlocksProto());
+ .getFromProtobuf(
+ commandResponseProto.getDeleteBlocksCommandProto());
if (!db.blocksTobeDeleted().isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug(DeletedContainerBlocksSummary
@@ -161,7 +169,7 @@ public class HeartbeatEndpointTask
case closeContainerCommand:
CloseContainerCommand closeContainer =
CloseContainerCommand.getFromProtobuf(
- commandResponseProto.getCloseContainerProto());
+ commandResponseProto.getCloseContainerCommandProto());
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM container close request for container {}",
closeContainer.getContainerID());
@@ -170,7 +178,7 @@ public class HeartbeatEndpointTask
break;
default:
throw new IllegalArgumentException("Unknown response : "
- + commandResponseProto.getCmdType().name());
+ + commandResponseProto.getCommandType().name());
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
index 77a7084..12b48ab 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -24,11 +24,11 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,11 +104,11 @@ public final class RegisterEndpointTask implements
rpcEndPoint.lock();
try {
- ContainerReportsRequestProto contianerReport = datanodeContainerManager
+ ContainerReportsProto contianerReport = datanodeContainerManager
.getContainerReport();
- SCMNodeReport nodeReport = datanodeContainerManager.getNodeReport();
+ NodeReportProto nodeReport = datanodeContainerManager.getNodeReport();
// TODO : Add responses to the command Queue.
- SCMRegisteredCmdResponseProto response = rpcEndPoint.getEndPoint()
+ SCMRegisteredResponseProto response = rpcEndPoint.getEndPoint()
.register(datanodeDetails.getProtoBufMessage(), nodeReport,
contianerReport);
Preconditions.checkState(UUID.fromString(response.getDatanodeUUID())
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 6758479..b357fef 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -19,14 +19,14 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
@@ -219,7 +219,7 @@ public class OzoneContainer {
/**
* Returns node report of container storage usage.
*/
- public SCMNodeReport getNodeReport() throws IOException {
+ public NodeReportProto getNodeReport() throws IOException {
return this.manager.getNodeReport();
}
@@ -255,7 +255,7 @@ public class OzoneContainer {
* @return - container report.
* @throws IOException
*/
- public ContainerReportsRequestProto getContainerReport() throws IOException {
+ public ContainerReportsProto getContainerReport() throws IOException {
return this.manager.getContainerReport();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index e2a3bf5..a950a31 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -19,20 +19,20 @@ package org.apache.hadoop.ozone.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos
.ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -55,13 +55,12 @@ public interface StorageContainerDatanodeProtocol {
/**
* Used by data node to send a Heartbeat.
- * @param datanodeDetails - Datanode Details.
- * @param nodeReport - node report state
+ * @param heartbeat Heartbeat
* @return - SCMHeartbeatResponseProto
* @throws IOException
*/
- SCMHeartbeatResponseProto sendHeartbeat(DatanodeDetailsProto datanodeDetails,
- SCMNodeReport nodeReport) throws IOException;
+ SCMHeartbeatResponseProto sendHeartbeat(SCMHeartbeatRequestProto heartbeat)
+ throws IOException;
/**
* Register Datanode.
@@ -70,20 +69,11 @@ public interface StorageContainerDatanodeProtocol {
* @param containerReportsRequestProto - Container Reports.
* @return SCM Command.
*/
- SCMRegisteredCmdResponseProto register(DatanodeDetailsProto datanodeDetails,
- SCMNodeReport nodeReport, ContainerReportsRequestProto
+ SCMRegisteredResponseProto register(DatanodeDetailsProto datanodeDetails,
+ NodeReportProto nodeReport, ContainerReportsProto
containerReportsRequestProto) throws IOException;
/**
- * Send a container report.
- * @param reports -- Container report.
- * @return container reports response.
- * @throws IOException
- */
- ContainerReportsResponseProto sendContainerReport(
- ContainerReportsRequestProto reports) throws IOException;
-
- /**
* Used by datanode to send block deletion ACK to SCM.
* @param request block deletion transactions.
* @return block deletion transaction response.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
index 14038fb..790f58a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
@@ -18,11 +18,12 @@
package org.apache.hadoop.ozone.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import java.util.List;
@@ -49,11 +50,11 @@ public interface StorageContainerNodeProtocol {
/**
* Register the node if the node finds that it is not registered with any SCM.
* @param datanodeDetails DatanodeDetails
- * @param nodeReport SCMNodeReport
+ * @param nodeReport NodeReportProto
* @return SCMHeartbeatResponseProto
*/
- SCMCommand register(DatanodeDetailsProto datanodeDetails, SCMNodeReport
- nodeReport);
+ RegisteredCommand register(DatanodeDetails datanodeDetails,
+ NodeReportProto nodeReport);
/**
* Send heartbeat to indicate the datanode is alive and doing well.
@@ -61,7 +62,7 @@ public interface StorageContainerNodeProtocol {
* @param nodeReport - node report.
* @return SCMheartbeat response list
*/
- List<SCMCommand> sendHeartbeat(DatanodeDetailsProto datanodeDetails,
- SCMNodeReport nodeReport);
+ List<SCMCommand> sendHeartbeat(DatanodeDetails datanodeDetails,
+ NodeReportProto nodeReport);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
index d1d6488..4f4f82b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
@@ -19,18 +19,16 @@ package org.apache.hadoop.ozone.protocol.commands;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto;
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+ .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
-import static org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCmdType.closeContainerCommand;
/**
* Asks datanode to close a container.
*/
public class CloseContainerCommand
- extends SCMCommand<SCMCloseContainerCmdResponseProto> {
+ extends SCMCommand<CloseContainerCommandProto> {
private long containerID;
@@ -44,8 +42,8 @@ public class CloseContainerCommand
* @return Type
*/
@Override
- public SCMCmdType getType() {
- return closeContainerCommand;
+ public SCMCommandProto.Type getType() {
+ return SCMCommandProto.Type.closeContainerCommand;
}
/**
@@ -58,13 +56,13 @@ public class CloseContainerCommand
return getProto().toByteArray();
}
- public SCMCloseContainerCmdResponseProto getProto() {
- return SCMCloseContainerCmdResponseProto.newBuilder()
+ public CloseContainerCommandProto getProto() {
+ return CloseContainerCommandProto.newBuilder()
.setContainerID(containerID).build();
}
public static CloseContainerCommand getFromProtobuf(
- SCMCloseContainerCmdResponseProto closeContainerProto) {
+ CloseContainerCommandProto closeContainerProto) {
Preconditions.checkNotNull(closeContainerProto);
return new CloseContainerCommand(closeContainerProto.getContainerID());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
index a11ca25..4fa33f6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
@@ -18,11 +18,11 @@
package org.apache.hadoop.ozone.protocol.commands;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+ .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMDeleteBlocksCmdResponseProto;
+ .StorageContainerDatanodeProtocolProtos.DeleteBlocksCommandProto;
import java.util.List;
@@ -30,7 +30,7 @@ import java.util.List;
* A SCM command asks a datanode to delete a number of blocks.
*/
public class DeleteBlocksCommand extends
- SCMCommand<SCMDeleteBlocksCmdResponseProto> {
+ SCMCommand<DeleteBlocksCommandProto> {
private List<DeletedBlocksTransaction> blocksTobeDeleted;
@@ -44,8 +44,8 @@ public class DeleteBlocksCommand extends
}
@Override
- public SCMCmdType getType() {
- return SCMCmdType.deleteBlocksCommand;
+ public SCMCommandProto.Type getType() {
+ return SCMCommandProto.Type.deleteBlocksCommand;
}
@Override
@@ -54,13 +54,13 @@ public class DeleteBlocksCommand extends
}
public static DeleteBlocksCommand getFromProtobuf(
- SCMDeleteBlocksCmdResponseProto deleteBlocksProto) {
+ DeleteBlocksCommandProto deleteBlocksProto) {
return new DeleteBlocksCommand(deleteBlocksProto
.getDeletedBlocksTransactionsList());
}
- public SCMDeleteBlocksCmdResponseProto getProto() {
- return SCMDeleteBlocksCmdResponseProto.newBuilder()
+ public DeleteBlocksCommandProto getProto() {
+ return DeleteBlocksCommandProto.newBuilder()
.addAllDeletedBlocksTransactions(blocksTobeDeleted).build();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java
index 69f2c18..3a5da72 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java
@@ -19,18 +19,15 @@ package org.apache.hadoop.ozone.protocol.commands;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto
.ErrorCode;
/**
* Response to Datanode Register call.
*/
-public class RegisteredCommand extends
- SCMCommand<SCMRegisteredCmdResponseProto> {
+public class RegisteredCommand {
private String datanodeUUID;
private String clusterID;
private ErrorCode error;
@@ -60,16 +57,6 @@ public class RegisteredCommand extends
}
/**
- * Returns the type of this command.
- *
- * @return Type
- */
- @Override
- public SCMCmdType getType() {
- return SCMCmdType.registeredCommand;
- }
-
- /**
* Returns datanode UUID.
*
* @return - Datanode ID.
@@ -117,10 +104,9 @@ public class RegisteredCommand extends
*
* @return A protobuf message.
*/
- @Override
public byte[] getProtoBufMessage() {
- SCMRegisteredCmdResponseProto.Builder builder =
- SCMRegisteredCmdResponseProto.newBuilder()
+ SCMRegisteredResponseProto.Builder builder =
+ SCMRegisteredResponseProto.newBuilder()
.setClusterID(this.clusterID)
.setDatanodeUUID(this.datanodeUUID)
.setErrorCode(this.error);
@@ -157,7 +143,7 @@ public class RegisteredCommand extends
* @param response - RegisteredCmdResponseProto
* @return RegisteredCommand
*/
- public RegisteredCommand getFromProtobuf(SCMRegisteredCmdResponseProto
+ public RegisteredCommand getFromProtobuf(SCMRegisteredResponseProto
response) {
Preconditions.checkNotNull(response);
if (response.hasHostname() && response.hasIpAddress()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
index c167d59..953e31a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
@@ -18,18 +18,16 @@
package org.apache.hadoop.ozone.protocol.commands;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import static org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCmdType.reregisterCommand;
-import static org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
+ .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
/**
* Informs a datanode to register itself with SCM again.
*/
public class ReregisterCommand extends
- SCMCommand<SCMReregisterCmdResponseProto>{
+ SCMCommand<ReregisterCommandProto>{
/**
* Returns the type of this command.
@@ -37,8 +35,8 @@ public class ReregisterCommand extends
* @return Type
*/
@Override
- public SCMCmdType getType() {
- return reregisterCommand;
+ public SCMCommandProto.Type getType() {
+ return SCMCommandProto.Type.reregisterCommand;
}
/**
@@ -51,8 +49,8 @@ public class ReregisterCommand extends
return getProto().toByteArray();
}
- public SCMReregisterCmdResponseProto getProto() {
- return SCMReregisterCmdResponseProto
+ public ReregisterCommandProto getProto() {
+ return ReregisterCommandProto
.newBuilder()
.build();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
index 73e4194..35ca802 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.protocol.commands;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
/**
* A class that acts as the base class to convert between Java and SCM
@@ -31,7 +31,7 @@ public abstract class SCMCommand<T extends GeneratedMessage> {
* Returns the type of this command.
* @return Type
*/
- public abstract SCMCmdType getType();
+ public abstract SCMCommandProto.Type getType();
/**
* Gets the protobuf message of this object.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index a56c57a..40fe189 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -20,24 +20,23 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos
.ContainerBlocksDeletionACKResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
+
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -123,22 +122,16 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
/**
* Send by datanode to SCM.
*
- * @param datanodeDetailsProto - Datanode Details
- * @param nodeReport - node report
+ * @param heartbeat node heartbeat
* @throws IOException
*/
@Override
public SCMHeartbeatResponseProto sendHeartbeat(
- DatanodeDetailsProto datanodeDetailsProto,
- SCMNodeReport nodeReport) throws IOException {
- SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto
- .newBuilder();
- req.setDatanodeDetails(datanodeDetailsProto);
- req.setNodeReport(nodeReport);
+ SCMHeartbeatRequestProto heartbeat) throws IOException {
final SCMHeartbeatResponseProto resp;
try {
- resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build());
+ resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, heartbeat);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -154,16 +147,16 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
* @return SCM Command.
*/
@Override
- public SCMRegisteredCmdResponseProto register(
- DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport,
- ContainerReportsRequestProto containerReportsRequestProto)
+ public SCMRegisteredResponseProto register(
+ DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport,
+ ContainerReportsProto containerReportsRequestProto)
throws IOException {
SCMRegisterRequestProto.Builder req =
SCMRegisterRequestProto.newBuilder();
req.setDatanodeDetails(datanodeDetailsProto);
req.setContainerReport(containerReportsRequestProto);
req.setNodeReport(nodeReport);
- final SCMRegisteredCmdResponseProto response;
+ final SCMRegisteredResponseProto response;
try {
response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build());
} catch (ServiceException e) {
@@ -172,25 +165,6 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
return response;
}
- /**
- * Send a container report.
- *
- * @param reports -- Container report
- * @return HeartbeatRespose.nullcommand.
- * @throws IOException
- */
- @Override
- public ContainerReportsResponseProto sendContainerReport(
- ContainerReportsRequestProto reports) throws IOException {
- final ContainerReportsResponseProto resp;
- try {
- resp = rpcProxy.sendContainerReport(NULL_RPC_CONTROLLER, reports);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
- return resp;
- }
-
@Override
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
ContainerBlocksDeletionACKProto deletedBlocks) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index 07dba57..7e8bd8a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -19,18 +19,22 @@ package org.apache.hadoop.ozone.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+ .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos
- .ContainerBlocksDeletionACKResponseProto;
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+ .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos
+ .ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -55,9 +59,8 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
}
@Override
- public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto
- getVersion(RpcController controller,
- StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto request)
+ public SCMVersionResponseProto getVersion(RpcController controller,
+ SCMVersionRequestProto request)
throws ServiceException {
try {
return impl.getVersion(request);
@@ -67,15 +70,13 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
}
@Override
- public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
- register(RpcController controller, StorageContainerDatanodeProtocolProtos
- .SCMRegisterRequestProto request) throws ServiceException {
+ public SCMRegisteredResponseProto register(RpcController controller,
+ SCMRegisterRequestProto request) throws ServiceException {
try {
- ContainerReportsRequestProto containerRequestProto = null;
- SCMNodeReport scmNodeReport = null;
- containerRequestProto = request.getContainerReport();
- scmNodeReport = request.getNodeReport();
- return impl.register(request.getDatanodeDetails(), scmNodeReport,
+ ContainerReportsProto containerRequestProto = request
+ .getContainerReport();
+ NodeReportProto dnNodeReport = request.getNodeReport();
+ return impl.register(request.getDatanodeDetails(), dnNodeReport,
containerRequestProto);
} catch (IOException e) {
throw new ServiceException(e);
@@ -83,27 +84,15 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
}
@Override
- public SCMHeartbeatResponseProto
- sendHeartbeat(RpcController controller,
+ public SCMHeartbeatResponseProto sendHeartbeat(RpcController controller,
SCMHeartbeatRequestProto request) throws ServiceException {
try {
- return impl.sendHeartbeat(request.getDatanodeDetails(),
- request.getNodeReport());
+ return impl.sendHeartbeat(request);
} catch (IOException e) {
throw new ServiceException(e);
}
}
- @Override
- public ContainerReportsResponseProto sendContainerReport(
- RpcController controller, ContainerReportsRequestProto request)
- throws ServiceException {
- try {
- return impl.sendContainerReport(request);
- } catch (IOException e) {
- throw new ServiceException(e);
- }
- }
@Override
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 20e6af8..cc131e0 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -34,81 +34,74 @@ package hadoop.hdds;
import "hdds.proto";
+/**
+ * Request for version info of the software stack on the server.
+ */
+message SCMVersionRequestProto {}
/**
-* This message is send by data node to indicate that it is alive or it is
-* registering with the node manager.
+* Generic response that is sent to a version request. This allows keys to be
+* added on the fly and protocol to remain stable.
*/
-message SCMHeartbeatRequestProto {
- required DatanodeDetailsProto datanodeDetails = 1;
- optional SCMNodeReport nodeReport = 2;
+message SCMVersionResponseProto {
+ required uint32 softwareVersion = 1;
+ repeated hadoop.hdds.KeyValue keys = 2;
}
-/**
-A container report contains the following information.
-*/
-message ContainerInfo {
- optional string finalhash = 2;
- optional int64 size = 3;
- optional int64 used = 4;
- optional int64 keyCount = 5;
- // TODO: move the io count to separate message
- optional int64 readCount = 6;
- optional int64 writeCount = 7;
- optional int64 readBytes = 8;
- optional int64 writeBytes = 9;
- required int64 containerID = 10;
- optional hadoop.hdds.LifeCycleState state = 11;
+message SCMRegisterRequestProto {
+ required DatanodeDetailsProto datanodeDetails = 1;
+ required NodeReportProto nodeReport = 2;
+ required ContainerReportsProto containerReport = 3;
}
-// The deleted blocks which are stored in deletedBlock.db of scm.
-// We don't use BlockID because this only contians multiple localIDs
-// of the same containerID.
-message DeletedBlocksTransaction {
- required int64 txID = 1;
- required int64 containerID = 2;
- repeated int64 localID = 3;
- // the retry time of sending deleting command to datanode.
- required int32 count = 4;
+/**
+ * Datanode ID returned by the SCM. This is similar to name node
+ * registration of a datanode.
+ */
+message SCMRegisteredResponseProto {
+ enum ErrorCode {
+ success = 1;
+ errorNodeNotPermitted = 2;
+ }
+ required ErrorCode errorCode = 1;
+ required string datanodeUUID = 2;
+ required string clusterID = 3;
+ optional SCMNodeAddressList addressList = 4;
+ optional string hostname = 5;
+ optional string ipAddress = 6;
}
/**
-A set of container reports, max count is generally set to
-8192 since that keeps the size of the reports under 1 MB.
+* This message is sent by data node to indicate that it is alive or it is
+* registering with the node manager.
*/
-message ContainerReportsRequestProto {
- enum reportType {
- fullReport = 0;
- deltaReport = 1;
- }
+message SCMHeartbeatRequestProto {
required DatanodeDetailsProto datanodeDetails = 1;
- repeated ContainerInfo reports = 2;
- required reportType type = 3;
+ optional NodeReportProto nodeReport = 2;
+ optional ContainerReportsProto containerReport = 3;
}
-message ContainerReportsResponseProto {
+/*
+ * A group of commands for the datanode to execute
+ */
+message SCMHeartbeatResponseProto {
+ required string datanodeUUID = 1;
+ repeated SCMCommandProto commands = 2;
}
-/**
-* This message is send along with the heart beat to report datanode
-* storage utilization by SCM.
-*/
-message SCMNodeReport {
- repeated SCMStorageReport storageReport = 1;
+message SCMNodeAddressList {
+ repeated string addressList = 1;
}
/**
- * Types of recognized storage media.
- */
-enum StorageTypeProto {
- DISK = 1;
- SSD = 2;
- ARCHIVE = 3;
- RAM_DISK = 4;
- PROVIDED = 5;
+* This message is send along with the heart beat to report datanode
+* storage utilization to SCM.
+*/
+message NodeReportProto {
+ repeated StorageReportProto storageReport = 1;
}
-message SCMStorageReport {
+message StorageReportProto {
required string storageUuid = 1;
required string storageLocation = 2;
optional uint64 capacity = 3 [default = 0];
@@ -118,107 +111,82 @@ message SCMStorageReport {
optional bool failed = 7 [default = false];
}
-message SCMRegisterRequestProto {
- required DatanodeDetailsProto datanodeDetails = 1;
- required SCMNodeReport nodeReport = 2;
- required ContainerReportsRequestProto containerReport = 3;
-}
-
-/**
- * Request for version info of the software stack on the server.
- */
-message SCMVersionRequestProto {
-
-}
-
-/**
-* Generic response that is send to a version request. This allows keys to be
-* added on the fly and protocol to remain stable.
-*/
-message SCMVersionResponseProto {
- required uint32 softwareVersion = 1;
- repeated hadoop.hdds.KeyValue keys = 2;
-}
-
-message SCMNodeAddressList {
- repeated string addressList = 1;
-}
-
/**
- * Datanode ID returned by the SCM. This is similar to name node
- * registeration of a datanode.
+ * Types of recognized storage media.
*/
-message SCMRegisteredCmdResponseProto {
- enum ErrorCode {
- success = 1;
- errorNodeNotPermitted = 2;
- }
- required ErrorCode errorCode = 2;
- required string datanodeUUID = 3;
- required string clusterID = 4;
- optional SCMNodeAddressList addressList = 5;
- optional string hostname = 6;
- optional string ipAddress = 7;
+enum StorageTypeProto {
+ DISK = 1;
+ SSD = 2;
+ ARCHIVE = 3;
+ RAM_DISK = 4;
+ PROVIDED = 5;
}
/**
- * SCM informs a datanode to register itself again.
- * With recieving this command, datanode will transit to REGISTER state.
- */
-message SCMReregisterCmdResponseProto {}
-
-/**
-This command tells the data node to send in the container report when possible
+A set of container reports, max count is generally set to
+8192 since that keeps the size of the reports under 1 MB.
*/
-message SendContainerReportProto {
+message ContainerReportsProto {
+ repeated ContainerInfo reports = 2;
}
-/**
-This command asks the datanode to close a specific container.
-*/
-message SCMCloseContainerCmdResponseProto {
- required int64 containerID = 1;
-}
/**
-Type of commands supported by SCM to datanode protocol.
+A container report contains the following information.
*/
-enum SCMCmdType {
- versionCommand = 2;
- registeredCommand = 3;
- reregisterCommand = 4;
- deleteBlocksCommand = 5;
- closeContainerCommand = 6;
+message ContainerInfo {
+ optional string finalhash = 1;
+ optional int64 size = 2;
+ optional int64 used = 3;
+ optional int64 keyCount = 4;
+ // TODO: move the io count to separate message
+ optional int64 readCount = 5;
+ optional int64 writeCount = 6;
+ optional int64 readBytes = 7;
+ optional int64 writeBytes = 8;
+ required int64 containerID = 9;
+ optional hadoop.hdds.LifeCycleState state = 10;
}
/*
* These are commands returned by SCM for to the datanode to execute.
*/
-message SCMCommandResponseProto {
- required SCMCmdType cmdType = 2; // Type of the command
- optional SCMRegisteredCmdResponseProto registeredProto = 3;
- optional SCMVersionResponseProto versionProto = 4;
- optional SCMReregisterCmdResponseProto reregisterProto = 5;
- optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 6;
- required string datanodeUUID = 7;
- optional SCMCloseContainerCmdResponseProto closeContainerProto = 8;
+message SCMCommandProto {
+ enum Type {
+ reregisterCommand = 1;
+ deleteBlocksCommand = 2;
+ closeContainerCommand = 3;
+ deleteContainerCommand = 4;
+ }
+ // TODO: once we start using protoc 3.x, refactor this message using "oneof"
+ required Type commandType = 1;
+ optional ReregisterCommandProto reregisterCommandProto = 2;
+ optional DeleteBlocksCommandProto deleteBlocksCommandProto = 3;
+ optional CloseContainerCommandProto closeContainerCommandProto = 4;
+ optional DeleteContainerCommandProto deleteContainerCommandProto = 5;
}
-
-/*
- * A group of commands for the datanode to execute
+/**
+ * SCM informs a datanode to register itself again.
+ * On receiving this command, the datanode will transition to REGISTER state.
*/
-message SCMHeartbeatResponseProto {
- repeated SCMCommandResponseProto commands = 1;
-}
+message ReregisterCommandProto {}
+
// HB response from SCM, contains a list of block deletion transactions.
-message SCMDeleteBlocksCmdResponseProto {
+message DeleteBlocksCommandProto {
repeated DeletedBlocksTransaction deletedBlocksTransactions = 1;
}
-// SendACK response returned by datanode to SCM, currently empty.
-message ContainerBlocksDeletionACKResponseProto {
+// The deleted blocks which are stored in deletedBlock.db of scm.
+// We don't use BlockID because this only contains multiple localIDs
+// of the same containerID.
+message DeletedBlocksTransaction {
+ required int64 txID = 1;
+ required int64 containerID = 2;
+ repeated int64 localID = 3;
+ // the retry time of sending deleting command to datanode.
+ required int32 count = 4;
}
// ACK message datanode sent to SCM, contains the result of
@@ -231,6 +199,24 @@ message ContainerBlocksDeletionACKProto {
repeated DeleteBlockTransactionResult results = 1;
}
+// SendACK response returned by datanode to SCM, currently empty.
+message ContainerBlocksDeletionACKResponseProto {
+}
+
+/**
+This command asks the datanode to close a specific container.
+*/
+message CloseContainerCommandProto {
+ required int64 containerID = 1;
+}
+
+/**
+This command asks the datanode to delete a specific container.
+*/
+message DeleteContainerCommandProto {
+ required int64 containerID = 1;
+}
+
/**
* Protocol used from a datanode to StorageContainerManager.
*
@@ -305,7 +291,7 @@ service StorageContainerDatanodeProtocolService {
/**
* Registers a data node with SCM.
*/
- rpc register (SCMRegisterRequestProto) returns (SCMRegisteredCmdResponseProto);
+ rpc register (SCMRegisterRequestProto) returns (SCMRegisteredResponseProto);
/**
* Send heartbeat from datanode to SCM. HB's under SCM looks more
@@ -315,12 +301,6 @@ service StorageContainerDatanodeProtocolService {
rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
/**
- send container reports sends the container report to SCM. This will
- return a null command as response.
- */
- rpc sendContainerReport(ContainerReportsRequestProto) returns (ContainerReportsResponseProto);
-
- /**
* Sends the block deletion ACK to SCM.
*/
rpc sendContainerBlocksDeletionACK (ContainerBlocksDeletionACKProto) returns (ContainerBlocksDeletionACKResponseProto);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index c57a366..0ee6321 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -16,12 +16,12 @@
*/
package org.apache.hadoop.ozone.container.common;
-import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos;
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -30,13 +30,13 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+ .StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.VersionResponse;
@@ -56,7 +56,7 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
// Map of datanode to containers
private Map<DatanodeDetails, Map<String, ContainerInfo>> nodeContainers =
new HashMap();
- private Map<DatanodeDetails, SCMNodeReport> nodeReports = new HashMap<>();
+ private Map<DatanodeDetails, NodeReportProto> nodeReports = new HashMap<>();
/**
* Returns the number of heartbeats made to this class.
*
@@ -166,20 +166,17 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
/**
* Used by data node to send a Heartbeat.
*
- * @param datanodeDetailsProto - DatanodeDetailsProto.
- * @param nodeReport - node report.
+ * @param heartbeat - node heartbeat.
* @return - SCMHeartbeatResponseProto
* @throws IOException
*/
@Override
public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
- sendHeartbeat(DatanodeDetailsProto datanodeDetailsProto,
- SCMNodeReport nodeReport)
- throws IOException {
+ sendHeartbeat(SCMHeartbeatRequestProto heartbeat) throws IOException {
rpcCount.incrementAndGet();
heartbeatCount.incrementAndGet();
sleepIfNeeded();
- List<SCMCommandResponseProto>
+ List<SCMCommandProto>
cmdResponses = new LinkedList<>();
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
.build();
@@ -193,21 +190,19 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
*/
@Override
public StorageContainerDatanodeProtocolProtos
- .SCMRegisteredCmdResponseProto register(
- DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport,
- StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto
+ .SCMRegisteredResponseProto register(
+ DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport,
+ StorageContainerDatanodeProtocolProtos.ContainerReportsProto
containerReportsRequestProto)
throws IOException {
rpcCount.incrementAndGet();
- sendContainerReport(containerReportsRequestProto);
updateNodeReport(datanodeDetailsProto, nodeReport);
sleepIfNeeded();
- return StorageContainerDatanodeProtocolProtos
- .SCMRegisteredCmdResponseProto
+ return StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto
.newBuilder().setClusterID(UUID.randomUUID().toString())
.setDatanodeUUID(datanodeDetailsProto.getUuid()).setErrorCode(
StorageContainerDatanodeProtocolProtos
- .SCMRegisteredCmdResponseProto.ErrorCode.success).build();
+ .SCMRegisteredResponseProto.ErrorCode.success).build();
}
/**
@@ -216,19 +211,19 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
* @param nodeReport
*/
public void updateNodeReport(DatanodeDetailsProto datanodeDetailsProto,
- SCMNodeReport nodeReport) {
+ NodeReportProto nodeReport) {
DatanodeDetails datanode = DatanodeDetails.getFromProtoBuf(
datanodeDetailsProto);
- SCMNodeReport.Builder datanodeReport = SCMNodeReport.newBuilder();
+ NodeReportProto.Builder nodeReportProto = NodeReportProto.newBuilder();
- List<SCMStorageReport> storageReports =
+ List<StorageReportProto> storageReports =
nodeReport.getStorageReportList();
- for(SCMStorageReport report : storageReports) {
- datanodeReport.addStorageReport(report);
+ for(StorageReportProto report : storageReports) {
+ nodeReportProto.addStorageReport(report);
}
- nodeReports.put(datanode, datanodeReport.build());
+ nodeReports.put(datanode, nodeReportProto.build());
}
@@ -254,39 +249,6 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
return 0;
}
- /**
- * Send a container report.
- *
- * @param reports -- Container report
- * @return HeartbeatResponse.nullcommand.
- * @throws IOException
- */
- @Override
- public StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto
- sendContainerReport(StorageContainerDatanodeProtocolProtos
- .ContainerReportsRequestProto reports) throws IOException {
- Preconditions.checkNotNull(reports);
- containerReportsCount.incrementAndGet();
-
- DatanodeDetails datanode = DatanodeDetails.getFromProtoBuf(
- reports.getDatanodeDetails());
- if (reports.getReportsCount() > 0) {
- Map containers = nodeContainers.get(datanode);
- if (containers == null) {
- containers = new LinkedHashMap();
- nodeContainers.put(datanode, containers);
- }
-
- for (StorageContainerDatanodeProtocolProtos.ContainerInfo report:
- reports.getReportsList()) {
- containers.put(report.getContainerID(), report);
- }
- }
-
- return StorageContainerDatanodeProtocolProtos
- .ContainerReportsResponseProto.newBuilder().build();
- }
-
@Override
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
ContainerBlocksDeletionACKProto request) throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/3] hadoop git commit: HDDS-81. Moving ContainerReport inside
Datanode heartbeat. Contributed by Nanda Kumar.
Posted by ae...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 2d88621..f5fe46a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
@@ -33,7 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseException;
@@ -368,11 +369,12 @@ public class ContainerMapping implements Mapping {
* @param reports Container report
*/
@Override
- public void processContainerReports(ContainerReportsRequestProto reports)
+ public void processContainerReports(DatanodeDetails datanodeDetails,
+ ContainerReportsProto reports)
throws IOException {
List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
containerInfos = reports.getReportsList();
- containerSupervisor.handleContainerReport(reports);
+ containerSupervisor.handleContainerReport(datanodeDetails, reports);
for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
containerInfos) {
byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID());
@@ -402,7 +404,7 @@ public class ContainerMapping implements Mapping {
// Container not found in our container db.
LOG.error("Error while processing container report from datanode :" +
" {}, for container: {}, reason: container doesn't exist in" +
- "container database.", reports.getDatanodeDetails(),
+ "container database.", datanodeDetails,
datanodeState.getContainerID());
}
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
index f560174..ee8e344 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
@@ -16,10 +16,11 @@
*/
package org.apache.hadoop.hdds.scm.container;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import java.io.Closeable;
import java.io.IOException;
@@ -98,7 +99,8 @@ public interface Mapping extends Closeable {
*
* @param reports Container report
*/
- void processContainerReports(ContainerReportsRequestProto reports)
+ void processContainerReports(DatanodeDetails datanodeDetails,
+ ContainerReportsProto reports)
throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
index c14303f..5bd0574 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodePoolManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
@@ -295,24 +295,21 @@ public class ContainerSupervisor implements Closeable {
* @param containerReport -- Container report for a specific container from
* a datanode.
*/
- public void handleContainerReport(
- ContainerReportsRequestProto containerReport) {
- DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf(
- containerReport.getDatanodeDetails());
+ public void handleContainerReport(DatanodeDetails datanodeDetails,
+ ContainerReportsProto containerReport) {
inProgressPoolListLock.readLock().lock();
try {
String poolName = poolManager.getNodePool(datanodeDetails);
for (InProgressPool ppool : inProgressPoolList) {
if (ppool.getPoolName().equalsIgnoreCase(poolName)) {
- ppool.handleContainerReport(containerReport);
+ ppool.handleContainerReport(datanodeDetails, containerReport);
return;
}
}
// TODO: Decide if we can do anything else with this report.
LOG.debug("Discarding the container report for pool {}. " +
"That pool is not currently in the pool reconciliation process." +
- " Container Name: {}", poolName,
- containerReport.getDatanodeDetails());
+ " Container Name: {}", poolName, datanodeDetails);
} catch (SCMException e) {
LOG.warn("Skipping processing container report from datanode {}, "
+ "cause: failed to get the corresponding node pool",
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
index c444e90..4b54731 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,21 +178,20 @@ public final class InProgressPool {
*
* @param containerReport - ContainerReport
*/
- public void handleContainerReport(
- ContainerReportsRequestProto containerReport) {
+ public void handleContainerReport(DatanodeDetails datanodeDetails,
+ ContainerReportsProto containerReport) {
if (status == ProgressStatus.InProgress) {
- executorService.submit(processContainerReport(containerReport));
+ executorService.submit(processContainerReport(datanodeDetails,
+ containerReport));
} else {
LOG.debug("Cannot handle container report when the pool is in {} status.",
status);
}
}
- private Runnable processContainerReport(
- ContainerReportsRequestProto reports) {
+ private Runnable processContainerReport(DatanodeDetails datanodeDetails,
+ ContainerReportsProto reports) {
return () -> {
- DatanodeDetails datanodeDetails =
- DatanodeDetails.getFromProtoBuf(reports.getDatanodeDetails());
if (processedNodeSet.computeIfAbsent(datanodeDetails.getUuid(),
(k) -> true)) {
nodeProcessed.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
index 05a9fc3..04658bd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hdds.scm.node;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import static org.apache.hadoop.util.Time.monotonicNow;
@@ -31,7 +31,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
public class HeartbeatQueueItem {
private DatanodeDetails datanodeDetails;
private long recvTimestamp;
- private SCMNodeReport nodeReport;
+ private NodeReportProto nodeReport;
/**
*
@@ -40,7 +40,7 @@ public class HeartbeatQueueItem {
* @param nodeReport - node report associated with the heartbeat if any.
*/
HeartbeatQueueItem(DatanodeDetails datanodeDetails, long recvTimestamp,
- SCMNodeReport nodeReport) {
+ NodeReportProto nodeReport) {
this.datanodeDetails = datanodeDetails;
this.recvTimestamp = recvTimestamp;
this.nodeReport = nodeReport;
@@ -56,7 +56,7 @@ public class HeartbeatQueueItem {
/**
* @return node report.
*/
- public SCMNodeReport getNodeReport() {
+ public NodeReportProto getNodeReport() {
return nodeReport;
}
@@ -72,7 +72,7 @@ public class HeartbeatQueueItem {
*/
public static class Builder {
private DatanodeDetails datanodeDetails;
- private SCMNodeReport nodeReport;
+ private NodeReportProto nodeReport;
private long recvTimestamp = monotonicNow();
public Builder setDatanodeDetails(DatanodeDetails dnDetails) {
@@ -80,8 +80,8 @@ public class HeartbeatQueueItem {
return this;
}
- public Builder setNodeReport(SCMNodeReport scmNodeReport) {
- this.nodeReport = scmNodeReport;
+ public Builder setNodeReport(NodeReportProto report) {
+ this.nodeReport = report;
return this;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 353a069..b339fb7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -28,15 +28,14 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto
.ErrorCode;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+ .StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ipc.Server;
@@ -592,7 +591,7 @@ public class SCMNodeManager
DatanodeDetails datanodeDetails = hbItem.getDatanodeDetails();
UUID datanodeUuid = datanodeDetails.getUuid();
- SCMNodeReport nodeReport = hbItem.getNodeReport();
+ NodeReportProto nodeReport = hbItem.getNodeReport();
long recvTimestamp = hbItem.getRecvTimestamp();
long processTimestamp = Time.monotonicNow();
if (LOG.isTraceEnabled()) {
@@ -637,7 +636,7 @@ public class SCMNodeManager
new ReregisterCommand());
}
- private void updateNodeStat(UUID dnId, SCMNodeReport nodeReport) {
+ private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) {
SCMNodeStat stat = nodeStats.get(dnId);
if (stat == null) {
LOG.debug("SCM updateNodeStat based on heartbeat from previous" +
@@ -649,8 +648,9 @@ public class SCMNodeManager
long totalCapacity = 0;
long totalRemaining = 0;
long totalScmUsed = 0;
- List<SCMStorageReport> storageReports = nodeReport.getStorageReportList();
- for (SCMStorageReport report : storageReports) {
+ List<StorageReportProto> storageReports = nodeReport
+ .getStorageReportList();
+ for (StorageReportProto report : storageReports) {
totalCapacity += report.getCapacity();
totalRemaining += report.getRemaining();
totalScmUsed+= report.getScmUsed();
@@ -710,7 +710,7 @@ public class SCMNodeManager
* Register the node if the node finds that it is not registered with any
* SCM.
*
- * @param datanodeDetailsProto - Send datanodeDetails with Node info.
+ * @param datanodeDetails - Send datanodeDetails with Node info.
* This function generates and assigns new datanode ID
* for the datanode. This allows SCM to be run independent
* of Namenode if required.
@@ -719,13 +719,11 @@ public class SCMNodeManager
* @return SCMHeartbeatResponseProto
*/
@Override
- public SCMCommand register(DatanodeDetailsProto datanodeDetailsProto,
- SCMNodeReport nodeReport) {
+ public RegisteredCommand register(
+ DatanodeDetails datanodeDetails, NodeReportProto nodeReport) {
String hostname = null;
String ip = null;
- DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf(
- datanodeDetailsProto);
InetAddress dnAddress = Server.getRemoteIp();
if (dnAddress != null) {
// Mostly called inside an RPC, update ip and peer hostname
@@ -734,7 +732,7 @@ public class SCMNodeManager
datanodeDetails.setHostName(hostname);
datanodeDetails.setIpAddress(ip);
}
- SCMCommand responseCommand = verifyDatanodeUUID(datanodeDetails);
+ RegisteredCommand responseCommand = verifyDatanodeUUID(datanodeDetails);
if (responseCommand != null) {
return responseCommand;
}
@@ -785,7 +783,8 @@ public class SCMNodeManager
* @param datanodeDetails - Datanode Details.
* @return SCMCommand
*/
- private SCMCommand verifyDatanodeUUID(DatanodeDetails datanodeDetails) {
+ private RegisteredCommand verifyDatanodeUUID(
+ DatanodeDetails datanodeDetails) {
if (datanodeDetails.getUuid() != null &&
nodes.containsKey(datanodeDetails.getUuid())) {
LOG.trace("Datanode is already registered. Datanode: {}",
@@ -802,34 +801,23 @@ public class SCMNodeManager
/**
* Send heartbeat to indicate the datanode is alive and doing well.
*
- * @param datanodeDetailsProto - DatanodeDetailsProto.
+ * @param datanodeDetails - DatanodeDetails.
* @param nodeReport - node report.
* @return SCMheartbeat response.
* @throws IOException
*/
@Override
public List<SCMCommand> sendHeartbeat(
- DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport) {
+ DatanodeDetails datanodeDetails, NodeReportProto nodeReport) {
- Preconditions.checkNotNull(datanodeDetailsProto, "Heartbeat is missing " +
+ Preconditions.checkNotNull(datanodeDetails, "Heartbeat is missing " +
"DatanodeDetails.");
- DatanodeDetails datanodeDetails = DatanodeDetails
- .getFromProtoBuf(datanodeDetailsProto);
- // Checking for NULL to make sure that we don't get
- // an exception from ConcurrentList.
- // This could be a problem in tests, if this function is invoked via
- // protobuf, transport layer will guarantee that this is not null.
- if (datanodeDetails != null) {
- heartbeatQueue.add(
- new HeartbeatQueueItem.Builder()
- .setDatanodeDetails(datanodeDetails)
- .setNodeReport(nodeReport)
- .build());
- return commandQueue.getCommand(datanodeDetails.getUuid());
- } else {
- LOG.error("Datanode ID in heartbeat is null");
- }
- return null;
+ heartbeatQueue.add(
+ new HeartbeatQueueItem.Builder()
+ .setDatanodeDetails(datanodeDetails)
+ .setNodeReport(nodeReport)
+ .build());
+ return commandQueue.getCommand(datanodeDetails.getUuid());
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
index fa423bb..6ea83df 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java
@@ -23,7 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.
- StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+ StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -33,7 +33,11 @@ import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.IOException;
-import java.util.*;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -159,7 +163,7 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean {
}
public StorageReportResult processNodeReport(UUID datanodeID,
- StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport)
+ StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport)
throws IOException {
Preconditions.checkNotNull(datanodeID);
Preconditions.checkNotNull(nodeReport);
@@ -170,9 +174,9 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean {
Set<StorageLocationReport> storagReportSet = new HashSet<>();
Set<StorageLocationReport> fullVolumeSet = new HashSet<>();
Set<StorageLocationReport> failedVolumeSet = new HashSet<>();
- List<SCMStorageReport>
+ List<StorageReportProto>
storageReports = nodeReport.getStorageReportList();
- for (SCMStorageReport report : storageReports) {
+ for (StorageReportProto report : storageReports) {
StorageLocationReport storageReport =
StorageLocationReport.getFromProtobuf(report);
storagReportSet.add(storageReport);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 6e5b7de..1b1645d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -25,29 +25,47 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos
+ .ContainerBlocksDeletionACKResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos
+ .ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+
+import static org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto
+ .Type.closeContainerCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto
+ .Type.deleteBlocksCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto
+ .Type.reregisterCommand;
-import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.versionCommand;
-import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.registeredCommand;
-import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.reregisterCommand;
-import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.deleteBlocksCommand;
-import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.closeContainerCommand;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
@@ -150,96 +168,81 @@ public class SCMDatanodeProtocolServer implements
@Override
public SCMHeartbeatResponseProto sendHeartbeat(
- HddsProtos.DatanodeDetailsProto datanodeDetails,
- StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport)
+ SCMHeartbeatRequestProto heartbeat)
throws IOException {
+ // TODO: Add a heartbeat dispatcher.
+ DatanodeDetails datanodeDetails = DatanodeDetails
+ .getFromProtoBuf(heartbeat.getDatanodeDetails());
+ NodeReportProto nodeReport = heartbeat.getNodeReport();
List<SCMCommand> commands =
scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport);
- List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
+ List<SCMCommandProto> cmdResponses = new LinkedList<>();
for (SCMCommand cmd : commands) {
- cmdResponses.add(getCommandResponse(cmd, datanodeDetails.getUuid()));
+ cmdResponses.add(getCommandResponse(cmd));
}
return SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(datanodeDetails.getUuidString())
.addAllCommands(cmdResponses).build();
}
@Override
- public SCMRegisteredCmdResponseProto register(
- HddsProtos.DatanodeDetailsProto datanodeDetails, SCMNodeReport nodeReport,
- ContainerReportsRequestProto containerReportsRequestProto)
+ public SCMRegisteredResponseProto register(
+ HddsProtos.DatanodeDetailsProto datanodeDetailsProto,
+ NodeReportProto nodeReport,
+ ContainerReportsProto containerReportsProto)
throws IOException {
+ DatanodeDetails datanodeDetails = DatanodeDetails
+ .getFromProtoBuf(datanodeDetailsProto);
// TODO : Return the list of Nodes that forms the SCM HA.
- RegisteredCommand registeredCommand = (RegisteredCommand) scm
- .getScmNodeManager().register(datanodeDetails, nodeReport);
- SCMCmdType type = registeredCommand.getType();
- if (type == SCMCmdType.registeredCommand && registeredCommand.getError()
- == SCMRegisteredCmdResponseProto.ErrorCode.success) {
- scm.getScmContainerManager().processContainerReports(
- containerReportsRequestProto);
+ RegisteredCommand registeredCommand = scm.getScmNodeManager()
+ .register(datanodeDetails, nodeReport);
+ if (registeredCommand.getError()
+ == SCMRegisteredResponseProto.ErrorCode.success) {
+ scm.getScmContainerManager().processContainerReports(datanodeDetails,
+ containerReportsProto);
}
return getRegisteredResponse(registeredCommand);
}
@VisibleForTesting
- public static SCMRegisteredCmdResponseProto getRegisteredResponse(
- SCMCommand cmd) {
- Preconditions.checkState(cmd.getClass() == RegisteredCommand.class);
- RegisteredCommand rCmd = (RegisteredCommand) cmd;
- SCMCmdType type = cmd.getType();
- if (type != SCMCmdType.registeredCommand) {
- throw new IllegalArgumentException(
- "Registered command is not well " + "formed. Internal Error.");
- }
- return SCMRegisteredCmdResponseProto.newBuilder()
+ public static SCMRegisteredResponseProto getRegisteredResponse(
+ RegisteredCommand cmd) {
+ return SCMRegisteredResponseProto.newBuilder()
// TODO : Fix this later when we have multiple SCM support.
// .setAddressList(addressList)
- .setErrorCode(rCmd.getError())
- .setClusterID(rCmd.getClusterID())
- .setDatanodeUUID(rCmd.getDatanodeUUID())
+ .setErrorCode(cmd.getError())
+ .setClusterID(cmd.getClusterID())
+ .setDatanodeUUID(cmd.getDatanodeUUID())
.build();
}
- @Override
- public ContainerReportsResponseProto sendContainerReport(
- ContainerReportsRequestProto reports)
+ public void processContainerReports(DatanodeDetails datanodeDetails,
+ ContainerReportsProto reports)
throws IOException {
- updateContainerReportMetrics(reports);
-
+ updateContainerReportMetrics(datanodeDetails, reports);
// should we process container reports async?
- scm.getScmContainerManager().processContainerReports(reports);
- return ContainerReportsResponseProto.newBuilder().build();
+ scm.getScmContainerManager()
+ .processContainerReports(datanodeDetails, reports);
}
- private void updateContainerReportMetrics(
- ContainerReportsRequestProto reports) {
- ContainerStat newStat = null;
- // TODO: We should update the logic once incremental container report
- // type is supported.
- if (reports
- .getType() == StorageContainerDatanodeProtocolProtos
- .ContainerReportsRequestProto.reportType.fullReport) {
- newStat = new ContainerStat();
- for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
- .getReportsList()) {
- newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
- info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
- info.getReadCount(), info.getWriteCount()));
- }
-
- // update container metrics
- StorageContainerManager.getMetrics().setLastContainerStat(newStat);
+ private void updateContainerReportMetrics(DatanodeDetails datanodeDetails,
+ ContainerReportsProto reports) {
+ ContainerStat newStat = new ContainerStat();
+ for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
+ .getReportsList()) {
+ newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
+ info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
+ info.getReadCount(), info.getWriteCount()));
}
+ // update container metrics
+ StorageContainerManager.getMetrics().setLastContainerStat(newStat);
// Update container stat entry, this will trigger a removal operation if it
// exists in cache.
- synchronized (scm.getContainerReportCache()) {
- String datanodeUuid = reports.getDatanodeDetails().getUuid();
- if (datanodeUuid != null && newStat != null) {
- scm.getContainerReportCache().put(datanodeUuid, newStat);
- // update global view container metrics
- StorageContainerManager.getMetrics().incrContainerStat(newStat);
- }
- }
+ String datanodeUuid = datanodeDetails.getUuidString();
+ scm.getContainerReportCache().put(datanodeUuid, newStat);
+ // update global view container metrics
+ StorageContainerManager.getMetrics().incrContainerStat(newStat);
}
@@ -298,28 +301,15 @@ public class SCMDatanodeProtocolServer implements
* @throws IOException
*/
@VisibleForTesting
- public StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
- getCommandResponse(
- SCMCommand cmd, final String datanodeID) throws IOException {
- SCMCmdType type = cmd.getType();
- SCMCommandResponseProto.Builder builder =
- SCMCommandResponseProto.newBuilder().setDatanodeUUID(datanodeID);
- switch (type) {
- case registeredCommand:
- return builder
- .setCmdType(registeredCommand)
- .setRegisteredProto(SCMRegisteredCmdResponseProto
- .getDefaultInstance())
- .build();
- case versionCommand:
- return builder
- .setCmdType(versionCommand)
- .setVersionProto(SCMVersionResponseProto.getDefaultInstance())
- .build();
+ public SCMCommandProto getCommandResponse(SCMCommand cmd)
+ throws IOException {
+ SCMCommandProto.Builder builder =
+ SCMCommandProto.newBuilder();
+ switch (cmd.getType()) {
case reregisterCommand:
return builder
- .setCmdType(reregisterCommand)
- .setReregisterProto(SCMReregisterCmdResponseProto
+ .setCommandType(reregisterCommand)
+ .setReregisterCommandProto(ReregisterCommandProto
.getDefaultInstance())
.build();
case deleteBlocksCommand:
@@ -335,13 +325,14 @@ public class SCMDatanodeProtocolServer implements
.collect(Collectors.toList());
scm.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
return builder
- .setCmdType(deleteBlocksCommand)
- .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
+ .setCommandType(deleteBlocksCommand)
+ .setDeleteBlocksCommandProto(((DeleteBlocksCommand) cmd).getProto())
.build();
case closeContainerCommand:
return builder
- .setCmdType(closeContainerCommand)
- .setCloseContainerProto(((CloseContainerCommand) cmd).getProto())
+ .setCommandType(closeContainerCommand)
+ .setCloseContainerCommandProto(
+ ((CloseContainerCommand) cmd).getProto())
.build();
default:
throw new IllegalArgumentException("Not implemented");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 5cf0a92..b8036d7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -18,9 +18,9 @@ package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol
- .proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+ .StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageTypeProto;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
@@ -53,16 +53,17 @@ public final class TestUtils {
public static DatanodeDetails getDatanodeDetails(SCMNodeManager nodeManager,
String uuid) {
DatanodeDetails datanodeDetails = getDatanodeDetails(uuid);
- nodeManager.register(datanodeDetails.getProtoBufMessage(), null);
+ nodeManager.register(datanodeDetails, null);
return datanodeDetails;
}
/**
* Create Node Report object.
- * @return SCMNodeReport
+ * @return NodeReportProto
*/
- public static SCMNodeReport createNodeReport(List<SCMStorageReport> reports) {
- SCMNodeReport.Builder nodeReport = SCMNodeReport.newBuilder();
+ public static NodeReportProto createNodeReport(
+ List<StorageReportProto> reports) {
+ NodeReportProto.Builder nodeReport = NodeReportProto.newBuilder();
nodeReport.addAllStorageReport(reports);
return nodeReport.build();
}
@@ -71,14 +72,14 @@ public final class TestUtils {
* Create SCM Storage Report object.
* @return list of SCMStorageReport
*/
- public static List<SCMStorageReport> createStorageReport(long capacity,
+ public static List<StorageReportProto> createStorageReport(long capacity,
long used, long remaining, String path, StorageTypeProto type, String id,
int count) {
- List<SCMStorageReport> reportList = new ArrayList<>();
+ List<StorageReportProto> reportList = new ArrayList<>();
for (int i = 0; i < count; i++) {
Preconditions.checkNotNull(path);
Preconditions.checkNotNull(id);
- SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+ StorageReportProto.Builder srb = StorageReportProto.newBuilder();
srb.setStorageUuid(id).setStorageLocation(path).setCapacity(capacity)
.setScmUsed(used).setRemaining(remaining);
StorageTypeProto storageTypeProto =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index a46d7ba..8c59462 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -24,13 +24,14 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ .StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+ .StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.assertj.core.util.Preconditions;
import org.mockito.Mockito;
@@ -370,13 +371,13 @@ public class MockNodeManager implements NodeManager {
* Register the node if the node finds that it is not registered with any
* SCM.
*
- * @param datanodeDetails DatanodeDetailsProto
- * @param nodeReport SCMNodeReport
+ * @param datanodeDetails DatanodeDetails
+ * @param nodeReport NodeReportProto
* @return SCMHeartbeatResponseProto
*/
@Override
- public SCMCommand register(HddsProtos.DatanodeDetailsProto datanodeDetails,
- SCMNodeReport nodeReport) {
+ public RegisteredCommand register(DatanodeDetails datanodeDetails,
+ NodeReportProto nodeReport) {
return null;
}
@@ -388,9 +389,8 @@ public class MockNodeManager implements NodeManager {
* @return SCMheartbeat response list
*/
@Override
- public List<SCMCommand> sendHeartbeat(
- HddsProtos.DatanodeDetailsProto datanodeDetails,
- SCMNodeReport nodeReport) {
+ public List<SCMCommand> sendHeartbeat(DatanodeDetails datanodeDetails,
+ NodeReportProto nodeReport) {
if ((datanodeDetails != null) && (nodeReport != null) && (nodeReport
.getStorageReportCount() > 0)) {
SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid());
@@ -398,8 +398,9 @@ public class MockNodeManager implements NodeManager {
long totalCapacity = 0L;
long totalRemaining = 0L;
long totalScmUsed = 0L;
- List<SCMStorageReport> storageReports = nodeReport.getStorageReportList();
- for (SCMStorageReport report : storageReports) {
+ List<StorageReportProto> storageReports = nodeReport
+ .getStorageReportList();
+ for (StorageReportProto report : storageReports) {
totalCapacity += report.getCapacity();
totalRemaining += report.getRemaining();
totalScmUsed += report.getScmUsed();
@@ -407,8 +408,7 @@ public class MockNodeManager implements NodeManager {
aggregateStat.subtract(stat);
stat.set(totalCapacity, totalScmUsed, totalRemaining);
aggregateStat.add(stat);
- nodeMetricMap.put(DatanodeDetails
- .getFromProtoBuf(datanodeDetails).getUuid(), stat);
+ nodeMetricMap.put(datanodeDetails.getUuid(), stat);
}
return null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
index f318316..ba2ab64 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
@@ -191,8 +191,6 @@ public class TestContainerMapping {
public void testFullContainerReport() throws IOException {
ContainerInfo info = createContainer();
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
- ContainerReportsRequestProto.reportType reportType =
- ContainerReportsRequestProto.reportType.fullReport;
List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports =
new ArrayList<>();
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
@@ -209,12 +207,11 @@ public class TestContainerMapping {
reports.add(ciBuilder.build());
- ContainerReportsRequestProto.Builder crBuilder =
- ContainerReportsRequestProto.newBuilder();
- crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
- .setType(reportType).addAllReports(reports);
+ ContainerReportsProto.Builder crBuilder = ContainerReportsProto
+ .newBuilder();
+ crBuilder.addAllReports(reports);
- mapping.processContainerReports(crBuilder.build());
+ mapping.processContainerReports(datanodeDetails, crBuilder.build());
ContainerInfo updatedContainer =
mapping.getContainer(info.getContainerID());
@@ -227,8 +224,6 @@ public class TestContainerMapping {
public void testContainerCloseWithContainerReport() throws IOException {
ContainerInfo info = createContainer();
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
- ContainerReportsRequestProto.reportType reportType =
- ContainerReportsRequestProto.reportType.fullReport;
List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports =
new ArrayList<>();
@@ -246,12 +241,11 @@ public class TestContainerMapping {
reports.add(ciBuilder.build());
- ContainerReportsRequestProto.Builder crBuilder =
- ContainerReportsRequestProto.newBuilder();
- crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
- .setType(reportType).addAllReports(reports);
+ ContainerReportsProto.Builder crBuilder =
+ ContainerReportsProto.newBuilder();
+ crBuilder.addAllReports(reports);
- mapping.processContainerReports(crBuilder.build());
+ mapping.processContainerReports(datanodeDetails, crBuilder.build());
ContainerInfo updatedContainer =
mapping.getContainer(info.getContainerID());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
index 15ecbad..0a3efda 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
@@ -199,9 +199,8 @@ public class TestContainerCloser {
private void sendContainerReport(ContainerInfo info, long used) throws
IOException {
- ContainerReportsRequestProto.Builder
- reports = ContainerReportsRequestProto.newBuilder();
- reports.setType(ContainerReportsRequestProto.reportType.fullReport);
+ ContainerReportsProto.Builder
+ reports = ContainerReportsProto.newBuilder();
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
@@ -214,9 +213,8 @@ public class TestContainerCloser {
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L);
- reports.setDatanodeDetails(
- TestUtils.getDatanodeDetails().getProtoBufMessage());
reports.addReports(ciBuilder);
- mapping.processContainerReports(reports.build());
+ mapping.processContainerReports(TestUtils.getDatanodeDetails(),
+ reports.build());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 09b6cd1..5ad28f6 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+ .StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.test.GenericTestUtils;
@@ -133,9 +133,9 @@ public class TestContainerPlacement {
for (DatanodeDetails datanodeDetails : datanodes) {
String id = UUID.randomUUID().toString();
String path = testDir.getAbsolutePath() + "/" + id;
- List<SCMStorageReport> reports = TestUtils
+ List<StorageReportProto> reports = TestUtils
.createStorageReport(capacity, used, remaining, path, null, id, 1);
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+ nodeManager.sendHeartbeat(datanodeDetails,
TestUtils.createNodeReport(reports));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
index de87e50..2b04d6b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.node;
import com.google.common.base.Supplier;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
@@ -26,7 +28,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+ .StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.test.GenericTestUtils;
@@ -63,8 +65,6 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
.HEALTHY;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
-import static org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCmdType;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.assertEquals;
@@ -144,7 +144,7 @@ public class TestNodeManager {
for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
nodeManager);
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+ nodeManager.sendHeartbeat(datanodeDetails,
null);
}
@@ -191,8 +191,8 @@ public class TestNodeManager {
// Need 100 nodes to come out of chill mode, only one node is sending HB.
nodeManager.setMinimumChillModeNodes(100);
- nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager)
- .getProtoBufMessage(), null);
+ nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager),
+ null);
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
assertFalse("Not enough heartbeat, Node manager should have" +
@@ -219,7 +219,7 @@ public class TestNodeManager {
// Send 10 heartbeat from same node, and assert we never leave chill mode.
for (int x = 0; x < 10; x++) {
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+ nodeManager.sendHeartbeat(datanodeDetails,
null);
}
@@ -250,7 +250,7 @@ public class TestNodeManager {
nodeManager.close();
// These should never be processed.
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+ nodeManager.sendHeartbeat(datanodeDetails,
null);
// Let us just wait for 2 seconds to prove that HBs are not processed.
@@ -274,13 +274,13 @@ public class TestNodeManager {
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
String dnId = datanodeDetails.getUuidString();
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
- List<SCMStorageReport> reports =
+ List<StorageReportProto> reports =
TestUtils.createStorageReport(100, 10, 90, storagePath, null, dnId, 1);
try (SCMNodeManager nodemanager = createNodeManager(conf)) {
- nodemanager.register(datanodeDetails.getProtoBufMessage(),
+ nodemanager.register(datanodeDetails,
TestUtils.createNodeReport(reports));
List<SCMCommand> command = nodemanager.sendHeartbeat(
- datanodeDetails.getProtoBufMessage(), null);
+ datanodeDetails, null);
Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails));
Assert.assertTrue("On regular HB calls, SCM responses a "
+ "datanode with an empty command list", command.isEmpty());
@@ -298,10 +298,10 @@ public class TestNodeManager {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
List<SCMCommand> command =
- nodemanager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+ nodemanager.sendHeartbeat(datanodeDetails,
null);
return command.size() == 1 && command.get(0).getType()
- .equals(SCMCmdType.reregisterCommand);
+ .equals(SCMCommandProto.Type.reregisterCommand);
}
}, 100, 3 * 1000);
} catch (TimeoutException e) {
@@ -330,7 +330,7 @@ public class TestNodeManager {
for (int x = 0; x < count; x++) {
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
nodeManager);
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+ nodeManager.sendHeartbeat(datanodeDetails,
null);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
@@ -422,19 +422,19 @@ public class TestNodeManager {
DatanodeDetails staleNode = TestUtils.getDatanodeDetails(nodeManager);
// Heartbeat once
- nodeManager.sendHeartbeat(staleNode.getProtoBufMessage(),
+ nodeManager.sendHeartbeat(staleNode,
null);
// Heartbeat all other nodes.
for (DatanodeDetails dn : nodeList) {
- nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
+ nodeManager.sendHeartbeat(dn, null);
}
// Wait for 2 seconds .. and heartbeat good nodes again.
Thread.sleep(2 * 1000);
for (DatanodeDetails dn : nodeList) {
- nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
+ nodeManager.sendHeartbeat(dn, null);
}
// Wait for 2 seconds, wait a total of 4 seconds to make sure that the
@@ -451,7 +451,7 @@ public class TestNodeManager {
// heartbeat good nodes again.
for (DatanodeDetails dn : nodeList) {
- nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
+ nodeManager.sendHeartbeat(dn, null);
}
// 6 seconds is the dead window for this test , so we wait a total of
@@ -565,11 +565,11 @@ public class TestNodeManager {
DatanodeDetails deadNode =
TestUtils.getDatanodeDetails(nodeManager);
nodeManager.sendHeartbeat(
- healthyNode.getProtoBufMessage(), null);
+ healthyNode, null);
nodeManager.sendHeartbeat(
- staleNode.getProtoBufMessage(), null);
+ staleNode, null);
nodeManager.sendHeartbeat(
- deadNode.getProtoBufMessage(), null);
+ deadNode, null);
// Sleep so that heartbeat processing thread gets to run.
Thread.sleep(500);
@@ -596,15 +596,15 @@ public class TestNodeManager {
*/
nodeManager.sendHeartbeat(
- healthyNode.getProtoBufMessage(), null);
+ healthyNode, null);
nodeManager.sendHeartbeat(
- staleNode.getProtoBufMessage(), null);
+ staleNode, null);
nodeManager.sendHeartbeat(
- deadNode.getProtoBufMessage(), null);
+ deadNode, null);
Thread.sleep(1500);
nodeManager.sendHeartbeat(
- healthyNode.getProtoBufMessage(), null);
+ healthyNode, null);
Thread.sleep(2 * 1000);
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
@@ -625,12 +625,12 @@ public class TestNodeManager {
*/
nodeManager.sendHeartbeat(
- healthyNode.getProtoBufMessage(), null);
+ healthyNode, null);
nodeManager.sendHeartbeat(
- staleNode.getProtoBufMessage(), null);
+ staleNode, null);
Thread.sleep(1500);
nodeManager.sendHeartbeat(
- healthyNode.getProtoBufMessage(), null);
+ healthyNode, null);
Thread.sleep(2 * 1000);
// 3.5 seconds have elapsed for stale node, so it moves into Stale.
@@ -664,11 +664,11 @@ public class TestNodeManager {
* back all the nodes in healthy state.
*/
nodeManager.sendHeartbeat(
- healthyNode.getProtoBufMessage(), null);
+ healthyNode, null);
nodeManager.sendHeartbeat(
- staleNode.getProtoBufMessage(), null);
+ staleNode, null);
nodeManager.sendHeartbeat(
- deadNode.getProtoBufMessage(), null);
+ deadNode, null);
Thread.sleep(500);
//Assert all nodes are healthy.
assertEquals(3, nodeManager.getAllNodes().size());
@@ -689,7 +689,7 @@ public class TestNodeManager {
int sleepDuration) throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {
for (DatanodeDetails dn : list) {
- manager.sendHeartbeat(dn.getProtoBufMessage(), null);
+ manager.sendHeartbeat(dn, null);
}
Thread.sleep(sleepDuration);
}
@@ -775,7 +775,7 @@ public class TestNodeManager {
// No Thread just one time HBs the node manager, so that these will be
// marked as dead nodes eventually.
for (DatanodeDetails dn : deadNodeList) {
- nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
+ nodeManager.sendHeartbeat(dn, null);
}
@@ -940,7 +940,7 @@ public class TestNodeManager {
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
nodeManager);
nodeManager.sendHeartbeat(
- datanodeDetails.getProtoBufMessage(), null);
+ datanodeDetails, null);
String status = nodeManager.getChillModeStatus();
Assert.assertThat(status, containsString("Still in chill " +
"mode, waiting on nodes to report in."));
@@ -967,8 +967,7 @@ public class TestNodeManager {
// Assert that node manager force enter cannot be overridden by nodes HBs.
for (int x = 0; x < 20; x++) {
DatanodeDetails datanode = TestUtils.getDatanodeDetails(nodeManager);
- nodeManager.sendHeartbeat(datanode.getProtoBufMessage(),
- null);
+ nodeManager.sendHeartbeat(datanode, null);
}
Thread.sleep(500);
@@ -1009,10 +1008,10 @@ public class TestNodeManager {
String dnId = datanodeDetails.getUuidString();
long free = capacity - used;
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
- List<SCMStorageReport> reports = TestUtils
+ List<StorageReportProto> reports = TestUtils
.createStorageReport(capacity, used, free, storagePath,
null, dnId, 1);
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+ nodeManager.sendHeartbeat(datanodeDetails,
TestUtils.createNodeReport(reports));
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
@@ -1058,11 +1057,11 @@ public class TestNodeManager {
long scmUsed = x * usedPerHeartbeat;
long remaining = capacity - scmUsed;
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
- List<SCMStorageReport> reports = TestUtils
+ List<StorageReportProto> reports = TestUtils
.createStorageReport(capacity, scmUsed, remaining, storagePath,
null, dnId, 1);
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+ nodeManager.sendHeartbeat(datanodeDetails,
TestUtils.createNodeReport(reports));
Thread.sleep(100);
}
@@ -1140,10 +1139,10 @@ public class TestNodeManager {
// Send a new report to bring the dead node back to healthy
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
- List<SCMStorageReport> reports = TestUtils
+ List<StorageReportProto> reports = TestUtils
.createStorageReport(capacity, expectedScmUsed, expectedRemaining,
storagePath, null, dnId, 1);
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
+ nodeManager.sendHeartbeat(datanodeDetails,
TestUtils.createNodeReport(reports));
// Wait up to 5 seconds so that the dead node becomes healthy
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java
index b824412..072dee7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java
@@ -21,9 +21,9 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.
- StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+ StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.
- StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+ StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -134,7 +134,7 @@ public class TestSCMNodeStorageStatMap {
@Test
public void testProcessNodeReportCheckOneNode() throws IOException {
UUID key = getFirstKey();
- List<SCMStorageReport> reportList = new ArrayList<>();
+ List<StorageReportProto> reportList = new ArrayList<>();
Set<StorageLocationReport> reportSet = testData.get(key);
SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf);
map.insertNewDatanode(key, reportSet);
@@ -146,16 +146,16 @@ public class TestSCMNodeStorageStatMap {
long reportCapacity = report.getCapacity();
long reportScmUsed = report.getScmUsed();
long reportRemaining = report.getRemaining();
- List<SCMStorageReport> reports = TestUtils
+ List<StorageReportProto> reports = TestUtils
.createStorageReport(reportCapacity, reportScmUsed, reportRemaining,
path, null, storageId, 1);
StorageReportResult result =
map.processNodeReport(key, TestUtils.createNodeReport(reports));
Assert.assertEquals(result.getStatus(),
SCMNodeStorageStatMap.ReportStatus.ALL_IS_WELL);
- StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb =
- SCMNodeReport.newBuilder();
- SCMStorageReport srb = reportSet.iterator().next().getProtoBufMessage();
+ StorageContainerDatanodeProtocolProtos.NodeReportProto.Builder nrb =
+ NodeReportProto.newBuilder();
+ StorageReportProto srb = reportSet.iterator().next().getProtoBufMessage();
reportList.add(srb);
result = map.processNodeReport(key, TestUtils.createNodeReport(reportList));
Assert.assertEquals(result.getStatus(),
@@ -168,7 +168,7 @@ public class TestSCMNodeStorageStatMap {
Assert.assertEquals(result.getStatus(),
SCMNodeStorageStatMap.ReportStatus.STORAGE_OUT_OF_SPACE);
// Mark a disk failed
- SCMStorageReport srb2 = SCMStorageReport.newBuilder()
+ StorageReportProto srb2 = StorageReportProto.newBuilder()
.setStorageUuid(UUID.randomUUID().toString())
.setStorageLocation(srb.getStorageLocation()).setScmUsed(reportCapacity)
.setCapacity(reportCapacity).setRemaining(0).setFailed(true).build();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 1d92cdc..34779da 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -20,22 +20,21 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos;
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
+ .StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ipc.RPC;
@@ -200,7 +199,7 @@ public class TestEndPoint {
DatanodeDetails nodeToRegister = getDatanodeDetails();
try (EndpointStateMachine rpcEndPoint = createEndpoint(
SCMTestUtils.getConf(), serverAddress, 1000)) {
- SCMRegisteredCmdResponseProto responseProto = rpcEndPoint.getEndPoint()
+ SCMRegisteredResponseProto responseProto = rpcEndPoint.getEndPoint()
.register(nodeToRegister.getProtoBufMessage(), TestUtils
.createNodeReport(
getStorageReports(nodeToRegister.getUuidString())),
@@ -215,7 +214,7 @@ public class TestEndPoint {
}
}
- private List<SCMStorageReport> getStorageReports(String id) {
+ private List<StorageReportProto> getStorageReports(String id) {
String storagePath = testDir.getAbsolutePath() + "/" + id;
return TestUtils.createStorageReport(100, 10, 90, storagePath, null, id, 1);
}
@@ -293,9 +292,14 @@ public class TestEndPoint {
createEndpoint(SCMTestUtils.getConf(),
serverAddress, 1000)) {
String storageId = UUID.randomUUID().toString();
+ SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder()
+ .setDatanodeDetails(dataNode.getProtoBufMessage())
+ .setNodeReport(TestUtils.createNodeReport(
+ getStorageReports(storageId)))
+ .build();
+
SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
- .sendHeartbeat(dataNode.getProtoBufMessage(),
- TestUtils.createNodeReport(getStorageReports(storageId)));
+ .sendHeartbeat(request);
Assert.assertNotNull(responseProto);
Assert.assertEquals(0, responseProto.getCommandsCount());
}
@@ -361,86 +365,11 @@ public class TestEndPoint {
lessThanOrEqualTo(rpcTimeout + tolerance));
}
- /**
- * Returns a new container report.
- * @return
- */
- ContainerReport getRandomContainerReport() {
- return new ContainerReport(RandomUtils.nextLong(),
- DigestUtils.sha256Hex("Random"));
- }
-
- /**
- * Creates dummy container reports.
- * @param count - The number of closed containers to create.
- * @return ContainerReportsProto
- */
- StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto
- createDummyContainerReports(int count) {
- StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder
- reportsBuilder = StorageContainerDatanodeProtocolProtos
- .ContainerReportsRequestProto.newBuilder();
- for (int x = 0; x < count; x++) {
- reportsBuilder.addReports(getRandomContainerReport()
- .getProtoBufMessage());
- }
- reportsBuilder.setDatanodeDetails(getDatanodeDetails()
- .getProtoBufMessage());
- reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
- .ContainerReportsRequestProto.reportType.fullReport);
- return reportsBuilder.build();
- }
-
- /**
- * Tests that rpcEndpoint sendContainerReport works as expected.
- * @throws Exception
- */
- @Test
- public void testContainerReportSend() throws Exception {
- final int count = 1000;
- scmServerImpl.reset();
- try (EndpointStateMachine rpcEndPoint =
- createEndpoint(SCMTestUtils.getConf(),
- serverAddress, 1000)) {
- ContainerReportsResponseProto responseProto = rpcEndPoint
- .getEndPoint().sendContainerReport(createDummyContainerReports(
- count));
- Assert.assertNotNull(responseProto);
- }
- Assert.assertEquals(1, scmServerImpl.getContainerReportsCount());
- Assert.assertEquals(count, scmServerImpl.getContainerCount());
- }
-
-
- /**
- * Tests that rpcEndpoint sendContainerReport works as expected.
- * @throws Exception
- */
- @Test
- public void testContainerReport() throws Exception {
- final int count = 1000;
- scmServerImpl.reset();
- try (EndpointStateMachine rpcEndPoint =
- createEndpoint(SCMTestUtils.getConf(),
- serverAddress, 1000)) {
- ContainerReportsResponseProto responseProto = rpcEndPoint
- .getEndPoint().sendContainerReport(createContainerReport(count,
- null));
- Assert.assertNotNull(responseProto);
- }
- Assert.assertEquals(1, scmServerImpl.getContainerReportsCount());
- Assert.assertEquals(count, scmServerImpl.getContainerCount());
- final long expectedKeyCount = count * 1000;
- Assert.assertEquals(expectedKeyCount, scmServerImpl.getKeyCount());
- final long expectedBytesUsed = count * OzoneConsts.GB * 2;
- Assert.assertEquals(expectedBytesUsed, scmServerImpl.getBytesUsed());
- }
-
- private ContainerReportsRequestProto createContainerReport(
+ private ContainerReportsProto createContainerReport(
int count, DatanodeDetails datanodeDetails) {
- StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder
+ StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder
reportsBuilder = StorageContainerDatanodeProtocolProtos
- .ContainerReportsRequestProto.newBuilder();
+ .ContainerReportsProto.newBuilder();
for (int x = 0; x < count; x++) {
long containerID = RandomUtils.nextLong();
ContainerReport report = new ContainerReport(containerID,
@@ -455,14 +384,6 @@ public class TestEndPoint {
reportsBuilder.addReports(report.getProtoBufMessage());
}
- if(datanodeDetails == null) {
- reportsBuilder.setDatanodeDetails(getDatanodeDetails()
- .getProtoBufMessage());
- } else {
- reportsBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage());
- }
- reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
- .ContainerReportsRequestProto.reportType.fullReport);
return reportsBuilder.build();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java
deleted file mode 100644
index e197886..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.container.replication.ContainerSupervisor;
-import org.apache.hadoop.hdds.scm.container.replication.InProgressPool;
-import org.apache.hadoop.hdds.scm.node.CommandQueue;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.NodePoolManager;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.ozone.container.common.SCMTestUtils;
-import org.apache.hadoop.ozone.container.testutils
- .ReplicationDatanodeStateManager;
-import org.apache.hadoop.ozone.container.testutils.ReplicationNodeManagerMock;
-import org.apache.hadoop.ozone.container.testutils
- .ReplicationNodePoolManagerMock;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.event.Level;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
- .HEALTHY;
-import static org.apache.ratis.shaded.com.google.common.util.concurrent
- .Uninterruptibles.sleepUninterruptibly;
-
-/**
- * Tests for the container manager.
- */
-public class TestContainerSupervisor {
- final static String POOL_NAME_TEMPLATE = "Pool%d";
- static final int MAX_DATANODES = 72;
- static final int POOL_SIZE = 24;
- static final int POOL_COUNT = 3;
- private LogCapturer logCapturer = LogCapturer.captureLogs(
- LogFactory.getLog(ContainerSupervisor.class));
- private List<DatanodeDetails> datanodes = new LinkedList<>();
- private NodeManager nodeManager;
- private NodePoolManager poolManager;
- private CommandQueue commandQueue;
- private ContainerSupervisor containerSupervisor;
- private ReplicationDatanodeStateManager datanodeStateManager;
-
- @After
- public void tearDown() throws Exception {
- logCapturer.stopCapturing();
- GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.INFO);
- }
-
- @Before
- public void setUp() throws Exception {
- GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.DEBUG);
- Map<DatanodeDetails, NodeState> nodeStateMap = new HashMap<>();
- // We are setting up 3 pools with 24 nodes each in this cluster.
- // First we create 72 Datanodes.
- for (int x = 0; x < MAX_DATANODES; x++) {
- DatanodeDetails datanode = TestUtils.getDatanodeDetails();
- datanodes.add(datanode);
- nodeStateMap.put(datanode, HEALTHY);
- }
-
- commandQueue = new CommandQueue();
-
- // All nodes in this cluster are healthy for time being.
- nodeManager = new ReplicationNodeManagerMock(nodeStateMap, commandQueue);
- poolManager = new ReplicationNodePoolManagerMock();
-
-
- Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " +
- "POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES);
-
- // Start from 1 instead of zero so we can multiply and get the node index.
- for (int y = 1; y <= POOL_COUNT; y++) {
- String poolName = String.format(POOL_NAME_TEMPLATE, y);
- for (int z = 0; z < POOL_SIZE; z++) {
- DatanodeDetails id = datanodes.get(y * z);
- poolManager.addNode(poolName, id);
- }
- }
- OzoneConfiguration config = SCMTestUtils.getOzoneConf();
- config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 2,
- TimeUnit.SECONDS);
- config.setTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, 1,
- TimeUnit.SECONDS);
- containerSupervisor = new ContainerSupervisor(config,
- nodeManager, poolManager);
- datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager,
- poolManager);
- // Sleep for one second to make sure all threads get time to run.
- sleepUninterruptibly(1, TimeUnit.SECONDS);
- }
-
- @Test
- /**
- * Asserts that at least one pool is picked up for processing.
- */
- public void testAssertPoolsAreProcessed() {
- // This asserts that replication manager has started processing at least
- // one pool.
- Assert.assertTrue(containerSupervisor.getInProgressPoolCount() > 0);
-
- // Since all datanodes are flagged as healthy in this test, for each
- // datanode we must have queued a command.
- Assert.assertEquals("Commands are in queue :",
- POOL_SIZE * containerSupervisor.getInProgressPoolCount(),
- commandQueue.getCommandsInQueue());
- }
-
- @Test
- /**
- * This test sends container reports for 2 containers to a pool in progress.
- * Asserts that we are able to find a container with single replica and do
- * not find container with 3 replicas.
- */
- public void testDetectSingleContainerReplica() throws TimeoutException,
- InterruptedException {
- long singleNodeContainerID = 9001;
- long threeNodeContainerID = 9003;
- InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0);
- // Only single datanode reporting that "SingleNodeContainer" exists.
- List<ContainerReportsRequestProto> clist =
- datanodeStateManager.getContainerReport(singleNodeContainerID,
- ppool.getPool().getPoolName(), 1);
- ppool.handleContainerReport(clist.get(0));
-
- // Three nodes are going to report that ThreeNodeContainer exists.
- clist = datanodeStateManager.getContainerReport(threeNodeContainerID,
- ppool.getPool().getPoolName(), 3);
-
- for (ContainerReportsRequestProto reportsProto : clist) {
- ppool.handleContainerReport(reportsProto);
- }
- GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() == 4,
- 200, 1000);
- ppool.setDoneProcessing();
-
- List<Map.Entry<Long, Integer>> containers = ppool.filterContainer(p -> p
- .getValue() == 1);
- Assert.assertEquals(singleNodeContainerID,
- containers.get(0).getKey().longValue());
- int count = containers.get(0).getValue();
- Assert.assertEquals(1L, count);
- }
-
- @Test
- /**
- * We create three containers, Normal,OveReplicated and WayOverReplicated
- * containers. This test asserts that we are able to find the
- * over replicated containers.
- */
- public void testDetectOverReplica() throws TimeoutException,
- InterruptedException {
- long normalContainerID = 9000;
- long overReplicatedContainerID = 9001;
- long wayOverReplicatedContainerID = 9002;
- InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0);
-
- List<ContainerReportsRequestProto> clist =
- datanodeStateManager.getContainerReport(normalContainerID,
- ppool.getPool().getPoolName(), 3);
- ppool.handleContainerReport(clist.get(0));
-
- clist = datanodeStateManager.getContainerReport(overReplicatedContainerID,
- ppool.getPool().getPoolName(), 4);
-
- for (ContainerReportsRequestProto reportsProto : clist) {
- ppool.handleContainerReport(reportsProto);
- }
-
- clist = datanodeStateManager.getContainerReport(
- wayOverReplicatedContainerID, ppool.getPool().getPoolName(), 7);
-
- for (ContainerReportsRequestProto reportsProto : clist) {
- ppool.handleContainerReport(reportsProto);
- }
-
- // We ignore container reports from the same datanodes.
- // it is possible that these each of these containers get placed
- // on same datanodes, so allowing for 4 duplicates in the set of 14.
- GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() > 10,
- 200, 1000);
- ppool.setDoneProcessing();
-
- List<Map.Entry<Long, Integer>> containers = ppool.filterContainer(p -> p
- .getValue() > 3);
- Assert.assertEquals(2, containers.size());
- }
-
- @Test
- /**
- * This test verifies that all pools are picked up for replica processing.
- *
- */
- public void testAllPoolsAreProcessed() throws TimeoutException,
- InterruptedException {
- // Verify that we saw all three pools being picked up for processing.
- GenericTestUtils.waitFor(() -> containerSupervisor.getPoolProcessCount()
- >= 3, 200, 15 * 1000);
- Assert.assertTrue(logCapturer.getOutput().contains("Pool1") &&
- logCapturer.getOutput().contains("Pool2") &&
- logCapturer.getOutput().contains("Pool3"));
- }
-
- @Test
- /**
- * Adds a new pool and tests that we are able to pick up that new pool for
- * processing as well as handle container reports for datanodes in that pool.
- * @throws TimeoutException
- * @throws InterruptedException
- */
- public void testAddingNewPoolWorks()
- throws TimeoutException, InterruptedException, IOException {
- LogCapturer inProgressLog = LogCapturer.captureLogs(
- LogFactory.getLog(InProgressPool.class));
- GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.DEBUG);
- try {
- DatanodeDetails id = TestUtils.getDatanodeDetails();
- ((ReplicationNodeManagerMock) (nodeManager)).addNode(id, HEALTHY);
- poolManager.addNode("PoolNew", id);
- GenericTestUtils.waitFor(() ->
- logCapturer.getOutput().contains("PoolNew"),
- 200, 15 * 1000);
-
- long newContainerID = 7001;
- // Assert that we are able to send a container report to this new
- // pool and datanode.
- List<ContainerReportsRequestProto> clist =
- datanodeStateManager.getContainerReport(newContainerID,
- "PoolNew", 1);
- containerSupervisor.handleContainerReport(clist.get(0));
- GenericTestUtils.waitFor(() ->
- inProgressLog.getOutput()
- .contains(Long.toString(newContainerID)) && inProgressLog
- .getOutput().contains(id.getUuidString()),
- 200, 10 * 1000);
- } finally {
- inProgressLog.stopCapturing();
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org