You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2023/02/15 12:04:46 UTC

[ozone] branch master updated: HDDS-7919. EC: ECPipelineProvider.createForRead should filter out dead replicas and sort replicas (#4277)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d5d370de3 HDDS-7919. EC: ECPipelineProvider.createForRead should filter out dead replicas and sort replicas (#4277)
5d5d370de3 is described below

commit 5d5d370de3cd4ba9e8b3fa88d69e33f7deaf8d57
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Wed Feb 15 13:04:39 2023 +0100

    HDDS-7919. EC: ECPipelineProvider.createForRead should filter out dead replicas and sort replicas (#4277)
---
 .../hdds/scm/container/ContainerReplica.java       | 12 ++++
 .../apache/hadoop/hdds/scm/node/NodeStatus.java    | 18 ++++-
 .../hdds/scm/pipeline/ECPipelineProvider.java      | 25 ++++++-
 .../hdds/scm/container/TestContainerReplica.java   | 53 +++++++++++++++
 .../hadoop/hdds/scm/node/TestNodeStatus.java       | 68 +++++++++++++++++++
 .../hdds/scm/pipeline/TestECPipelineProvider.java  | 76 +++++++++++++++++++++-
 6 files changed, 247 insertions(+), 5 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
index 3c1176c015..ab67bff0c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
@@ -163,6 +163,18 @@ public final class ContainerReplica implements Comparable<ContainerReplica> {
     return new ContainerReplicaBuilder();
   }
 
+  public ContainerReplicaBuilder toBuilder() { // returns a new builder seeded from this replica
+    return newBuilder()
+        .setBytesUsed(bytesUsed)
+        .setContainerID(containerID)
+        .setContainerState(state)
+        .setDatanodeDetails(datanodeDetails)
+        .setKeyCount(keyCount)
+        .setOriginNodeId(placeOfBirth) // the origin node id is held in the placeOfBirth field
+        .setReplicaIndex(replicaIndex)
+        .setSequenceId(sequenceId); // callers can override individual setters before build()
+  }
+
   @Override
   public String toString() {
     return "ContainerReplica{" +
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
index ac56c01ea7..68cb3f4be7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
@@ -28,7 +28,7 @@ import java.util.Objects;
  * in_service, decommissioned and maintenance mode) along with the expiry time
  * for the operational state (used with maintenance mode).
  */
-public class NodeStatus {
+public class NodeStatus implements Comparable<NodeStatus> {
 
   private HddsProtos.NodeOperationalState operationalState;
   private HddsProtos.NodeState health;
@@ -93,6 +93,10 @@ public class NodeStatus {
     return System.currentTimeMillis() / 1000 >= opStateExpiryEpochSeconds;
   }
 
+  public boolean isInService() { // true only when the operational state is exactly IN_SERVICE
+    return operationalState == HddsProtos.NodeOperationalState.IN_SERVICE;
+  }
+
   /**
    * Returns true if the nodeStatus indicates the node is in any decommission
    * state.
@@ -214,4 +218,16 @@ public class NodeStatus {
         " OperationStateExpiry: " + opStateExpiryEpochSeconds;
   }
 
+  @Override
+  public int compareTo(NodeStatus o) { // order: healthy first, dead last, then operational state
+    int order = Boolean.compare(o.isHealthy(), isHealthy()); // swapped operands: healthy sorts first
+    if (order == 0) {
+      order = Boolean.compare(isDead(), o.isDead()); // natural operands: dead sorts after non-dead
+    }
+    if (order == 0) {
+      order = operationalState.compareTo(o.operationalState); // tie-break: enum natural order
+    }
+    return order; // opStateExpiryEpochSeconds takes no part in the ordering
+  }
+
 }
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
index 11c23f1072..c7bf6819a7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
@@ -25,10 +25,15 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +44,9 @@ import java.util.Set;
  */
 public class ECPipelineProvider extends PipelineProvider<ECReplicationConfig> {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ECPipelineProvider.class);
+
   // TODO - EC Placement Policy. Standard Network Aware topology will not work
   //        for EC as it stands. We may want an "as many racks as possible"
   //        policy. HDDS-5326.
@@ -96,11 +104,24 @@ public class ECPipelineProvider extends PipelineProvider<ECReplicationConfig> {
       Set<ContainerReplica> replicas) {
     Map<DatanodeDetails, Integer> map = new HashMap<>();
     List<DatanodeDetails> dns = new ArrayList<>(replicas.size());
+    Map<DatanodeDetails, NodeStatus> nodeStatusMap = new HashMap<>();
 
     for (ContainerReplica r : replicas) {
-      map.put(r.getDatanodeDetails(), r.getReplicaIndex());
-      dns.add(r.getDatanodeDetails());
+      DatanodeDetails dn = r.getDatanodeDetails();
+      try {
+        NodeStatus nodeStatus = getNodeManager().getNodeStatus(dn);
+        if (!nodeStatus.isDead()) {
+          map.put(dn, r.getReplicaIndex());
+          dns.add(dn);
+          nodeStatusMap.put(dn, nodeStatus);
+        }
+      } catch (NodeNotFoundException e) {
+        LOG.error("Node not found", e);
+      }
     }
+
+    dns.sort(Comparator.comparing(nodeStatusMap::get));
+
     return createPipelineInternal(replicationConfig, dns, map);
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReplica.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReplica.java
new file mode 100644
index 0000000000..a9563aa344
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReplica.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.junit.jupiter.api.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link ContainerReplica}.
+ */
+class TestContainerReplica { // checks that ContainerReplica#toBuilder reproduces the source replica
+
+  @Test
+  void toBuilder() { // build a replica with random field values, copy it through toBuilder()
+    ContainerReplica subject = ContainerReplica.newBuilder()
+        .setBytesUsed(ThreadLocalRandom.current().nextLong())
+        .setContainerID(ContainerID.valueOf(
+            ThreadLocalRandom.current().nextLong(Long.MAX_VALUE - 1) + 1)) // shifted by one, so >= 1
+        .setContainerState(CLOSED)
+        .setKeyCount(ThreadLocalRandom.current().nextLong())
+        .setOriginNodeId(UUID.randomUUID())
+        .setSequenceId(ThreadLocalRandom.current().nextLong())
+        .setReplicaIndex(ThreadLocalRandom.current().nextInt())
+        .setDatanodeDetails(MockDatanodeDetails.randomDatanodeDetails())
+        .build();
+
+    ContainerReplica copy = subject.toBuilder().build(); // no overrides: expect an equivalent replica
+    assertEquals(subject, copy);
+    assertEquals(subject.toString(), copy.toString()); // equals is incomplete, so compare toString() too
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStatus.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStatus.java
new file mode 100644
index 0000000000..00e0e11e89
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStatus.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
+import static org.apache.hadoop.hdds.scm.node.NodeStatus.inServiceDead;
+import static org.apache.hadoop.hdds.scm.node.NodeStatus.inServiceHealthy;
+import static org.apache.hadoop.hdds.scm.node.NodeStatus.inServiceStale;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link NodeStatus}.
+ */
+class TestNodeStatus { // pins the ordering defined by NodeStatus#compareTo
+
+  @ParameterizedTest
+  @EnumSource
+  void readOnly(HddsProtos.NodeOperationalState state) { // runs once per NodeOperationalState constant
+    assertEquals(0, new NodeStatus(state, HEALTHY) // same op state: HEALTHY vs HEALTHY_READONLY tie
+        .compareTo(new NodeStatus(state, HEALTHY_READONLY)));
+  }
+
+  @Test
+  void healthyFirst() { // health outranks operational state in the ordering
+    assertTrue(0 > inServiceHealthy().compareTo(inServiceStale())); // healthy before stale
+    assertTrue(0 < inServiceDead().compareTo(inServiceHealthy())); // dead after healthy
+    assertTrue(0 > new NodeStatus(ENTERING_MAINTENANCE, HEALTHY).compareTo( // healthy wins even when
+        inServiceStale() // the stale node is the one that is in service
+    ));
+    assertTrue(0 < inServiceStale().compareTo( // stale sorts after a healthy decommissioning node
+        new NodeStatus(DECOMMISSIONING, HEALTHY)
+    ));
+  }
+
+  @Test
+  void inServiceFirst() { // with equal health, the in-service status sorts first
+    assertTrue(0 > inServiceHealthy().compareTo( // in service before entering maintenance
+        new NodeStatus(ENTERING_MAINTENANCE, HEALTHY)));
+    assertTrue(0 < new NodeStatus(DECOMMISSIONING, HEALTHY).compareTo( // decommissioning after in service
+        inServiceHealthy()
+    ));
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
index 702807c388..f18704b50d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -29,6 +30,8 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -37,13 +40,21 @@ import org.mockito.Mockito;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
 import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCATED;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Test for the ECPipelineProvider.
@@ -58,7 +69,7 @@ public class TestECPipelineProvider {
   private PlacementPolicy placementPolicy = Mockito.mock(PlacementPolicy.class);
   private long containerSizeBytes;
   @BeforeEach
-  public void setup() throws IOException {
+  public void setup() throws IOException, NodeNotFoundException {
     conf = new OzoneConfiguration();
     provider = new ECPipelineProvider(
         nodeManager, stateManager, conf, placementPolicy);
@@ -67,7 +78,7 @@ public class TestECPipelineProvider {
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
         StorageUnit.BYTES);
     // Placement policy will always return EC number of random nodes.
-    Mockito.when(placementPolicy.chooseDatanodes(Mockito.anyList(),
+    when(placementPolicy.chooseDatanodes(Mockito.anyList(),
         Mockito.anyList(), Mockito.anyInt(), Mockito.anyLong(),
         Mockito.anyLong()))
         .thenAnswer(invocation -> {
@@ -78,6 +89,8 @@ public class TestECPipelineProvider {
           return dns;
         });
 
+    when(nodeManager.getNodeStatus(any()))
+        .thenReturn(NodeStatus.inServiceHealthy());
   }
 
 
@@ -112,6 +125,65 @@ public class TestECPipelineProvider {
     }
   }
 
+  @Test
+  void omitsDeadNodes() throws NodeNotFoundException { // createForRead must drop replicas on DEAD nodes
+    ECReplicationConfig ecConf = new ECReplicationConfig(3, 2);
+    Set<ContainerReplica> replicas = createContainerReplicas(5); // NOTE(review): helper not shown; presumably 5 replicas on distinct nodes
+
+    Iterator<ContainerReplica> iterator = replicas.iterator();
+    DatanodeDetails dead = iterator.next().getDatanodeDetails();
+    when(nodeManager.getNodeStatus(dead))
+        .thenReturn(NodeStatus.inServiceDead()); // first node: dead, via the inServiceDead() factory
+    DatanodeDetails dead2 = iterator.next().getDatanodeDetails();
+    when(nodeManager.getNodeStatus(dead2))
+        .thenReturn(new NodeStatus(IN_MAINTENANCE, DEAD)); // second node: dead while IN_MAINTENANCE
+    DatanodeDetails dead3 = iterator.next().getDatanodeDetails();
+    when(nodeManager.getNodeStatus(dead3))
+        .thenReturn(new NodeStatus(DECOMMISSIONED, DEAD)); // third node: dead while DECOMMISSIONED
+    Set<DatanodeDetails> deadNodes = ImmutableSet.of(dead, dead2, dead3);
+
+    Pipeline pipeline = provider.createForRead(ecConf, replicas);
+
+    List<DatanodeDetails> nodes = pipeline.getNodes();
+    Assertions.assertEquals(replicas.size() - deadNodes.size(), nodes.size()); // only non-dead nodes remain
+    for (DatanodeDetails d : deadNodes) {
+      Assertions.assertFalse(nodes.contains(d)); // none of the dead nodes may appear in the pipeline
+    }
+  }
+
+  @Test
+  void sortsHealthyNodesFirst() throws NodeNotFoundException { // expected order: in-service healthy, decommissioning, stale
+    ECReplicationConfig ecConf = new ECReplicationConfig(3, 2);
+    Set<ContainerReplica> replicas = new HashSet<>();
+    Set<DatanodeDetails> healthyNodes = new HashSet<>();
+    Set<DatanodeDetails> staleNodes = new HashSet<>();
+    Set<DatanodeDetails> decomNodes = new HashSet<>();
+    for (ContainerReplica replica : createContainerReplicas(5)) { // three replicas are added per base replica
+      replicas.add(replica);
+      healthyNodes.add(replica.getDatanodeDetails()); // not stubbed here: setup() returns inServiceHealthy()
+
+      DatanodeDetails decomNode = MockDatanodeDetails.randomDatanodeDetails();
+      replicas.add(replica.toBuilder().setDatanodeDetails(decomNode).build()); // same replica on another datanode
+      when(nodeManager.getNodeStatus(decomNode))
+          .thenReturn(new NodeStatus(DECOMMISSIONING, HEALTHY));
+      decomNodes.add(decomNode);
+
+      DatanodeDetails staleNode = MockDatanodeDetails.randomDatanodeDetails();
+      replicas.add(replica.toBuilder().setDatanodeDetails(staleNode).build()); // same replica on another datanode
+      when(nodeManager.getNodeStatus(staleNode))
+          .thenReturn(NodeStatus.inServiceStale());
+      staleNodes.add(staleNode);
+    }
+
+    Pipeline pipeline = provider.createForRead(ecConf, replicas);
+
+    List<DatanodeDetails> nodes = pipeline.getNodes();
+    Assertions.assertEquals(replicas.size(), nodes.size()); // nothing is dead, so nothing is filtered out
+    Assertions.assertEquals(healthyNodes, new HashSet<>(nodes.subList(0, 5))); // compared as sets: order within a group is not checked
+    Assertions.assertEquals(decomNodes, new HashSet<>(nodes.subList(5, 10))); // healthy but decommissioning come second
+    Assertions.assertEquals(staleNodes, new HashSet<>(nodes.subList(10, 15))); // stale nodes come last
+  }
+
   @Test
   public void testExcludedAndFavoredNodesPassedToPlacementPolicy()
       throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org