Posted to commits@ozone.apache.org by so...@apache.org on 2021/07/14 22:25:54 UTC
[ozone] branch master updated: HDDS-5296. Replication Manager
should not send replicate command to DN which is not IN_SERVICE (#2377)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c41d08f HDDS-5296. Replication Manager should not send replicate command to DN which is not IN_SERVICE (#2377)
c41d08f is described below
commit c41d08f40f56d10b2a67ca02c178813ea49cd7a1
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Thu Jul 15 06:25:31 2021 +0800
HDDS-5296. Replication Manager should not send replicate command to DN which is not IN_SERVICE (#2377)
---
.../common/statemachine/DatanodeStateMachine.java | 2 +-
.../replication/ReplicationSupervisor.java | 33 +++++++--
.../replication/TestReplicationSupervisor.java | 4 +-
.../algorithms/SCMContainerPlacementCapacity.java | 2 +-
.../algorithms/SCMContainerPlacementRackAware.java | 55 +++++++++------
.../algorithms/SCMContainerPlacementRandom.java | 2 +-
.../apache/hadoop/hdds/scm/node/NodeStatus.java | 5 ++
.../algorithms/TestContainerPlacementFactory.java | 35 ++++++----
.../TestSCMContainerPlacementRackAware.java | 81 +++++++++++++---------
.../hdds/scm/node/states/TestNodeStateMap.java | 14 ++--
.../ozone/container/TestContainerReplication.java | 81 +++++++++++++++++++++-
.../apache/hadoop/ozone/container/TestHelper.java | 2 +-
12 files changed, 224 insertions(+), 92 deletions(-)
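Taken together, the patch has two halves: on the datanode side, ReplicationSupervisor now ignores replicate-container commands when the local node's persisted operational state is not IN_SERVICE; on the SCM side, SCMContainerPlacementRackAware resolves each candidate to its DatanodeInfo and only picks nodes that are writable (healthy and IN_SERVICE, via the new NodeStatus.isNodeWritable()) and have enough space. A minimal sketch of the datanode-side guard, built only from types that appear in the patch (DatanodeDetails, HddsProtos.NodeOperationalState); the helper name shouldActOnReplicateCommand is hypothetical:

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

    final class ReplicateCommandGuard {
      // Hypothetical helper, not part of the patch: a replicate command is
      // only acted on when the datanode is still IN_SERVICE.
      static boolean shouldActOnReplicateCommand(DatanodeDetails dn) {
        return dn.getPersistedOpState()
            == HddsProtos.NodeOperationalState.IN_SERVICE;
      }
    }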
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index a08213f..5446ec5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -164,7 +164,7 @@ public class DatanodeStateMachine implements Closeable {
replicatorMetrics = new MeasuredReplicator(replicator);
supervisor =
- new ReplicationSupervisor(container.getContainerSet(),
+ new ReplicationSupervisor(container.getContainerSet(), context,
replicatorMetrics, dnConf.getReplicationMaxStreams());
// When we add new handlers just adding a new handler here should do the
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index b049171..cb0ce56 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -25,7 +25,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
import com.google.common.annotations.VisibleForTesting;
@@ -44,6 +47,7 @@ public class ReplicationSupervisor {
private final ContainerSet containerSet;
private final ContainerReplicator replicator;
private final ExecutorService executor;
+ private final StateContext context;
private final AtomicLong requestCounter = new AtomicLong();
private final AtomicLong successCounter = new AtomicLong();
@@ -58,20 +62,19 @@ public class ReplicationSupervisor {
@VisibleForTesting
ReplicationSupervisor(
- ContainerSet containerSet, ContainerReplicator replicator,
- ExecutorService executor
- ) {
+ ContainerSet containerSet, StateContext context,
+ ContainerReplicator replicator, ExecutorService executor) {
this.containerSet = containerSet;
this.replicator = replicator;
this.containersInFlight = ConcurrentHashMap.newKeySet();
this.executor = executor;
+ this.context = context;
}
public ReplicationSupervisor(
- ContainerSet containerSet,
- ContainerReplicator replicator, int poolSize
- ) {
- this(containerSet, replicator, new ThreadPoolExecutor(
+ ContainerSet containerSet, StateContext context,
+ ContainerReplicator replicator, int poolSize) {
+ this(containerSet, context, replicator, new ThreadPoolExecutor(
poolSize, poolSize, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
@@ -79,6 +82,11 @@ public class ReplicationSupervisor {
.build()));
}
+ public ReplicationSupervisor(ContainerSet containerSet,
+ ContainerReplicator replicator, int poolSize) {
+ this(containerSet, null, replicator, poolSize);
+ }
+
/**
* Queue an asynchronous download of the given container.
*/
@@ -133,6 +141,17 @@ public class ReplicationSupervisor {
try {
requestCounter.incrementAndGet();
+ if (context != null) {
+ DatanodeDetails dn = context.getParent().getDatanodeDetails();
+ if (dn.getPersistedOpState() !=
+ HddsProtos.NodeOperationalState.IN_SERVICE) {
+ LOG.info("Dn is of {} state. Ignore this replicate container " +
+ "command for container {}", dn.getPersistedOpState(),
+ containerId);
+ return;
+ }
+ }
+
if (containerSet.getContainer(task.getContainerId()) != null) {
LOG.debug("Container {} has already been downloaded.", containerId);
return;
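The new StateContext parameter is how the supervisor learns its own operational state; a null context (used by the extra compatibility constructor above and by the tests) simply disables the check. A rough sketch of the two ways to construct the supervisor after this change, with variable names that are illustrative rather than taken from the code:

    // Preferred wiring (as in DatanodeStateMachine): pass the StateContext so
    // replicate commands are dropped while the node is decommissioning or in
    // maintenance.
    ReplicationSupervisor supervisor =
        new ReplicationSupervisor(containerSet, stateContext,
            replicator, replicationMaxStreams);

    // Compatibility wiring: no context, so the IN_SERVICE check is skipped.
    ReplicationSupervisor legacySupervisor =
        new ReplicationSupervisor(containerSet, replicator, replicationMaxStreams);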
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index f8eb25e..27c6065 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -181,7 +181,7 @@ public class TestReplicationSupervisor {
@Test
public void testDownloadAndImportReplicatorFailure() {
ReplicationSupervisor supervisor =
- new ReplicationSupervisor(set, mutableReplicator,
+ new ReplicationSupervisor(set, null, mutableReplicator,
newDirectExecutorService());
// Mock to fetch an exception in the importContainer method.
@@ -216,7 +216,7 @@ public class TestReplicationSupervisor {
Function<ReplicationSupervisor, ContainerReplicator> replicatorFactory,
ExecutorService executor) {
ReplicationSupervisor supervisor =
- new ReplicationSupervisor(set, mutableReplicator, executor);
+ new ReplicationSupervisor(set, null, mutableReplicator, executor);
replicatorRef.set(replicatorFactory.apply(supervisor));
return supervisor;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
index 7d2db05..47806ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
@@ -69,7 +69,7 @@ import org.slf4j.LoggerFactory;
public final class SCMContainerPlacementCapacity
extends SCMCommonPlacementPolicy {
@VisibleForTesting
- static final Logger LOG =
+ public static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementCapacity.class);
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
index 113b518..d3eb136 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +50,7 @@ import java.util.List;
public final class SCMContainerPlacementRackAware
extends SCMCommonPlacementPolicy {
@VisibleForTesting
- static final Logger LOG =
+ public static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementRackAware.class);
private final NetworkTopology networkTopology;
private boolean fallback;
@@ -105,8 +106,8 @@ public final class SCMContainerPlacementRackAware
if (datanodeCount < nodesRequired + excludedNodesCount) {
throw new SCMException("No enough datanodes to choose. " +
"TotalNode = " + datanodeCount +
- "RequiredNode = " + nodesRequired +
- "ExcludedNode = " + excludedNodesCount, null);
+ " RequiredNode = " + nodesRequired +
+ " ExcludedNode = " + excludedNodesCount, null);
}
List<DatanodeDetails> mutableFavoredNodes = favoredNodes;
// sanity check of favoredNodes
@@ -275,28 +276,38 @@ public final class SCMContainerPlacementRackAware
throw new SCMException("No satisfied datanode to meet the" +
" excludedNodes and affinityNode constrains.", null);
}
- if (super.hasEnoughSpace((DatanodeDetails)node, sizeRequired)) {
- LOG.debug("Datanode {} is chosen. Required size is {}",
- node.toString(), sizeRequired);
- metrics.incrDatanodeChooseSuccessCount();
- if (isFallbacked) {
- metrics.incrDatanodeChooseFallbackCount();
- }
- return node;
+
+ DatanodeDetails datanodeDetails = (DatanodeDetails)node;
+ DatanodeInfo datanodeInfo = (DatanodeInfo)getNodeManager()
+ .getNodeByUuid(datanodeDetails.getUuidString());
+ if (datanodeInfo == null) {
+ LOG.error("Failed to find the DatanodeInfo for datanode {}",
+ datanodeDetails);
} else {
- maxRetry--;
- if (maxRetry == 0) {
- // avoid the infinite loop
- String errMsg = "No satisfied datanode to meet the space constrains. "
- + " sizeRequired: " + sizeRequired;
- LOG.info(errMsg);
- throw new SCMException(errMsg, null);
- }
- if (excludedNodesForCapacity == null) {
- excludedNodesForCapacity = new ArrayList<>();
+ if (datanodeInfo.getNodeStatus().isNodeWritable() &&
+ (super.hasEnoughSpace(datanodeInfo, sizeRequired))) {
+ LOG.debug("Datanode {} is chosen. Required storage size is {} bytes",
+ node, sizeRequired);
+ metrics.incrDatanodeChooseSuccessCount();
+ if (isFallbacked) {
+ metrics.incrDatanodeChooseFallbackCount();
+ }
+ return node;
}
- excludedNodesForCapacity.add(node.getNetworkFullPath());
}
+
+ maxRetry--;
+ if (maxRetry == 0) {
+ // avoid the infinite loop
+ String errMsg = "No satisfied datanode to meet the space constrains. "
+ + " sizeRequired: " + sizeRequired;
+ LOG.info(errMsg);
+ throw new SCMException(errMsg, null);
+ }
+ if (excludedNodesForCapacity == null) {
+ excludedNodesForCapacity = new ArrayList<>();
+ }
+ excludedNodesForCapacity.add(node.getNetworkFullPath());
}
}
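The restructured loop resolves each candidate to its DatanodeInfo via the NodeManager and accepts it only if it is writable and has enough space; all other cases (unknown node, non-writable node, insufficient space) now share the same retry-and-exclude path instead of duplicating it. A condensed sketch of that decision, using only methods visible in the patch (getNodeByUuid, isNodeWritable, hasEnoughSpace) and leaving out the metrics and retry bookkeeping:

    DatanodeDetails candidate = (DatanodeDetails) node;
    DatanodeInfo info = (DatanodeInfo) getNodeManager()
        .getNodeByUuid(candidate.getUuidString());
    if (info != null
        && info.getNodeStatus().isNodeWritable()
        && hasEnoughSpace(info, sizeRequired)) {
      return candidate;   // healthy, IN_SERVICE and has room: choose it
    }
    // otherwise fall through: decrement maxRetry, add the node to
    // excludedNodesForCapacity and try another candidate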
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
index 4927517..b30f767 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
@@ -42,7 +42,7 @@ import java.util.List;
public final class SCMContainerPlacementRandom extends SCMCommonPlacementPolicy
implements PlacementPolicy {
@VisibleForTesting
- static final Logger LOG =
+ public static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementRandom.class);
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
index 153af92..a9164c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
@@ -69,6 +69,11 @@ public class NodeStatus {
HddsProtos.NodeState.DEAD);
}
+ public boolean isNodeWritable() {
+ return health == HddsProtos.NodeState.HEALTHY &&
+ operationalState == HddsProtos.NodeOperationalState.IN_SERVICE;
+ }
+
public HddsProtos.NodeState getHealth() {
return health;
}
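isNodeWritable() is deliberately strict: only the HEALTHY health state combined with the IN_SERVICE operational state qualifies, so stale, dead, decommissioning and maintenance nodes are all ruled out as targets for new replicas. A small illustrative check, assuming NodeStatus's existing inServiceHealthy() and inServiceStale() factories:

    // Healthy and IN_SERVICE: a valid placement target.
    assert NodeStatus.inServiceHealthy().isNodeWritable();
    // Stale health state: rejected even though the node is IN_SERVICE.
    assert !NodeStatus.inServiceStale().isNodeWritable();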
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
index f851287..c37da63 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
@@ -59,7 +59,8 @@ public class TestContainerPlacementFactory {
// network topology cluster
private NetworkTopology cluster;
// datanodes array list
- private List<DatanodeInfo> datanodes = new ArrayList<>();
+ private List<DatanodeDetails> datanodes = new ArrayList<>();
+ private List<DatanodeInfo> dnInfos = new ArrayList<>();
// node storage capacity
private static final long STORAGE_CAPACITY = 100L;
// configuration
@@ -90,9 +91,10 @@ public class TestContainerPlacementFactory {
String hostname = "node";
for (int i = 0; i < 15; i++) {
// Totally 3 racks, each has 5 datanodes
+ DatanodeDetails datanodeDetails = MockDatanodeDetails
+ .createDatanodeDetails(hostname + i, rack + (i / 5));
DatanodeInfo datanodeInfo = new DatanodeInfo(
- MockDatanodeDetails.createDatanodeDetails(
- hostname + i, rack + (i / 5)), NodeStatus.inServiceHealthy(),
+ datanodeDetails, NodeStatus.inServiceHealthy(),
UpgradeUtils.defaultLayoutVersionProto());
StorageReportProto storage1 = TestUtils.createStorageReport(
@@ -107,33 +109,38 @@ public class TestContainerPlacementFactory {
datanodeInfo.updateMetaDataStorageReports(
new ArrayList<>(Arrays.asList(metaStorage1)));
- datanodes.add(datanodeInfo);
- cluster.add(datanodeInfo);
+ datanodes.add(datanodeDetails);
+ cluster.add(datanodeDetails);
+ dnInfos.add(datanodeInfo);
}
StorageReportProto storage2 = TestUtils.createStorageReport(
- datanodes.get(2).getUuid(),
- "/data1-" + datanodes.get(2).getUuidString(),
+ dnInfos.get(2).getUuid(),
+ "/data1-" + dnInfos.get(2).getUuidString(),
STORAGE_CAPACITY, 90L, 10L, null);
- datanodes.get(2).updateStorageReports(
+ dnInfos.get(2).updateStorageReports(
new ArrayList<>(Arrays.asList(storage2)));
StorageReportProto storage3 = TestUtils.createStorageReport(
- datanodes.get(3).getUuid(),
- "/data1-" + datanodes.get(3).getUuidString(),
+ dnInfos.get(3).getUuid(),
+ "/data1-" + dnInfos.get(3).getUuidString(),
STORAGE_CAPACITY, 80L, 20L, null);
- datanodes.get(3).updateStorageReports(
+ dnInfos.get(3).updateStorageReports(
new ArrayList<>(Arrays.asList(storage3)));
StorageReportProto storage4 = TestUtils.createStorageReport(
- datanodes.get(4).getUuid(),
- "/data1-" + datanodes.get(4).getUuidString(),
+ dnInfos.get(4).getUuid(),
+ "/data1-" + dnInfos.get(4).getUuidString(),
STORAGE_CAPACITY, 70L, 30L, null);
- datanodes.get(4).updateStorageReports(
+ dnInfos.get(4).updateStorageReports(
new ArrayList<>(Arrays.asList(storage4)));
// create mock node manager
nodeManager = Mockito.mock(NodeManager.class);
when(nodeManager.getNodes(NodeStatus.inServiceHealthy()))
.thenReturn(new ArrayList<>(datanodes));
+ for (DatanodeInfo dn: dnInfos) {
+ when(nodeManager.getNodeByUuid(dn.getUuidString()))
+ .thenReturn(dn);
+ }
PlacementPolicy policy = ContainerPlacementPolicyFactory
.getPolicy(conf, nodeManager, cluster, true,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
index 611c204..87e77ff 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
@@ -53,6 +53,7 @@ import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
import org.hamcrest.MatcherAssert;
+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -70,7 +71,8 @@ public class TestSCMContainerPlacementRackAware {
private OzoneConfiguration conf;
private NodeManager nodeManager;
private final Integer datanodeCount;
- private final List<DatanodeInfo> datanodes = new ArrayList<>();
+ private final List<DatanodeDetails> datanodes = new ArrayList<>();
+ private final List<DatanodeInfo> dnInfos = new ArrayList<>();
// policy with fallback capability
private SCMContainerPlacementRackAware policy;
// policy prohibit fallback
@@ -107,10 +109,11 @@ public class TestSCMContainerPlacementRackAware {
String hostname = "node";
for (int i = 0; i < datanodeCount; i++) {
// Totally 3 racks, each has 5 datanodes
- DatanodeInfo datanodeInfo = new DatanodeInfo(
+ DatanodeDetails datanodeDetails =
MockDatanodeDetails.createDatanodeDetails(
- hostname + i, rack + (i / NODE_PER_RACK)),
- NodeStatus.inServiceHealthy(),
+ hostname + i, rack + (i / NODE_PER_RACK));
+ DatanodeInfo datanodeInfo = new DatanodeInfo(
+ datanodeDetails, NodeStatus.inServiceHealthy(),
UpgradeUtils.defaultLayoutVersionProto());
StorageReportProto storage1 = TestUtils.createStorageReport(
@@ -125,48 +128,49 @@ public class TestSCMContainerPlacementRackAware {
datanodeInfo.updateMetaDataStorageReports(
new ArrayList<>(Arrays.asList(metaStorage1)));
- datanodes.add(datanodeInfo);
- cluster.add(datanodeInfo);
+ datanodes.add(datanodeDetails);
+ cluster.add(datanodeDetails);
+ dnInfos.add(datanodeInfo);
}
if (datanodeCount > 4) {
StorageReportProto storage2 = TestUtils.createStorageReport(
- datanodes.get(2).getUuid(),
+ dnInfos.get(2).getUuid(),
"/data1-" + datanodes.get(2).getUuidString(),
STORAGE_CAPACITY, 90L, 10L, null);
- datanodes.get(2).updateStorageReports(
+ dnInfos.get(2).updateStorageReports(
new ArrayList<>(Arrays.asList(storage2)));
StorageReportProto storage3 = TestUtils.createStorageReport(
- datanodes.get(3).getUuid(),
- "/data1-" + datanodes.get(3).getUuidString(),
+ dnInfos.get(3).getUuid(),
+ "/data1-" + dnInfos.get(3).getUuidString(),
STORAGE_CAPACITY, 80L, 20L, null);
- datanodes.get(3).updateStorageReports(
+ dnInfos.get(3).updateStorageReports(
new ArrayList<>(Arrays.asList(storage3)));
StorageReportProto storage4 = TestUtils.createStorageReport(
- datanodes.get(4).getUuid(),
- "/data1-" + datanodes.get(4).getUuidString(),
+ dnInfos.get(4).getUuid(),
+ "/data1-" + dnInfos.get(4).getUuidString(),
STORAGE_CAPACITY, 70L, 30L, null);
- datanodes.get(4).updateStorageReports(
+ dnInfos.get(4).updateStorageReports(
new ArrayList<>(Arrays.asList(storage4)));
} else if (datanodeCount > 3) {
StorageReportProto storage2 = TestUtils.createStorageReport(
- datanodes.get(2).getUuid(),
- "/data1-" + datanodes.get(2).getUuidString(),
+ dnInfos.get(2).getUuid(),
+ "/data1-" + dnInfos.get(2).getUuidString(),
STORAGE_CAPACITY, 90L, 10L, null);
- datanodes.get(2).updateStorageReports(
+ dnInfos.get(2).updateStorageReports(
new ArrayList<>(Arrays.asList(storage2)));
StorageReportProto storage3 = TestUtils.createStorageReport(
- datanodes.get(3).getUuid(),
- "/data1-" + datanodes.get(3).getUuidString(),
+ dnInfos.get(3).getUuid(),
+ "/data1-" + dnInfos.get(3).getUuidString(),
STORAGE_CAPACITY, 80L, 20L, null);
- datanodes.get(3).updateStorageReports(
+ dnInfos.get(3).updateStorageReports(
new ArrayList<>(Arrays.asList(storage3)));
} else if (datanodeCount > 2) {
StorageReportProto storage2 = TestUtils.createStorageReport(
- datanodes.get(2).getUuid(),
- "/data1-" + datanodes.get(2).getUuidString(),
+ dnInfos.get(2).getUuid(),
+ "/data1-" + dnInfos.get(2).getUuidString(),
STORAGE_CAPACITY, 84L, 16L, null);
- datanodes.get(2).updateStorageReports(
+ dnInfos.get(2).updateStorageReports(
new ArrayList<>(Arrays.asList(storage2)));
}
@@ -174,6 +178,10 @@ public class TestSCMContainerPlacementRackAware {
nodeManager = Mockito.mock(NodeManager.class);
when(nodeManager.getNodes(NodeStatus.inServiceHealthy()))
.thenReturn(new ArrayList<>(datanodes));
+ for (DatanodeInfo dn: dnInfos) {
+ when(nodeManager.getNodeByUuid(dn.getUuidString()))
+ .thenReturn(dn);
+ }
when(nodeManager.getClusterNetworkTopologyMap())
.thenReturn(cluster);
@@ -400,33 +408,40 @@ public class TestSCMContainerPlacementRackAware {
@Test
public void testDatanodeWithDefaultNetworkLocation() throws SCMException {
String hostname = "node";
- List<DatanodeInfo> dataList = new ArrayList<>();
+ List<DatanodeInfo> dnInfoList = new ArrayList<>();
+ List<DatanodeDetails> dataList = new ArrayList<>();
NetworkTopology clusterMap =
new NetworkTopologyImpl(NodeSchemaManager.getInstance());
for (int i = 0; i < 15; i++) {
// Totally 3 racks, each has 5 datanodes
- DatanodeInfo datanodeInfo = new DatanodeInfo(
- MockDatanodeDetails.createDatanodeDetails(
- hostname + i, null), NodeStatus.inServiceHealthy(),
+ DatanodeDetails dn = MockDatanodeDetails.createDatanodeDetails(
+ hostname + i, null);
+ DatanodeInfo dnInfo = new DatanodeInfo(
+ dn, NodeStatus.inServiceHealthy(),
UpgradeUtils.defaultLayoutVersionProto());
StorageReportProto storage1 = TestUtils.createStorageReport(
- datanodeInfo.getUuid(), "/data1-" + datanodeInfo.getUuidString(),
+ dnInfo.getUuid(), "/data1-" + dnInfo.getUuidString(),
STORAGE_CAPACITY, 0, 100L, null);
MetadataStorageReportProto metaStorage1 =
TestUtils.createMetadataStorageReport(
- "/metadata1-" + datanodeInfo.getUuidString(),
+ "/metadata1-" + dnInfo.getUuidString(),
STORAGE_CAPACITY, 0, 100L, null);
- datanodeInfo.updateStorageReports(
+ dnInfo.updateStorageReports(
new ArrayList<>(Arrays.asList(storage1)));
- datanodeInfo.updateMetaDataStorageReports(
+ dnInfo.updateMetaDataStorageReports(
new ArrayList<>(Arrays.asList(metaStorage1)));
- dataList.add(datanodeInfo);
- clusterMap.add(datanodeInfo);
+ dataList.add(dn);
+ clusterMap.add(dn);
+ dnInfoList.add(dnInfo);
}
Assert.assertEquals(dataList.size(), StringUtils.countMatches(
clusterMap.toString(), NetConstants.DEFAULT_RACK));
+ for (DatanodeInfo dn: dnInfoList) {
+ when(nodeManager.getNodeByUuid(dn.getUuidString()))
+ .thenReturn(dn);
+ }
// choose nodes to host 3 replica
int nodeNum = 3;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
index b59f806..4de5eea 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
@@ -127,18 +127,16 @@ public class TestNodeStateMap {
*/
@Test
public void testConcurrency() throws Exception {
- NodeStateMap nodeStateMap = new NodeStateMap();
-
final DatanodeDetails datanodeDetails =
MockDatanodeDetails.randomDatanodeDetails();
- nodeStateMap.addNode(datanodeDetails, NodeStatus.inServiceHealthy(), null);
+ map.addNode(datanodeDetails, NodeStatus.inServiceHealthy(), null);
UUID dnUuid = datanodeDetails.getUuid();
- nodeStateMap.addContainer(dnUuid, ContainerID.valueOf(1L));
- nodeStateMap.addContainer(dnUuid, ContainerID.valueOf(2L));
- nodeStateMap.addContainer(dnUuid, ContainerID.valueOf(3L));
+ map.addContainer(dnUuid, ContainerID.valueOf(1L));
+ map.addContainer(dnUuid, ContainerID.valueOf(2L));
+ map.addContainer(dnUuid, ContainerID.valueOf(3L));
CountDownLatch elementRemoved = new CountDownLatch(1);
CountDownLatch loopStarted = new CountDownLatch(1);
@@ -146,7 +144,7 @@ public class TestNodeStateMap {
new Thread(() -> {
try {
loopStarted.await();
- nodeStateMap.removeContainer(dnUuid, ContainerID.valueOf(1L));
+ map.removeContainer(dnUuid, ContainerID.valueOf(1L));
elementRemoved.countDown();
} catch (Exception e) {
e.printStackTrace();
@@ -155,7 +153,7 @@ public class TestNodeStateMap {
}).start();
boolean first = true;
- for (ContainerID key : nodeStateMap.getContainers(dnUuid)) {
+ for (ContainerID key : map.getContainers(dnUuid)) {
if (first) {
loopStarted.countDown();
elementRemoved.await();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
index 8562554..f0cdc49 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -20,21 +20,31 @@ package org.apache.hadoop.ozone.container;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.ozone.container.TestHelper.waitForContainerClose;
import static org.apache.hadoop.ozone.container.TestHelper.waitForReplicaCount;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackAware;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.node.NodeDecommissionManager;
+import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
@@ -46,22 +56,27 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.ozone.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.event.Level;
/**
* Tests ozone containers replication.
*/
+@RunWith(Parameterized.class)
public class TestContainerReplication {
/**
* Set the timeout for every test.
*/
@Rule
- public Timeout testTimeout = Timeout.seconds(300);;
+ public Timeout testTimeout = Timeout.seconds(300);
private static final String VOLUME = "vol1";
private static final String BUCKET = "bucket1";
@@ -69,12 +84,32 @@ public class TestContainerReplication {
private MiniOzoneCluster cluster;
private OzoneClient client;
+ private String placementPolicyClass;
+
+ @Parameterized.Parameters
+ public static List<String> parameters() {
+ List<String> classes = new ArrayList<>();
+ classes.add(SCMContainerPlacementRackAware.class.getCanonicalName());
+ classes.add(SCMContainerPlacementCapacity.class.getCanonicalName());
+ classes.add(SCMContainerPlacementRandom.class.getCanonicalName());
+ return classes;
+ }
+
+ public TestContainerReplication(String placementPolicy) {
+ this.placementPolicyClass = placementPolicy;
+ }
@Before
public void setUp() throws Exception {
+ GenericTestUtils.setLogLevel(SCMContainerPlacementRandom.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(SCMContainerPlacementCapacity.LOG,
+ Level.DEBUG);
+ GenericTestUtils.setLogLevel(SCMContainerPlacementRackAware.LOG,
+ Level.DEBUG);
OzoneConfiguration conf = createConfiguration();
+ conf.set(OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, placementPolicyClass);
- cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).build();
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
cluster.waitForClusterToBeReady();
client = OzoneClientFactory.getRpcClient(conf);
@@ -103,6 +138,48 @@ public class TestContainerReplication {
waitForReplicaCount(containerID, 3, cluster);
}
+ @Test
+ public void testSkipDecommissionAndMaintenanceNode() throws Exception {
+ List<OmKeyLocationInfo> keyLocations = lookupKey(cluster);
+ assertFalse(keyLocations.isEmpty());
+
+ OmKeyLocationInfo keyLocation = keyLocations.get(0);
+ long containerID = keyLocation.getContainerID();
+ waitForContainerClose(cluster, containerID);
+
+ // Mark other two DN Decommission and Maintenance
+ NodeDecommissionManager decommissionManager =
+ cluster.getStorageContainerManager().getScmDecommissionManager();
+ boolean deCommission = true;
+ for (HddsDatanodeService d1 : cluster.getHddsDatanodes()) {
+ boolean match = false;
+ for (DatanodeDetails d2 : keyLocations.get(0).getPipeline().getNodes()) {
+ if (d1.getDatanodeDetails().equals(d2)) {
+ match = true;
+ break;
+ }
+ }
+ if (!match) {
+ if (deCommission) {
+ decommissionManager.startDecommission(d1.getDatanodeDetails());
+ deCommission = false;
+ } else {
+ decommissionManager.startMaintenance(d1.getDatanodeDetails(), 1);
+ }
+ }
+ }
+
+ cluster.shutdownHddsDatanode(keyLocation.getPipeline().getFirstNode());
+
+ waitForReplicaCount(containerID, 2, cluster);
+ try {
+ waitForReplicaCount(containerID, 3, cluster);
+ fail("Replication should not succeed without extra IN_SERVICE nodes");
+ } catch (TimeoutException e) {
+ Assert.assertTrue(TestHelper.countReplicas(containerID, cluster) == 2);
+ }
+ }
+
private static OzoneConfiguration createConfiguration() {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 48a9a09..aa50234 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -382,6 +382,6 @@ public final class TestHelper {
public static void waitForReplicaCount(long containerID, int count,
MiniOzoneCluster cluster) throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> countReplicas(containerID, cluster) == count,
- 1000, 30_000);
+ 1000, 30000);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org