You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2023/02/15 12:04:46 UTC
[ozone] branch master updated: HDDS-7919. EC: ECPipelineProvider.createForRead should filter out dead replicas and sort replicas (#4277)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 5d5d370de3 HDDS-7919. EC: ECPipelineProvider.createForRead should filter out dead replicas and sort replicas (#4277)
5d5d370de3 is described below
commit 5d5d370de3cd4ba9e8b3fa88d69e33f7deaf8d57
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Wed Feb 15 13:04:39 2023 +0100
HDDS-7919. EC: ECPipelineProvider.createForRead should filter out dead replicas and sort replicas (#4277)
---
.../hdds/scm/container/ContainerReplica.java | 12 ++++
.../apache/hadoop/hdds/scm/node/NodeStatus.java | 18 ++++-
.../hdds/scm/pipeline/ECPipelineProvider.java | 25 ++++++-
.../hdds/scm/container/TestContainerReplica.java | 53 +++++++++++++++
.../hadoop/hdds/scm/node/TestNodeStatus.java | 68 +++++++++++++++++++
.../hdds/scm/pipeline/TestECPipelineProvider.java | 76 +++++++++++++++++++++-
6 files changed, 247 insertions(+), 5 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
index 3c1176c015..ab67bff0c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
@@ -163,6 +163,23 @@ public final class ContainerReplica implements Comparable<ContainerReplica> {
return new ContainerReplicaBuilder();
}
+ /**
+ * Returns a new builder pre-populated with this replica's bytes used,
+ * container ID, state, datanode, key count, origin node, replica index
+ * and sequence ID, so that a modified copy can be built.
+ */
+ public ContainerReplicaBuilder toBuilder() {
+ return newBuilder()
+ .setBytesUsed(bytesUsed)
+ .setContainerID(containerID)
+ .setContainerState(state)
+ .setDatanodeDetails(datanodeDetails)
+ .setKeyCount(keyCount)
+ .setOriginNodeId(placeOfBirth)
+ .setReplicaIndex(replicaIndex)
+ .setSequenceId(sequenceId);
+ }
+
@Override
public String toString() {
return "ContainerReplica{" +
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
index ac56c01ea7..68cb3f4be7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
@@ -28,7 +28,7 @@ import java.util.Objects;
* in_service, decommissioned and maintenance mode) along with the expiry time
* for the operational state (used with maintenance mode).
*/
-public class NodeStatus {
+public class NodeStatus implements Comparable<NodeStatus> {
private HddsProtos.NodeOperationalState operationalState;
private HddsProtos.NodeState health;
@@ -93,6 +93,13 @@ public class NodeStatus {
return System.currentTimeMillis() / 1000 >= opStateExpiryEpochSeconds;
}
+ /**
+ * Returns true if the node's operational state is IN_SERVICE.
+ */
+ public boolean isInService() {
+ return operationalState == HddsProtos.NodeOperationalState.IN_SERVICE;
+ }
+
/**
* Returns true if the nodeStatus indicates the node is in any decommission
* state.
@@ -214,4 +221,30 @@ public class NodeStatus {
" OperationStateExpiry: " + opStateExpiryEpochSeconds;
}
+ /**
+ * Orders statuses so that healthy nodes come first and dead nodes last,
+ * with nodes that are neither (e.g. stale) in between. Ties are broken by
+ * preferring IN_SERVICE nodes, then by the declaration order of
+ * {@link HddsProtos.NodeOperationalState}.
+ * <p>
+ * HEALTHY and HEALTHY_READONLY compare as equal and the operational
+ * state expiry is ignored, so this ordering may be inconsistent with
+ * {@link #equals(Object)}.
+ */
+ @Override
+ public int compareTo(NodeStatus o) {
+ int order = Boolean.compare(o.isHealthy(), isHealthy());
+ if (order == 0) {
+ order = Boolean.compare(isDead(), o.isDead());
+ }
+ if (order == 0) {
+ // Explicit, rather than relying on IN_SERVICE being declared first.
+ order = Boolean.compare(o.isInService(), isInService());
+ }
+ if (order == 0) {
+ order = operationalState.compareTo(o.operationalState);
+ }
+ return order;
+ }
+
}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
index 11c23f1072..c7bf6819a7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
@@ -25,10 +25,15 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,6 +44,9 @@ import java.util.Set;
*/
public class ECPipelineProvider extends PipelineProvider<ECReplicationConfig> {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ECPipelineProvider.class);
+
// TODO - EC Placement Policy. Standard Network Aware topology will not work
// for EC as it stands. We may want an "as many racks as possible"
// policy. HDDS-5326.
@@ -96,11 +104,26 @@ public class ECPipelineProvider extends PipelineProvider<ECReplicationConfig> {
Set<ContainerReplica> replicas) {
Map<DatanodeDetails, Integer> map = new HashMap<>();
List<DatanodeDetails> dns = new ArrayList<>(replicas.size());
+ Map<DatanodeDetails, NodeStatus> nodeStatusMap = new HashMap<>();
for (ContainerReplica r : replicas) {
- map.put(r.getDatanodeDetails(), r.getReplicaIndex());
- dns.add(r.getDatanodeDetails());
+ DatanodeDetails dn = r.getDatanodeDetails();
+ try {
+ NodeStatus nodeStatus = getNodeManager().getNodeStatus(dn);
+ // Only replicas on nodes that are not DEAD are added to the pipeline.
+ if (!nodeStatus.isDead()) {
+ map.put(dn, r.getReplicaIndex());
+ dns.add(dn);
+ nodeStatusMap.put(dn, nodeStatus);
+ }
+ } catch (NodeNotFoundException e) {
+ LOG.error("Node {} not found, skipping its replica", dn, e);
+ }
}
+
+ // Sort by NodeStatus natural ordering so healthier nodes come first.
+ dns.sort(Comparator.comparing(nodeStatusMap::get));
+
return createPipelineInternal(replicationConfig, dns, map);
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReplica.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReplica.java
new file mode 100644
index 0000000000..a9563aa344
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReplica.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.junit.jupiter.api.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link ContainerReplica}, including the toBuilder() round trip.
+ */
+class TestContainerReplica {
+
+ @Test
+ void toBuilder() {
+ ContainerReplica subject = ContainerReplica.newBuilder()
+ .setBytesUsed(ThreadLocalRandom.current().nextLong())
+ .setContainerID(ContainerID.valueOf(
+ ThreadLocalRandom.current().nextLong(Long.MAX_VALUE - 1) + 1))
+ .setContainerState(CLOSED)
+ .setKeyCount(ThreadLocalRandom.current().nextLong())
+ .setOriginNodeId(UUID.randomUUID())
+ .setSequenceId(ThreadLocalRandom.current().nextLong())
+ .setReplicaIndex(ThreadLocalRandom.current().nextInt())
+ .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails())
+ .build();
+
+ ContainerReplica copy = subject.toBuilder().build();
+ assertEquals(subject, copy);
+ assertEquals(subject.toString(), copy.toString()); // equals is incomplete
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStatus.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStatus.java
new file mode 100644
index 0000000000..00e0e11e89
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStatus.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
+import static org.apache.hadoop.hdds.scm.node.NodeStatus.inServiceDead;
+import static org.apache.hadoop.hdds.scm.node.NodeStatus.inServiceHealthy;
+import static org.apache.hadoop.hdds.scm.node.NodeStatus.inServiceStale;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link NodeStatus}, focusing on its compareTo() ordering.
+ */
+class TestNodeStatus {
+
+ @ParameterizedTest
+ @EnumSource
+ void readOnly(HddsProtos.NodeOperationalState state) {
+ assertEquals(0, new NodeStatus(state, HEALTHY)
+ .compareTo(new NodeStatus(state, HEALTHY_READONLY)));
+ }
+
+ @Test
+ void healthyFirst() {
+ assertTrue(0 > inServiceHealthy().compareTo(inServiceStale()));
+ assertTrue(0 < inServiceDead().compareTo(inServiceHealthy()));
+ assertTrue(0 > new NodeStatus(ENTERING_MAINTENANCE, HEALTHY).compareTo(
+ inServiceStale()
+ ));
+ assertTrue(0 < inServiceStale().compareTo(
+ new NodeStatus(DECOMMISSIONING, HEALTHY)
+ ));
+ }
+
+ @Test
+ void inServiceFirst() {
+ assertTrue(0 > inServiceHealthy().compareTo(
+ new NodeStatus(ENTERING_MAINTENANCE, HEALTHY)));
+ assertTrue(0 < new NodeStatus(DECOMMISSIONING, HEALTHY).compareTo(
+ inServiceHealthy()
+ ));
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
index 702807c388..f18704b50d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
+import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -29,6 +30,8 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -37,13 +40,21 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCATED;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Test for the ECPipelineProvider.
@@ -58,7 +69,7 @@ public class TestECPipelineProvider {
private PlacementPolicy placementPolicy = Mockito.mock(PlacementPolicy.class);
private long containerSizeBytes;
@BeforeEach
- public void setup() throws IOException {
+ public void setup() throws IOException, NodeNotFoundException {
conf = new OzoneConfiguration();
provider = new ECPipelineProvider(
nodeManager, stateManager, conf, placementPolicy);
@@ -67,7 +78,7 @@ public class TestECPipelineProvider {
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
StorageUnit.BYTES);
// Placement policy will always return EC number of random nodes.
- Mockito.when(placementPolicy.chooseDatanodes(Mockito.anyList(),
+ when(placementPolicy.chooseDatanodes(Mockito.anyList(),
Mockito.anyList(), Mockito.anyInt(), Mockito.anyLong(),
Mockito.anyLong()))
.thenAnswer(invocation -> {
@@ -78,6 +89,8 @@ public class TestECPipelineProvider {
return dns;
});
+ when(nodeManager.getNodeStatus(any()))
+ .thenReturn(NodeStatus.inServiceHealthy());
}
@@ -112,6 +125,79 @@ public class TestECPipelineProvider {
}
}
+ /**
+ * Replicas on DEAD nodes, in any of several operational states, must be
+ * left out of the read pipeline.
+ */
+ @Test
+ void omitsDeadNodes() throws NodeNotFoundException {
+ ECReplicationConfig ecConf = new ECReplicationConfig(3, 2);
+ Set<ContainerReplica> replicas = createContainerReplicas(5);
+
+ Iterator<ContainerReplica> iterator = replicas.iterator();
+ DatanodeDetails dead = iterator.next().getDatanodeDetails();
+ when(nodeManager.getNodeStatus(dead))
+ .thenReturn(NodeStatus.inServiceDead());
+ DatanodeDetails dead2 = iterator.next().getDatanodeDetails();
+ when(nodeManager.getNodeStatus(dead2))
+ .thenReturn(new NodeStatus(IN_MAINTENANCE, DEAD));
+ DatanodeDetails dead3 = iterator.next().getDatanodeDetails();
+ when(nodeManager.getNodeStatus(dead3))
+ .thenReturn(new NodeStatus(DECOMMISSIONED, DEAD));
+ Set<DatanodeDetails> deadNodes = ImmutableSet.of(dead, dead2, dead3);
+
+ Pipeline pipeline = provider.createForRead(ecConf, replicas);
+
+ List<DatanodeDetails> nodes = pipeline.getNodes();
+ Assertions.assertEquals(replicas.size() - deadNodes.size(), nodes.size());
+ for (DatanodeDetails d : deadNodes) {
+ Assertions.assertFalse(nodes.contains(d));
+ }
+ }
+
+ /**
+ * Nodes must be ordered: healthy in-service first, then healthy
+ * decommissioning, then stale in-service.
+ */
+ @Test
+ void sortsHealthyNodesFirst() throws NodeNotFoundException {
+ ECReplicationConfig ecConf = new ECReplicationConfig(3, 2);
+ Set<ContainerReplica> replicas = new HashSet<>();
+ Set<DatanodeDetails> healthyNodes = new HashSet<>();
+ Set<DatanodeDetails> staleNodes = new HashSet<>();
+ Set<DatanodeDetails> decomNodes = new HashSet<>();
+ for (ContainerReplica replica : createContainerReplicas(5)) {
+ replicas.add(replica);
+ healthyNodes.add(replica.getDatanodeDetails());
+
+ DatanodeDetails decomNode = MockDatanodeDetails.randomDatanodeDetails();
+ replicas.add(replica.toBuilder().setDatanodeDetails(decomNode).build());
+ when(nodeManager.getNodeStatus(decomNode))
+ .thenReturn(new NodeStatus(DECOMMISSIONING, HEALTHY));
+ decomNodes.add(decomNode);
+
+ DatanodeDetails staleNode = MockDatanodeDetails.randomDatanodeDetails();
+ replicas.add(replica.toBuilder().setDatanodeDetails(staleNode).build());
+ when(nodeManager.getNodeStatus(staleNode))
+ .thenReturn(NodeStatus.inServiceStale());
+ staleNodes.add(staleNode);
+ }
+
+ Pipeline pipeline = provider.createForRead(ecConf, replicas);
+
+ List<DatanodeDetails> nodes = pipeline.getNodes();
+ Assertions.assertEquals(replicas.size(), nodes.size());
+ // Derive group boundaries from the sets instead of hard-coding 5/10/15.
+ int healthyEnd = healthyNodes.size();
+ int decomEnd = healthyEnd + decomNodes.size();
+ Assertions.assertEquals(healthyNodes,
+ new HashSet<>(nodes.subList(0, healthyEnd)));
+ Assertions.assertEquals(decomNodes,
+ new HashSet<>(nodes.subList(healthyEnd, decomEnd)));
+ Assertions.assertEquals(staleNodes,
+ new HashSet<>(nodes.subList(decomEnd, nodes.size())));
+ }
+
@Test
public void testExcludedAndFavoredNodesPassedToPlacementPolicy()
throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org