Posted to commits@ozone.apache.org by el...@apache.org on 2021/06/04 15:53:01 UTC

[ozone] branch master updated: HDDS-5292. Introduce the WritableContainerInterface to SCM (#2300)

This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bc1a25  HDDS-5292. Introduce the WritableContainerInterface to SCM (#2300)
7bc1a25 is described below

commit 7bc1a257a9cd7dfcc27d3e91b5125e0fc60cfc43
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Fri Jun 4 16:52:45 2021 +0100

    HDDS-5292. Introduce the WritableContainerInterface to SCM (#2300)
---
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |  89 +-----------
 .../scm/pipeline/WritableContainerFactory.java     |  58 ++++++++
 .../scm/pipeline/WritableContainerProvider.java    |  57 ++++++++
 .../pipeline/WritableRatisContainerProvider.java   | 158 +++++++++++++++++++++
 .../hadoop/hdds/scm/server/SCMConfigurator.java    |  20 +++
 .../hdds/scm/server/StorageContainerManager.java   |  16 +++
 6 files changed, 316 insertions(+), 82 deletions(-)
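
In short, the patch moves the pipeline-selection loop out of
BlockManagerImpl#allocateBlock and behind the new WritableContainerFactory.
A minimal sketch of the new call path, using only names introduced in the
diff below (the surrounding allocateBlock logic is elided):

    // Inside BlockManagerImpl#allocateBlock, after the block size checks:
    // the factory now hides pipeline selection and creation behind one call.
    ContainerInfo containerInfo = writableContainerFactory.getContainer(
        size, replicationConfig, owner, excludeList);

    if (containerInfo != null) {
      return newBlock(containerInfo);
    }
    // Otherwise fall through, log the failure and return null, as before.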

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 94da882..65a4192 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hdds.client.ContainerBlockID;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
-import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 import org.apache.hadoop.hdds.scm.ScmConfig;
@@ -44,6 +43,7 @@ import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.WritableContainerFactory;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.common.BlockGroup;
@@ -70,6 +70,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   private final StorageContainerManager scm;
   private final PipelineManager pipelineManager;
   private final ContainerManagerV2 containerManager;
+  private final WritableContainerFactory writableContainerFactory;
 
   private final long containerSize;
 
@@ -100,6 +101,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
         StorageUnit.BYTES);
+    this.writableContainerFactory = scm.getWritableContainerFactory();
 
     mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
 
@@ -174,89 +176,12 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           INVALID_BLOCK_SIZE);
     }
 
-    /*
-      Here is the high level logic.
+    ContainerInfo containerInfo = writableContainerFactory.getContainer(
+        size, replicationConfig, owner, excludeList);
 
-      1. We try to find pipelines in open state.
-
-      2. If there are no pipelines in OPEN state, then we try to create one.
-
-      3. We allocate a block from the available containers in the selected
-      pipeline.
-
-      TODO : #CLUTIL Support random picking of two containers from the list.
-      So we can use different kind of policies.
-    */
-
-    ContainerInfo containerInfo;
-
-    //TODO we need to continue the refactor to use ReplicationConfig everywhere
-    //in downstream managers.
-
-    while (true) {
-      List<Pipeline> availablePipelines =
-          pipelineManager
-              .getPipelines(replicationConfig, Pipeline.PipelineState.OPEN,
-                  excludeList.getDatanodes(), excludeList.getPipelineIds());
-      Pipeline pipeline = null;
-      if (availablePipelines.size() == 0 && !excludeList.isEmpty()) {
-        // if no pipelines can be found, try finding pipeline without
-        // exclusion
-        availablePipelines = pipelineManager
-            .getPipelines(replicationConfig, Pipeline.PipelineState.OPEN);
-      }
-      if (availablePipelines.size() == 0) {
-        try {
-          // TODO: #CLUTIL Remove creation logic when all replication types and
-          // factors are handled by pipeline creator
-          pipeline = pipelineManager.createPipeline(replicationConfig);
-
-          // wait until pipeline is ready
-          pipelineManager.waitPipelineReady(pipeline.getId(), 0);
-        } catch (SCMException se) {
-          LOG.warn("Pipeline creation failed for replicationConfig {} " +
-              "Datanodes may be used up.", replicationConfig, se);
-          break;
-        } catch (IOException e) {
-          LOG.warn("Pipeline creation failed for replicationConfig: {}. "
-              + "Retrying get pipelines call once.", replicationConfig, e);
-          availablePipelines = pipelineManager
-              .getPipelines(replicationConfig, Pipeline.PipelineState.OPEN,
-                  excludeList.getDatanodes(), excludeList.getPipelineIds());
-          if (availablePipelines.size() == 0 && !excludeList.isEmpty()) {
-            // if no pipelines can be found, try finding pipeline without
-            // exclusion
-            availablePipelines = pipelineManager
-                .getPipelines(replicationConfig, Pipeline.PipelineState.OPEN);
-          }
-          if (availablePipelines.size() == 0) {
-            LOG.info(
-                "Could not find available pipeline of replicationConfig: {} "
-                    + "even after retrying",
-                replicationConfig);
-            break;
-          }
-        }
-      }
-
-      if (null == pipeline) {
-        PipelineRequestInformation pri =
-            PipelineRequestInformation.Builder.getBuilder()
-                .setSize(size)
-                .build();
-        pipeline = pipelineChoosePolicy.choosePipeline(
-            availablePipelines, pri);
-      }
-
-      // look for OPEN containers that match the criteria.
-      containerInfo = containerManager.getMatchingContainer(size, owner,
-          pipeline, excludeList.getContainerIds());
-
-      if (containerInfo != null) {
-        return newBlock(containerInfo);
-      }
+    if (containerInfo != null) {
+      return newBlock(containerInfo);
     }
-
     // We have tried all the strategies we know, but somehow we are not able
     // to get a container for this block. Log that info and return null.
     LOG.error(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
new file mode 100644
index 0000000..356d047
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+
+import java.io.IOException;
+
+/**
+ * Factory class to obtain a writable container to which a new block can be
+ * allocated.
+ */
+public class WritableContainerFactory {
+
+  private final WritableContainerProvider<ReplicationConfig> ratisProvider;
+  private final WritableContainerProvider<ReplicationConfig> standaloneProvider;
+
+  public WritableContainerFactory(StorageContainerManager scm) {
+    this.ratisProvider = new WritableRatisContainerProvider(
+        scm.getConfiguration(), scm.getPipelineManager(),
+        scm.getContainerManager(), scm.getPipelineChoosePolicy());
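+    // For now, Standalone containers are served by the same provider
+    // as Ratis ones.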
+    this.standaloneProvider = ratisProvider;
+  }
+
+  public ContainerInfo getContainer(final long size,
+      ReplicationConfig repConfig, String owner, ExcludeList excludeList)
+      throws IOException {
+    switch (repConfig.getReplicationType()) {
+    case STAND_ALONE:
+      return standaloneProvider
+          .getContainer(size, repConfig, owner, excludeList);
+    case RATIS:
+      return ratisProvider.getContainer(size, repConfig, owner, excludeList);
+    default:
+      throw new IOException(repConfig.getReplicationType()
+          + " is an invalid replication type");
+    }
+  }
+}
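
As a usage sketch of the factory wired above — scm, blockSize and owner are
illustrative placeholders rather than part of this patch, and the
ReplicationConfig helper is assumed to be the existing fromTypeAndFactor
utility:

    // Ask a running SCM for a writable container for a new block.
    WritableContainerFactory factory = scm.getWritableContainerFactory();
    ContainerInfo container = factory.getContainer(
        blockSize,
        ReplicationConfig.fromTypeAndFactor(
            HddsProtos.ReplicationType.RATIS,
            HddsProtos.ReplicationFactor.THREE),
        owner,
        new ExcludeList());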
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerProvider.java
new file mode 100644
index 0000000..d5d1776
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerProvider.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+
+import java.io.IOException;
+
+/**
+ * Interface used by the WritableContainerFactory to obtain a writable
+ * container from the providers. This interface is implemented by the various
+ * WritableContainerProviders, e.g. Ratis and Standalone. These providers
+ * query the open pipelines from the PipelineManager and select or allocate
+ * a container on a pipeline in which a new block can be created.
+ *
+ * A provider can also manage the number of open pipelines, including asking
+ * the PipelineManager to create a new pipeline if needed, or to close one.
+ *
+ * Any time a container needs to be selected for a new block, this interface
+ * should be used via the WritableContainerFactory instance.
+ */
+public interface WritableContainerProvider<T extends ReplicationConfig> {
+
+  /**
+   * Get or allocate a container on an open pipeline for a new block.
+   * @param size The maximum size, in bytes, of the block to be written.
+   * @param repConfig The ReplicationConfig indicating whether the container
+   *                  should be Ratis or Standalone.
+   * @param owner The owner of the container.
+   * @param excludeList A set of datanodes, containers and pipelines which
+   *                    should not be considered.
+   * @return A ContainerInfo which is open and has the capacity to store the
+   *         desired block size.
+   * @throws IOException if no suitable container can be obtained.
+   */
+  ContainerInfo getContainer(long size, T repConfig,
+      String owner, ExcludeList excludeList) throws IOException;
+
+}
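
The javadoc above anticipates providers for further replication types. A
minimal sketch of what an implementation could look like — the class and its
injected delegate are entirely hypothetical, with imports as in the interface
file above:

    /**
     * Hypothetical provider that delegates to another provider; it only
     * illustrates the contract and is not part of this patch.
     */
    public class DelegatingContainerProvider
        implements WritableContainerProvider<ReplicationConfig> {

      private final WritableContainerProvider<ReplicationConfig> delegate;

      public DelegatingContainerProvider(
          WritableContainerProvider<ReplicationConfig> delegate) {
        this.delegate = delegate;
      }

      @Override
      public ContainerInfo getContainer(long size, ReplicationConfig repConfig,
          String owner, ExcludeList excludeList) throws IOException {
        // A real provider would select or allocate a container here.
        return delegate.getContainer(size, repConfig, owner, excludeList);
      }
    }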
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
new file mode 100644
index 0000000..5ba960f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Class to obtain a writable container for Ratis and Standalone pipelines.
+ */
+public class WritableRatisContainerProvider
+    implements WritableContainerProvider<ReplicationConfig> {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(WritableRatisContainerProvider.class);
+
+  private final ConfigurationSource conf;
+  private final PipelineManager pipelineManager;
+  private final PipelineChoosePolicy pipelineChoosePolicy;
+  private final ContainerManagerV2 containerManager;
+
+  public WritableRatisContainerProvider(ConfigurationSource conf,
+      PipelineManager pipelineManager,
+      ContainerManagerV2 containerManager,
+      PipelineChoosePolicy pipelineChoosePolicy) {
+    this.conf = conf;
+    this.pipelineManager = pipelineManager;
+    this.containerManager = containerManager;
+    this.pipelineChoosePolicy = pipelineChoosePolicy;
+  }
+
+  @Override
+  public ContainerInfo getContainer(final long size,
+      ReplicationConfig repConfig, String owner, ExcludeList excludeList)
+      throws IOException {
+    /*
+      Here is the high level logic.
+
+      1. We try to find pipelines in open state.
+
+      2. If there are no pipelines in OPEN state, then we try to create one.
+
+      3. We allocate a block from the available containers in the selected
+      pipeline.
+
+      TODO: #CLUTIL Support random picking of two containers from the list,
+      so we can use different kinds of policies.
+    */
+
+    ContainerInfo containerInfo;
+
+    // TODO: we need to continue the refactor to use ReplicationConfig
+    // everywhere in downstream managers.
+
+    while (true) {
+      List<Pipeline> availablePipelines =
+          pipelineManager
+              .getPipelines(repConfig, Pipeline.PipelineState.OPEN,
+                  excludeList.getDatanodes(), excludeList.getPipelineIds());
+      Pipeline pipeline = null;
+      if (availablePipelines.size() == 0 && !excludeList.isEmpty()) {
+        // if no pipelines can be found, try finding pipeline without
+        // exclusion
+        availablePipelines = pipelineManager
+            .getPipelines(repConfig, Pipeline.PipelineState.OPEN);
+      }
+      if (availablePipelines.size() == 0) {
+        try {
+          // TODO: #CLUTIL Remove creation logic when all replication types and
+          // factors are handled by pipeline creator
+          pipeline = pipelineManager.createPipeline(repConfig);
+
+          // wait until pipeline is ready
+          pipelineManager.waitPipelineReady(pipeline.getId(), 0);
+        } catch (SCMException se) {
+          LOG.warn("Pipeline creation failed for repConfig: {}. " +
+              "Datanodes may be used up.", repConfig, se);
+          break;
+        } catch (IOException e) {
+          LOG.warn("Pipeline creation failed for repConfig: {}. "
+              + "Retrying get pipelines call once.", repConfig, e);
+          availablePipelines = pipelineManager
+              .getPipelines(repConfig, Pipeline.PipelineState.OPEN,
+                  excludeList.getDatanodes(), excludeList.getPipelineIds());
+          if (availablePipelines.size() == 0 && !excludeList.isEmpty()) {
+            // if no pipelines can be found, try finding pipeline without
+            // exclusion
+            availablePipelines = pipelineManager
+                .getPipelines(repConfig, Pipeline.PipelineState.OPEN);
+          }
+          if (availablePipelines.size() == 0) {
+            LOG.info(
+                "Could not find available pipeline of repConfig: {} "
+                    + "even after retrying",
+                repConfig);
+            break;
+          }
+        }
+      }
+
+      if (null == pipeline) {
+        PipelineRequestInformation pri =
+            PipelineRequestInformation.Builder.getBuilder()
+                .setSize(size)
+                .build();
+        pipeline = pipelineChoosePolicy.choosePipeline(
+            availablePipelines, pri);
+      }
+
+      // look for OPEN containers that match the criteria.
+      containerInfo = containerManager.getMatchingContainer(size, owner,
+          pipeline, excludeList.getContainerIds());
+
+      if (containerInfo != null) {
+        return containerInfo;
+      }
+    }
+
+    // We have tried all the strategies we know, but somehow we are not able
+    // to get a container for this block. Log that info and return null.
+    LOG.error(
+        "Unable to allocate a block for the size: {}, repConfig: {}",
+        size, repConfig);
+    return null;
+  }
+
+}
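
Since the factory routes both RATIS and STAND_ALONE requests here, the
provider can also be exercised directly. A hedged sketch, assuming the four
collaborators are available (for example from a test harness):

    WritableContainerProvider<ReplicationConfig> provider =
        new WritableRatisContainerProvider(
            conf, pipelineManager, containerManager, pipelineChoosePolicy);
    ContainerInfo info = provider.getContainer(
        blockSize, repConfig, owner, new ExcludeList());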
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
index 7cdd5c5..5558204 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.pipeline.WritableContainerFactory;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
@@ -73,6 +74,7 @@ public final class SCMConfigurator {
   private NetworkTopology networkTopology;
   private SCMHAManager scmHAManager;
   private SCMContext scmContext;
+  private WritableContainerFactory writableContainerFactory;
 
   /**
    * Allows user to specify a version of Node manager to use with this SCM.
@@ -173,6 +175,15 @@ public final class SCMConfigurator {
   }
 
   /**
+   * Allows user to set the WritableContainerFactory to be used with this SCM.
+   * @param writableContainerFactory - Container Factory to use.
+   */
+  public void setWritableContainerFactory(
+      WritableContainerFactory writableContainerFactory) {
+    this.writableContainerFactory = writableContainerFactory;
+  }
+
+  /**
    * Gets SCM Node Manager.
    * @return Node Manager.
    */
@@ -259,4 +270,13 @@ public final class SCMConfigurator {
   public SCMContext getScmContext() {
     return scmContext;
   }
+
+  /**
+   * Get the WritableContainerFactory.
+   * @return WritableContainerFactory.
+   */
+  public WritableContainerFactory getWritableContainerFactory() {
+    return writableContainerFactory;
+  }
+
 }
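
The new hook mirrors the configurator's existing setters: if a factory is
set, StorageContainerManager uses it, otherwise it builds the default (see
the next hunk). A test-style sketch — myCustomFactory and the createSCM
helper are hypothetical stand-ins for whatever bootstrap path a test uses:

    // Inject a custom (e.g. mock) factory before starting SCM in a test.
    SCMConfigurator configurator = new SCMConfigurator();
    configurator.setWritableContainerFactory(myCustomFactory);
    StorageContainerManager scm = createSCM(conf, configurator);
    assert scm.getWritableContainerFactory() == myCustomFactory;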
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 9819d77..388dd8e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
 import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.pipeline.WritableContainerFactory;
 import org.apache.hadoop.hdds.security.token.ContainerTokenGenerator;
 import org.apache.hadoop.hdds.security.token.ContainerTokenSecretManager;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
@@ -196,6 +197,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   private BlockManager scmBlockManager;
   private final SCMStorageConfig scmStorageConfig;
   private NodeDecommissionManager scmDecommissionManager;
+  private WritableContainerFactory writableContainerFactory;
 
   private SCMMetadataStore scmMetadataStore;
   private CertificateStore certificateStore;
@@ -550,6 +552,11 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     }
 
     pipelineChoosePolicy = PipelineChoosePolicyFactory.getPolicy(conf);
+    if (configurator.getWritableContainerFactory() != null) {
+      writableContainerFactory = configurator.getWritableContainerFactory();
+    } else {
+      writableContainerFactory = new WritableContainerFactory(this);
+    }
     if (configurator.getScmBlockManager() != null) {
       scmBlockManager = configurator.getScmBlockManager();
     } else {
@@ -1446,6 +1453,15 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   }
 
   /**
+   * Returns the Writable Container Factory.
+   *
+   * @return The WritableContainerFactory instance used by SCM.
+   */
+  public WritableContainerFactory getWritableContainerFactory() {
+    return writableContainerFactory;
+  }
+
+  /**
    * Returns SCM container manager.
    */
   @VisibleForTesting
