Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/06/21 14:56:25 UTC

[GitHub] [ozone] sodonnel opened a new pull request #2353: HDDS-4892. EC: Implement basic EC pipeline provider

sodonnel opened a new pull request #2353:
URL: https://github.com/apache/ozone/pull/2353


   ## What changes were proposed in this pull request?
   
   This change creates an ECPipelineProvider so that EC pipelines can be created by the PipelineManager.
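As a rough illustration of what an EC pipeline needs, this sketch assumes the provider asks for `getRequiredNodes()` datanodes, i.e. one per data block plus one per parity block (5 for EC 3+2), and fails if fewer healthy nodes exist. `EcConfig` and `EcPipelineSketch` are hypothetical stand-ins, not Ozone classes, and the node choice ignores rack placement entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ECReplicationConfig: only the block counts matter.
final class EcConfig {
  final int data;
  final int parity;

  EcConfig(int data, int parity) {
    this.data = data;
    this.parity = parity;
  }

  // One datanode per data block plus one per parity block.
  int requiredNodes() {
    return data + parity;
  }
}

// Hypothetical helper: real Ozone code delegates this to a PlacementPolicy.
final class EcPipelineSketch {
  private EcPipelineSketch() {
  }

  // Takes the first requiredNodes() healthy nodes; no rack awareness here.
  static List<String> chooseNodes(EcConfig config, List<String> healthy) {
    int required = config.requiredNodes();
    if (healthy.size() < required) {
      throw new IllegalStateException("EC " + config.data + "+"
          + config.parity + " needs " + required + " datanodes, but only "
          + healthy.size() + " are healthy");
    }
    return new ArrayList<>(healthy.subList(0, required));
  }
}
```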
   
   It also adds the WritableECContainerProvider, so that a writable container can be selected from the open pipelines, or created if necessary.
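The selection rules exercised by the unit tests in this PR can be summarised in a simplified model: keep at least `minPipelines` open pipelines (counting all of them, not only the non-excluded ones), otherwise pick a random non-excluded pipeline with enough space, and create a new pipeline when none qualifies. This is a sketch under those assumptions, not the WritableECContainerProvider implementation: it models one container per pipeline, uses hypothetical classes, and does not handle pipeline-creation failures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical model: each open pipeline holds exactly one open container.
final class OpenPipeline {
  final int id;
  long usedBytes;

  OpenPipeline(int id) {
    this.id = id;
  }
}

// Hypothetical selector; not the actual WritableECContainerProvider.
final class WritableSelectorSketch {
  private final int minPipelines;
  private final long maxContainerSize;
  private final Random random;
  private final List<OpenPipeline> open = new ArrayList<>();
  private int nextId = 1;

  WritableSelectorSketch(int minPipelines, long maxContainerSize,
      Random random) {
    this.minPipelines = minPipelines;
    this.maxContainerSize = maxContainerSize;
    this.random = random;
  }

  OpenPipeline select(long size, Set<Integer> excludedPipelines) {
    // The minimum is checked against all open pipelines, excluded or not.
    if (open.size() < minPipelines) {
      return createPipeline();
    }
    List<OpenPipeline> candidates = new ArrayList<>();
    for (OpenPipeline p : open) {
      boolean hasSpace = p.usedBytes + size <= maxContainerSize;
      if (hasSpace && !excludedPipelines.contains(p.id)) {
        candidates.add(p);
      }
    }
    if (candidates.isEmpty()) {
      // Close (drop) pipelines that cannot take the write, then add a new
      // one, even if that goes above the minimum.
      open.removeIf(p -> p.usedBytes + size > maxContainerSize);
      return createPipeline();
    }
    // A random choice spreads writes across the open pipelines.
    return candidates.get(random.nextInt(candidates.size()));
  }

  int openCount() {
    return open.size();
  }

  private OpenPipeline createPipeline() {
    OpenPipeline p = new OpenPipeline(nextId++);
    open.add(p);
    return p;
  }
}
```

Choosing randomly among the candidates, rather than always the first, is what testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned checks for.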
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/HDDS-4892
   
   ## How was this patch tested?
   
   New unit tests
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2353: HDDS-4892. EC: Implement basic EC pipeline provider

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2353:
URL: https://github.com/apache/ozone/pull/2353#discussion_r658628721



##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
##########
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests to validate the WritableECContainerProvider works correctly.
+ */
+public class TestWritableECContainerProvider {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestWritableECContainerProvider.class);
+  private static final String OWNER = "SCM";
+  private PipelineManager pipelineManager = MockPipelineManager.getInstance();
+  private ContainerManagerV2 containerManager
+      = Mockito.mock(ContainerManagerV2.class);
+  private PipelineChoosePolicy pipelineChoosingPolicy
+      = new HealthyPipelineChoosePolicy();
+
+  private ConfigurationSource conf;
+  private WritableContainerProvider provider;
+  private ReplicationConfig repConfig;
+  private int minPipelines;
+
+  private Map<ContainerID, ContainerInfo> containers;
+
+  @Before
+  public void setup() throws ContainerNotFoundException {
+    repConfig = new ECReplicationConfig(3, 2);
+    conf = new OzoneConfiguration();
+    WritableECContainerProvider.WritableECContainerProviderConfig providerConf =
+        conf.getObject(WritableECContainerProvider
+            .WritableECContainerProviderConfig.class);
+    minPipelines = providerConf.getMinimumPipelines();
+    containers = new HashMap<>();
+    provider = new WritableECContainerProvider(
+        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+    Mockito.doAnswer(call -> {
+      Pipeline pipeline = (Pipeline)call.getArguments()[2];
+      ContainerInfo container = createContainer(pipeline,
+          repConfig, System.nanoTime());
+      pipelineManager.addContainerToPipeline(
+          pipeline.getId(), container.containerID());
+      containers.put(container.containerID(), container);
+      return container;
+    }).when(containerManager).getMatchingContainer(Matchers.anyLong(),
+        Matchers.anyString(), Matchers.any(Pipeline.class));
+
+    Mockito.doAnswer(call ->
+        containers.get((ContainerID)call.getArguments()[0]))
+        .when(containerManager).getContainer(Matchers.any(ContainerID.class));
+
+  }
+
+  @After
+  public void teardown() {
+  }
+
+  @Test
+  public void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned()
+      throws IOException {
+    // The first 5 calls should return a different container
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      assertFalse(allocatedContainers.contains(container));
+      allocatedContainers.add(container);
+    }
+
+    allocatedContainers.clear();
+    for (int i=0; i<20; i++) {
+      ContainerInfo container =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      allocatedContainers.add(container);
+    }
+    // Should have minPipelines containers created
+    assertEquals(minPipelines,
+        pipelineManager.getPipelines(repConfig, OPEN).size());
+    // We should have more than 2 containers in allocatedContainers, proving a
+    // random container is selected each time. Do not check for 5 here as there
+    // is a reasonable chance that in 20 turns we don't pick all 5 pipelines.
+    assertTrue(allocatedContainers.size() > 2);
+  }
+
+  @Test
+  public void testPiplineLimitIgnoresExcludedPipelines() throws IOException {
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container = provider.getContainer(
+          1, repConfig, OWNER, new ExcludeList());
+      allocatedContainers.add(container);
+    }
+    // We have the min limit of pipelines, but then exclude one. It should use
+    // one of the existing ones rather than creating a new one, as the limit
+    // is checked against all pipelines, not just the filtered list.
+    ExcludeList exclude = new ExcludeList();
+    PipelineID excludedID = allocatedContainers
+        .stream().findFirst().get().getPipelineID();
+    exclude.addPipeline(excludedID);
+
+    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+    assertNotEquals(excludedID, c.getPipelineID());
+    assertTrue(allocatedContainers.contains(c));
+  }
+
+  @Test
+  public void testNewPipelineCreatedIfAllPipelinesExcluded()
+      throws IOException {
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container = provider.getContainer(
+          1, repConfig, OWNER, new ExcludeList());
+      allocatedContainers.add(container);
+    }
+    // We have the min limit of pipelines, but then exclude all of them. With
+    // no usable pipeline left, a new one should be created even though the
+    // limit has already been reached.
+    ExcludeList exclude = new ExcludeList();
+    for (ContainerInfo c : allocatedContainers) {
+      exclude.addPipeline(c.getPipelineID());
+    }
+    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+    assertFalse(allocatedContainers.contains(c));
+  }
+
+  @Test
+  public void testNewPipelineCreatedIfAllContainersExcluded()
+      throws IOException {
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container = provider.getContainer(
+          1, repConfig, OWNER, new ExcludeList());
+      allocatedContainers.add(container);
+    }
+    // We have the min limit of pipelines, but then exclude all of their
+    // containers. With no usable container left, a new pipeline should be
+    // created even though the limit has already been reached.
+    ExcludeList exclude = new ExcludeList();
+    for (ContainerInfo c : allocatedContainers) {
+      exclude.addConatinerId(c.containerID());
+    }
+    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+    assertFalse(allocatedContainers.contains(c));
+  }
+
+  @Test
+  public void testUnableToCreateAnyPipelinesReturnsNull() throws IOException {
+    pipelineManager = new MockPipelineManager() {
+      @Override
+      public Pipeline createPipeline(ReplicationConfig repConf)
+          throws IOException {
+        throw new IOException("Cannot create pipelines");
+      }
+    };
+    provider = new WritableECContainerProvider(
+        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+    ContainerInfo container =
+        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+    assertNull(container);
+  }
+
+  @Test
+  public void testExistingPipelineReturnedWhenNewCannotBeCreated()
+      throws IOException {
+    pipelineManager = new MockPipelineManager() {
+
+      private boolean throwError = false;
+
+      @Override
+      public Pipeline createPipeline(ReplicationConfig repConf)
+          throws IOException {
+        if (throwError) {
+          throw new IOException("Cannot create pipelines");
+        }
+        throwError = true;
+        return super.createPipeline(repConf);
+      }
+    };
+    provider = new WritableECContainerProvider(
+        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+    ContainerInfo container =
+        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+    for (int i=0; i<5; i++) {
+      ContainerInfo nextContainer =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      assertEquals(container, nextContainer);
+    }
+  }
+
+  @Test
+  public void testNewContainerAllocatedAndPipelinesClosedIfNoSpaceInExisting()
+      throws IOException {
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      assertFalse(allocatedContainers.contains(container));
+      allocatedContainers.add(container);
+    }
+    // Update all the containers to make them full
+    for (ContainerInfo c : allocatedContainers) {
+      c.setUsedBytes(getMaxContainerSize());
+    }
+    // Get a new container and ensure it is not one of the original set
+    ContainerInfo newContainer =
+        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+    assertNotNull(newContainer);
+    assertFalse(allocatedContainers.contains(newContainer));
+    // The original pipelines should all be closed
+    for (ContainerInfo c : allocatedContainers) {
+      Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
+      assertEquals(CLOSED, pipeline.getPipelineState());
+    }
+  }
+
+  @Test
+  public void testPipelineNotFoundWhenAttemptingToUseExisting()
+      throws IOException {
+    // Ensure PM throws PNF exception when we ask for the containers in the
+    // pipeline
+    pipelineManager = new MockPipelineManager() {

Review comment:
       I have run the tests quite a few times, and they have never failed (except while I was developing the code). However, I see I have only instantiated pipelineManager once here:
   
   ```
   private PipelineManager pipelineManager = MockPipelineManager.getInstance();
   ```
   
   I should be creating a new instance in setup for each test, so I will move that line there. Otherwise, state left over in the pipelineManager from a previous test could affect a later one.
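One detail worth keeping in mind: JUnit 4 creates a new instance of the test class for every test method, so a field initializer runs once per test, just like `@Before`. State leaks between tests only when the object it returns is shared, for example a static singleton behind a `getInstance()` accessor. Whether `MockPipelineManager.getInstance()` behaves that way is not shown here; the hypothetical `FakeManager` and `FixtureDemo` classes sketch both cases.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a mock manager that remembers what it created.
final class FakeManager {
  private static final FakeManager SHARED = new FakeManager();
  final List<String> pipelines = new ArrayList<>();

  // Singleton-style accessor: every caller gets the same object.
  static FakeManager getInstance() {
    return SHARED;
  }
}

// Each FixtureDemo object plays the role of one JUnit test-class instance.
final class FixtureDemo {
  private FakeManager manager;

  // Like constructing the manager in @Before: fresh state for every test.
  void setupFresh() {
    manager = new FakeManager();
  }

  // Like calling a singleton getInstance(): state is shared with other tests.
  void setupShared() {
    manager = FakeManager.getInstance();
  }

  // A "test" that creates one pipeline and reports how many it can see.
  int runTest() {
    manager.pipelines.add("pipeline-" + manager.pipelines.size());
    return manager.pipelines.size();
  }
}
```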






[GitHub] [ozone] sodonnel commented on a change in pull request #2353: HDDS-4892. EC: Implement basic EC pipeline provider

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2353:
URL: https://github.com/apache/ozone/pull/2353#discussion_r658631619



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
##########
@@ -97,6 +97,21 @@ public Pipeline getPipeline(PipelineID pipelineID)
         .getPipelines(replicationConfig, state, excludeDns, excludePipelines);
   }
 
+  @Override
+  /**
+   * Returns the count of pipelines meeting the given ReplicationConfig and
+   * state.
+   * @param replicationConfig The ReplicationConfig of the pipelines to count
+   * @param state The current state of the pipelines to count
+   * @return The count of pipelines meeting the above criteria
+   */

Review comment:
       I have fixed it.
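The hunk quoted above adds a method that counts pipelines matching a ReplicationConfig and state. A minimal sketch of such a count follows, using hypothetical String-keyed stand-ins rather than Ozone's ReplicationConfig and PipelineState types. It also places the Javadoc before `@Override`, which is the conventional position; the quoted hunk has them the other way round.

```java
import java.util.List;

// Hypothetical pipeline record; real code uses ReplicationConfig/PipelineState.
final class PipelineInfo {
  final String replication;
  final String state;

  PipelineInfo(String replication, String state) {
    this.replication = replication;
    this.state = state;
  }
}

interface PipelineCounter {
  int getPipelineCount(String replication, String state);
}

final class ListPipelineCounter implements PipelineCounter {
  private final List<PipelineInfo> pipelines;

  ListPipelineCounter(List<PipelineInfo> pipelines) {
    this.pipelines = pipelines;
  }

  /**
   * Returns the count of pipelines meeting the given replication and state.
   * @param replication The replication of the pipelines to count
   * @param state The current state of the pipelines to count
   * @return The count of pipelines meeting the above criteria
   */
  @Override
  public int getPipelineCount(String replication, String state) {
    // Stream.count() returns a long; a pipeline count fits in an int.
    return (int) pipelines.stream()
        .filter(p -> p.replication.equals(replication))
        .filter(p -> p.state.equals(state))
        .count();
  }
}
```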






[GitHub] [ozone] sodonnel commented on a change in pull request #2353: HDDS-4892. EC: Implement basic EC pipeline provider

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2353:
URL: https://github.com/apache/ozone/pull/2353#discussion_r658630755



##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
##########
@@ -0,0 +1,376 @@
+  @After

Review comment:
       True, I have removed it. I usually just create setup and teardown out of habit.






[GitHub] [ozone] sodonnel commented on a change in pull request #2353: HDDS-4892. EC: Implement basic EC pipeline provider

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2353:
URL: https://github.com/apache/ozone/pull/2353#discussion_r657009386



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
##########
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class to create pipelines for EC containers.
+ */
+public class ECPipelineProvider extends PipelineProvider<ECReplicationConfig> {
+
+  // TODO - EC Placement Policy. Standard Network Aware topology will not work
+  //        for EC as it stands. We may want an "as many racks as possible"
+  //        policy.
+
+  private final ConfigurationSource conf;
+  private final PlacementPolicy placementPolicy;
+
+  public ECPipelineProvider(NodeManager nodeManager,
+                            StateManager stateManager,
+                            ConfigurationSource conf,
+                            PlacementPolicy placementPolicy) {
+    super(nodeManager, stateManager);
+    this.conf = conf;
+    this.placementPolicy = placementPolicy;
+  }
+
+  @Override
+  protected Pipeline create(ECReplicationConfig replicationConfig)
+      throws IOException {
+    List<DatanodeDetails> dns = placementPolicy.chooseDatanodes(null,
+        null, replicationConfig.getRequiredNodes(), 0);
+    return create(replicationConfig, dns);
+  }
+
+  @Override
+  protected Pipeline create(ECReplicationConfig replicationConfig,
+      List<DatanodeDetails> nodes) {
+
+    Map<DatanodeDetails, Integer> dnIndexes = new HashMap<>();
+    int ecIndex = 0;
+    for (DatanodeDetails dn : nodes) {
+      dnIndexes.put(dn, ecIndex);

Review comment:
       OK, I have changed ecIndex to start from 1.
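A minimal sketch of 1-based replica indexing, using hypothetical String node names in place of DatanodeDetails. It additionally assumes that data replicas take indexes 1 to d and parity replicas d+1 to d+p, so for EC 3+2 indexes 4 and 5 would be parity; the review comment itself states neither this nor why indexing starts at 1.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; node names stand in for DatanodeDetails.
final class EcIndexSketch {
  private EcIndexSketch() {
  }

  // Assigns replica indexes 1..n in the order the nodes were chosen.
  static Map<String, Integer> assignIndexes(List<String> nodes) {
    Map<String, Integer> indexes = new LinkedHashMap<>();
    int ecIndex = 1;
    for (String dn : nodes) {
      indexes.put(dn, ecIndex++);
    }
    return indexes;
  }

  // Assumption: indexes 1..dataCount are data replicas, the rest are parity.
  static boolean isParity(int index, int dataCount) {
    return index > dataCount;
  }
}
```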






[GitHub] [ozone] umamaheswararao commented on a change in pull request #2353: HDDS-4892. EC: Implement basic EC pipeline provider

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2353:
URL: https://github.com/apache/ozone/pull/2353#discussion_r658440566



##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
##########
@@ -0,0 +1,376 @@
+  @After

Review comment:
       You can remove this if we are not doing anything in teardown.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
##########
@@ -97,6 +97,21 @@ public Pipeline getPipeline(PipelineID pipelineID)
         .getPipelines(replicationConfig, state, excludeDns, excludePipelines);
   }
 
+  @Override
+  /**
+   * Returns the count of pipelines meeting the given ReplicationConfig and
+   * state.
+   * @param replicationConfig The ReplicationConfig of the pipelines to count
+   * @param state The current state of the pipelines to count
+   * @return The count of pipelines meeting the above criteria
+   */

Review comment:
       Nit: the method signature below needs formatting?

##########
File path: hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
##########
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests to validate the WritableECContainerProvider works correctly.
+ */
+public class TestWritableECContainerProvider {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestWritableECContainerProvider.class);
+  private static final String OWNER = "SCM";
+  private PipelineManager pipelineManager = MockPipelineManager.getInstance();
+  private ContainerManagerV2 containerManager
+      = Mockito.mock(ContainerManagerV2.class);
+  private PipelineChoosePolicy pipelineChoosingPolicy
+      = new HealthyPipelineChoosePolicy();
+
+  private ConfigurationSource conf;
+  private WritableContainerProvider provider;
+  private ReplicationConfig repConfig;
+  private int minPipelines;
+
+  private Map<ContainerID, ContainerInfo> containers;
+
+  @Before
+  public void setup() throws ContainerNotFoundException {
+    repConfig = new ECReplicationConfig(3, 2);
+    conf = new OzoneConfiguration();
+    WritableECContainerProvider.WritableECContainerProviderConfig providerConf =
+        conf.getObject(WritableECContainerProvider
+            .WritableECContainerProviderConfig.class);
+    minPipelines = providerConf.getMinimumPipelines();
+    containers = new HashMap<>();
+    provider = new WritableECContainerProvider(
+        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+    Mockito.doAnswer(call -> {
+      Pipeline pipeline = (Pipeline)call.getArguments()[2];
+      ContainerInfo container = createContainer(pipeline,
+          repConfig, System.nanoTime());
+      pipelineManager.addContainerToPipeline(
+          pipeline.getId(), container.containerID());
+      containers.put(container.containerID(), container);
+      return container;
+    }).when(containerManager).getMatchingContainer(Matchers.anyLong(),
+        Matchers.anyString(), Matchers.any(Pipeline.class));
+
+    Mockito.doAnswer(call ->
+        containers.get((ContainerID)call.getArguments()[0]))
+        .when(containerManager).getContainer(Matchers.any(ContainerID.class));
+
+  }
+
+  @After
+  public void teardown() {
+  }
+
+  @Test
+  public void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned()
+      throws IOException {
+    // The first 5 calls should return a different container
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      assertFalse(allocatedContainers.contains(container));
+      allocatedContainers.add(container);
+    }
+
+    allocatedContainers.clear();
+    for (int i=0; i<20; i++) {
+      ContainerInfo container =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      allocatedContainers.add(container);
+    }
+    // Should have minPipelines containers created
+    assertEquals(minPipelines,
+        pipelineManager.getPipelines(repConfig, OPEN).size());
+    // We should have more than 1 allocatedContainers in the set proving a
+    // random container is selected each time. Do not check for 5 here as there
+    // is a reasonable chance that in 20 turns we don't pick all 5 nodes.
+    assertTrue(allocatedContainers.size() > 2);
+  }
+
+  @Test
+  public void testPiplineLimitIgnoresExcludedPipelines() throws IOException {
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container = provider.getContainer(
+          1, repConfig, OWNER, new ExcludeList());
+      allocatedContainers.add(container);
+    }
+    // We have the min limit of pipelines, but then exclude one. It should use
+    // one of the existing rather than createing a new one, as the limit is
+    // checked against all pipelines, not just the filtered list
+    ExcludeList exclude = new ExcludeList();
+    PipelineID excludedID = allocatedContainers
+        .stream().findFirst().get().getPipelineID();
+    exclude.addPipeline(excludedID);
+
+    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+    assertNotEquals(excludedID, c.getPipelineID());
+    assertTrue(allocatedContainers.contains(c));
+  }
+
+  @Test
+  public void testNewPipelineCreatedIfAllPipelinesExcluded()
+      throws IOException {
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container = provider.getContainer(
+          1, repConfig, OWNER, new ExcludeList());
+      allocatedContainers.add(container);
+    }
+    // We have the min limit of pipelines, but then exclude one. It should use
+    // one of the existing rather than createing a new one, as the limit is
+    // checked against all pipelines, not just the filtered list
+    ExcludeList exclude = new ExcludeList();
+    for (ContainerInfo c : allocatedContainers) {
+      exclude.addPipeline(c.getPipelineID());
+    }
+    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+    assertFalse(allocatedContainers.contains(c));
+  }
+
+  @Test
+  public void testNewPipelineCreatedIfAllContainersExcluded()
+      throws IOException {
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container = provider.getContainer(
+          1, repConfig, OWNER, new ExcludeList());
+      allocatedContainers.add(container);
+    }
+    // We have the min limit of pipelines, but then exclude one. It should use
+    // one of the existing rather than createing a new one, as the limit is
+    // checked against all pipelines, not just the filtered list
+    ExcludeList exclude = new ExcludeList();
+    for (ContainerInfo c : allocatedContainers) {
+      exclude.addConatinerId(c.containerID());
+    }
+    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+    assertFalse(allocatedContainers.contains(c));
+  }
+
+  @Test
+  public void testUnableToCreateAnyPipelinesReturnsNull() throws IOException {
+    pipelineManager = new MockPipelineManager() {
+      @Override
+      public Pipeline createPipeline(ReplicationConfig repConf)
+          throws IOException {
+        throw new IOException("Cannot create pipelines");
+      }
+    };
+    provider = new WritableECContainerProvider(
+        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+    ContainerInfo container =
+        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+    assertNull(container);
+  }
+
+  @Test
+  public void testExistingPipelineReturnedWhenNewCannotBeCreated()
+      throws IOException {
+    pipelineManager = new MockPipelineManager() {
+
+      private boolean throwError = false;
+
+      @Override
+      public Pipeline createPipeline(ReplicationConfig repConf)
+          throws IOException {
+        if (throwError) {
+          throw new IOException("Cannot create pipelines");
+        }
+        throwError = true;
+        return super.createPipeline(repConfig);
+      }
+    };
+    provider = new WritableECContainerProvider(
+        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+    ContainerInfo container =
+        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+    for (int i=0; i<5; i++) {
+      ContainerInfo nextContainer =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      assertEquals(container, nextContainer);
+    }
+  }
+
+  @Test
+  public void testNewContainerAllocatedAndPipelinesClosedIfNoSpaceInExisting()
+      throws IOException {
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      assertFalse(allocatedContainers.contains(container));
+      allocatedContainers.add(container);
+    }
+    // Update all the containers to make them full
+    for (ContainerInfo c : allocatedContainers) {
+      c.setUsedBytes(getMaxContainerSize());
+    }
+    // Get a new container and ensure it is not one of the original set
+    ContainerInfo newContainer =
+        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+    assertNotNull(newContainer);
+    assertFalse(allocatedContainers.contains(newContainer));
+    // The original pipelines should all be closed
+    for (ContainerInfo c : allocatedContainers) {
+      Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
+      assertEquals(CLOSED, pipeline.getPipelineState());
+    }
+  }
+
+  @Test
+  public void testPipelineNotFoundWhenAttemptingToUseExisting()
+      throws IOException {
+    // Ensure PM throws PNF exception when we ask for the containers in the
+    // pipeline
+    pipelineManager = new MockPipelineManager() {

Review comment:
       The pipelineManager field is reassigned in some tests, while other tests use the default pipeline manager. I think the order of test execution is not guaranteed, right? Could you please check whether that can cause any flaky tests?
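       For context: JUnit 4 creates a new instance of the test class for every `@Test` method, so reassigning an instance field such as `pipelineManager` inside one test should not carry over to the next, whatever the execution order. Leakage remains possible through shared static state, for example if `MockPipelineManager.getInstance()` returns a singleton (an assumption; its implementation is not shown here). The sketch below simulates that lifecycle without JUnit; `SharedMock`, `FakeTest` and `runTest` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class TestIsolationSketch {

  /** Hypothetical mock whose static getInstance() returns one shared object. */
  static final class SharedMock {
    private static final SharedMock INSTANCE = new SharedMock();
    final List<String> pipelines = new ArrayList<>();

    static SharedMock getInstance() {
      return INSTANCE;
    }
  }

  /** Hypothetical test class mirroring the field layout under review. */
  static final class FakeTest {
    // Field initializer runs once per FakeTest instance, as in a JUnit 4 test.
    SharedMock manager = SharedMock.getInstance();

    void createsPipeline() {
      manager.pipelines.add("p1"); // mutates the shared singleton
    }

    void replacesManager() {
      // Reassignment affects only this instance; the next test gets a new one.
      manager = new SharedMock();
      manager.pipelines.add("p2");
    }

    int pipelineCount() {
      return manager.pipelines.size();
    }
  }

  /** Mimics the JUnit 4 runner: a fresh test instance per test method. */
  static int runTest(Consumer<FakeTest> body) {
    FakeTest t = new FakeTest();
    body.accept(t);
    return t.pipelineCount();
  }

  public static void main(String[] args) {
    // Field reassignment does not leak into later tests.
    System.out.println(runTest(FakeTest::replacesManager));
    // State added through the static singleton does leak: count grows.
    System.out.println(runTest(FakeTest::createsPipeline));
    System.out.println(runTest(FakeTest::createsPipeline));
  }
}
```

       If the real mock is a singleton, creating a fresh manager in `@Before` would remove any dependence on test order.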






[GitHub] [ozone] umamaheswararao commented on a change in pull request #2353: HDDS-4892. EC: Implement basic EC pipeline provider

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2353:
URL: https://github.com/apache/ozone/pull/2353#discussion_r656573236



##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
##########
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class to create pipelines for EC containers.
+ */
+public class ECPipelineProvider extends PipelineProvider<ECReplicationConfig> {
+
+  // TODO - EC Placement Policy. Standard Network Aware topology will not work

Review comment:
       Since you have already filed a JIRA for this, please add the JIRA ID here: HDDS-5326.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
##########
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class to create pipelines for EC containers.
+ */
+public class ECPipelineProvider extends PipelineProvider<ECReplicationConfig> {
+
+  // TODO - EC Placement Policy. Standard Network Aware topology will not work
+  //        for EC as it stands. We may want an "as many racks as possible"
+  //        policy.
+
+  private final ConfigurationSource conf;
+  private final PlacementPolicy placementPolicy;
+
+  public ECPipelineProvider(NodeManager nodeManager,
+                            StateManager stateManager,
+                            ConfigurationSource conf,
+                            PlacementPolicy placementPolicy) {
+    super(nodeManager, stateManager);
+    this.conf = conf;
+    this.placementPolicy = placementPolicy;
+  }
+
+  @Override
+  protected Pipeline create(ECReplicationConfig replicationConfig)
+      throws IOException {
+    List<DatanodeDetails> dns = placementPolicy.chooseDatanodes(null,
+        null, replicationConfig.getRequiredNodes(), 0);
+    return create(replicationConfig, dns);
+  }
+
+  @Override
+  protected Pipeline create(ECReplicationConfig replicationConfig,
+      List<DatanodeDetails> nodes) {
+
+    Map<DatanodeDetails, Integer> dnIndexes = new HashMap<>();
+    int ecIndex = 0;
+    for (DatanodeDetails dn : nodes) {
+      dnIndexes.put(dn, ecIndex);

Review comment:
       You may want to use ++ecIndex or initialize it with 1. We discussed that EC indexes start at 1; index 0 represents a non-EC replica.
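       A minimal sketch of the suggested fix, assuming (per the comment above) that index 0 is reserved for non-EC replicas: pre-incrementing the counter gives the first node index 1, so an EC 3-2 pipeline gets indexes 1 through 5. Plain strings stand in for `DatanodeDetails`, and `assignReplicaIndexes` is a hypothetical helper name; a `LinkedHashMap` is used only so the printed map follows node order.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EcIndexSketch {

  /**
   * Assigns 1-based EC replica indexes in list order. Index 0 is never
   * assigned, on the assumption that it is reserved for non-EC replicas.
   */
  static <T> Map<T, Integer> assignReplicaIndexes(List<T> nodes) {
    Map<T, Integer> dnIndexes = new LinkedHashMap<>();
    int ecIndex = 0;
    for (T dn : nodes) {
      // Pre-increment: the first node gets 1, not 0.
      dnIndexes.put(dn, ++ecIndex);
    }
    return dnIndexes;
  }

  public static void main(String[] args) {
    // EC 3-2 needs 5 nodes: 3 data + 2 parity.
    List<String> nodes = Arrays.asList("dn1", "dn2", "dn3", "dn4", "dn5");
    System.out.println(assignReplicaIndexes(nodes));
    // prints {dn1=1, dn2=2, dn3=3, dn4=4, dn5=5}
  }
}
```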






[GitHub] [ozone] sodonnel merged pull request #2353: HDDS-4892. EC: Implement basic EC pipeline provider

Posted by GitBox <gi...@apache.org>.
sodonnel merged pull request #2353:
URL: https://github.com/apache/ozone/pull/2353


   

