You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2021/08/31 18:03:34 UTC

[ozone] branch master updated: HDDS-5644. Speed up decommission tests using a background Mini Cluster provider (#2554)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new cdac8bc  HDDS-5644. Speed up decommission tests using a background Mini Cluster provider (#2554)
cdac8bc is described below

commit cdac8bc30313b28338bf0478ec3d67287b2b51db
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Tue Aug 31 19:03:16 2021 +0100

    HDDS-5644. Speed up decommission tests using a background Mini Cluster provider (#2554)
---
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   7 +-
 .../hadoop/ozone/MiniOzoneClusterProvider.java     | 283 +++++++++++++++++++++
 .../scm/node/TestDecommissionAndMaintenance.java   |  36 ++-
 3 files changed, 315 insertions(+), 11 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 0263fcf..8e30540 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -295,7 +295,7 @@ public interface MiniOzoneCluster {
     protected static final int DEFAULT_PIPELIME_LIMIT = 3;
     protected static final int DEFAULT_RATIS_RPC_TIMEOUT_SEC = 1;
 
-    protected final OzoneConfiguration conf;
+    protected OzoneConfiguration conf;
     protected String path;
 
     protected String clusterId;
@@ -341,6 +341,11 @@ public interface MiniOzoneCluster {
       setClusterId(UUID.randomUUID().toString());
     }
 
+    public Builder setConf(OzoneConfiguration config) {
+      this.conf = config;
+      return this;
+    }
+
     /**
      * Sets the cluster Id.
      *
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterProvider.java
new file mode 100644
index 0000000..e2916cd
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterProvider.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Class to create mini-clusters in the background. Creating a mini-cluster
+ * generally takes 15 - 30 seconds and destroying one can take about 10 seconds.
+ *
+ * When there are tests that must create a new cluster per test, the time taken
+ * to create the clusters can greatly increase the test runtime. If the test
+ * logic runs for longer than the time to create the cluster, then it can make
+ * sense to create a new cluster in the background while the first test is
+ * running. Then when test 1 completes, we can destroy its cluster in the
+ * background and immediately give a new cluster to test 2.
+ *
+ * If the test logic runs for only a second or two after the cluster is
+ * started, this class may not help much, as there will be no time to create
+ * the new cluster in the background before the next test starts. However,
+ * shutting down the old cluster in the background while the new cluster is
+ * being created will likely still save about 10 seconds per test.
+ *
+ * To use this class, set up the cluster provider in a static method annotated
+ * with @BeforeClass, e.g.:
+ *
+ *   @BeforeClass
+ *   public static void init() {
+ *     OzoneConfiguration conf = new OzoneConfiguration();
+ *     final int interval = 100;
+ *
+ *     conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+ *         interval, TimeUnit.MILLISECONDS);
+ *     ...
+ *
+ *     ReplicationManagerConfiguration replicationConf =
+ *         conf.getObject(ReplicationManagerConfiguration.class);
+ *     replicationConf.setInterval(Duration.ofSeconds(1));
+ *     conf.setFromObject(replicationConf);
+ *
+ *     MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
+ *         .setNumDatanodes(numOfDatanodes);
+ *
+ *     clusterProvider = new MiniOzoneClusterProvider(conf, builder, 5);
+ *   }
+ *
+ * Ensure you shut down the provider in an @AfterClass annotated method:
+ *
+ *   @AfterClass
+ *   public static void shutdown() throws InterruptedException {
+ *     if (clusterProvider != null) {
+ *       clusterProvider.shutdown();
+ *     }
+ *   }
+ *
+ * Then in the @Before method, or in the test itself, obtain a cluster:
+ *
+ *   @Before
+ *   public void setUp() throws Exception {
+ *     cluster = clusterProvider.provide();
+ *   }
+ *
+ *   @After
+ *   public void tearDown() throws InterruptedException, IOException {
+ *     if (cluster != null) {
+ *       clusterProvider.destroy(cluster);
+ *     }
+ *   }
+ *
+ *  This only works if the same config / builder object can be passed to each
+ *  cluster in the test suite.
+ *
+ *  Also note that the expected number of clusters must be passed to the
+ *  provider. The provider creates clusters ahead of time so that one is always
+ *  held in reserve; without a limit it would build one extra cluster after the
+ *  last test, which is never used. Passing the expected number of clusters
+ *  makes the provider stop creating clusters once that many have been built.
+ *  If more clusters are requested than the limit allows, provide() throws an
+ *  IOException.
+ *
+ */
+public class MiniOzoneClusterProvider {
+
+  private static final Logger LOG
+      = LoggerFactory.getLogger(MiniOzoneClusterProvider.class);
+
+  private static final int PRE_CREATE_LIMIT = 1;
+  private static final int EXPIRED_LIMIT = 4;
+  private volatile boolean shutdown = false;
+  private final int clusterLimit;
+  private int consumedClusterCount = 0;
+
+  private final OzoneConfiguration conf;
+  private final MiniOzoneCluster.Builder builder;
+  private final Thread createThread;
+  private final Thread reapThread;
+
+  private final Set<MiniOzoneCluster> createdClusters = new HashSet<>();
+  private final BlockingQueue<MiniOzoneCluster> clusters
+      = new ArrayBlockingQueue<>(PRE_CREATE_LIMIT);
+  private final BlockingQueue<MiniOzoneCluster> expiredClusters
+      = new ArrayBlockingQueue<>(EXPIRED_LIMIT);
+
+  /**
+   * Creates the provider and starts the background create and reap threads.
+   * @param conf The configuration to use when creating the cluster
+   * @param builder A builder object with all cluster options set
+   * @param clusterLimit The total number of clusters this provider should
+   *                     create. If another is requested after this limit has
+   *                     been reached, an exception will be thrown.
+   */
+  public MiniOzoneClusterProvider(OzoneConfiguration conf,
+      MiniOzoneCluster.Builder builder, int clusterLimit) {
+    this.conf = conf;
+    this.builder = builder;
+    this.clusterLimit = clusterLimit;
+    createThread = createClusters();
+    reapThread = reapClusters();
+  }
+
+  public synchronized MiniOzoneCluster provide()
+      throws InterruptedException, IOException {
+    ensureNotShutdown();
+    if (consumedClusterCount >= clusterLimit) {
+      throw new IOException("The cluster limit of " + clusterLimit + " has "
+          + "been reached for this provider. Please increase the value set "
+          + "in the constructor");
+    }
+    MiniOzoneCluster cluster = clusters.poll(100, SECONDS);
+    createdClusters.add(cluster);
+    consumedClusterCount++;
+    return cluster;
+  }
+
+  public synchronized void destroy(MiniOzoneCluster c)
+      throws InterruptedException, IOException {
+    ensureNotShutdown();
+    createdClusters.remove(c);
+    expiredClusters.put(c);
+  }
+
+  public synchronized void shutdown() throws InterruptedException {
+    createThread.interrupt();
+    createThread.join();
+    destroyRemainingClusters();
+    shutdown = true;
+    reapThread.join();
+  }
+
+  private void ensureNotShutdown() throws IOException {
+    if (shutdown) {
+      throw new IOException("The mini-cluster provider is shutdown");
+    }
+  }
+
+  private Thread reapClusters() {
+    Thread t = new Thread(() -> {
+      while(!shutdown || !expiredClusters.isEmpty()) {
+        try {
+          // Why not just call take and wait forever until interrupt is
+          // thrown? Inside MiniCluster.shutdown, there are places where it
+          // waits on things to happen, and the interrupt can interrupt those
+          // and prevent the shutdown from happening correctly. Therefore it is
+          // safer to just poll and avoid interrupting this thread.
+          MiniOzoneCluster c = expiredClusters.poll(100,
+              TimeUnit.MILLISECONDS);
+          if (c != null) {
+            c.shutdown();
+          }
+        } catch (Exception e) {
+          LOG.error("Unexpected exception received", e);
+        }
+      }
+    });
+    t.setName("Mini-Cluster-Provider-Reap");
+    t.start();
+    return t;
+  }
+
+  private Thread createClusters() {
+    Thread t = new Thread(() -> {
+      int createdCount = 0;
+      while (!Thread.interrupted() && createdCount < clusterLimit) {
+        MiniOzoneCluster cluster = null;
+        try {
+          builder.setClusterId(UUID.randomUUID().toString());
+
+          OzoneConfiguration newConf = new OzoneConfiguration(conf);
+          List<Integer> portList = getFreePortList(4);
+          newConf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
+              "127.0.0.1:" + portList.get(0));
+          newConf.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY,
+              "127.0.0.1:" + portList.get(1));
+          newConf.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY,
+              "127.0.0.1:" + portList.get(2));
+          newConf.setInt(OMConfigKeys.OZONE_OM_RATIS_PORT_KEY,
+              portList.get(3));
+          builder.setConf(newConf);
+
+          cluster = builder.build();
+          cluster.waitForClusterToBeReady();
+          createdCount++;
+          clusters.put(cluster);
+        } catch (InterruptedException e) {
+          if (cluster != null) {
+            cluster.shutdown();
+          }
+          break;
+        } catch (IOException | TimeoutException e) {
+          throw new RuntimeException("Unable to build cluster", e);
+        }
+      }
+    });
+    t.setName("Mini-Cluster-Provider-Create");
+    t.start();
+    return t;
+  }
+
+  private void destroyRemainingClusters() {
+    while(!clusters.isEmpty()) {
+      try {
+        MiniOzoneCluster cluster = clusters.poll();
+        if (cluster != null) {
+          destroy(cluster);
+        }
+      } catch (InterruptedException | IOException e) {
+        LOG.error("Caught exception when destroying clusters", e);
+        // Do nothing as we want to ensure all clusters try to shutdown.
+      }
+    }
+    // If there are any clusters remaining in createdClusters, then destroy
+    // them too. This can happen if a test failed to hand its cluster back
+    // via destroy().
+    MiniOzoneCluster[] remaining
+        = createdClusters.toArray(new MiniOzoneCluster[0]);
+    for (MiniOzoneCluster c : remaining) {
+      try {
+        destroy(c);
+      } catch (InterruptedException | IOException e) {
+        LOG.error("Caught exception when destroying remaining clusters", e);
+      }
+    }
+    createdClusters.clear();
+  }
+
+  private List<Integer> getFreePortList(int size) {
+    return org.apache.ratis.util.NetUtils.createLocalServerAddress(size)
+        .stream()
+        .map(inetSocketAddress -> inetSocketAddress.getPort())
+        .collect(Collectors.toList());
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
index 7b46941..5cc0570 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@ -36,12 +36,15 @@ import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneClusterProvider;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.TestDataUtil;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,8 +101,10 @@ public class TestDecommissionAndMaintenance {
 
   private ContainerOperationClient scmClient;
 
-  @Before
-  public void setUp() throws Exception {
+  private static MiniOzoneClusterProvider clusterProvider;
+
+  @BeforeClass
+  public static void init() {
     OzoneConfiguration conf = new OzoneConfiguration();
     final int interval = 100;
 
@@ -121,20 +126,31 @@ public class TestDecommissionAndMaintenance {
     replicationConf.setInterval(Duration.ofSeconds(1));
     conf.setFromObject(replicationConf);
 
-    cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(numOfDatanodes)
-        .build();
-    cluster.waitForClusterToBeReady();
-    setManagers();
+    MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(numOfDatanodes);
+
+    clusterProvider = new MiniOzoneClusterProvider(conf, builder, 11);
+  }
 
+  @AfterClass
+  public static void shutdown() throws InterruptedException {
+    if (clusterProvider != null) {
+      clusterProvider.shutdown();
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    cluster = clusterProvider.provide();
+    setManagers();
     bucket = TestDataUtil.createVolumeAndBucket(cluster, volName, bucketName);
-    scmClient = new ContainerOperationClient(conf);
+    scmClient = new ContainerOperationClient(cluster.getConf());
   }
 
   @After
-  public void tearDown() {
+  public void tearDown() throws InterruptedException, IOException {
     if (cluster != null) {
-      cluster.shutdown();
+      clusterProvider.destroy(cluster);
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org