Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/04/01 10:20:29 UTC

[hadoop] branch trunk updated: HDDS-1295. Add MiniOzoneChaosCluster to mimic long running workload in a unit test environment. Contributed by Mukul Kumar Singh.

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 509f31b  HDDS-1295. Add MiniOzoneChaosCluster to mimic long running workload in a unit test environment. Contributed by Mukul Kumar Singh.
509f31b is described below

commit 509f31b10990ccb12266e8f4d3f38b7eaf7f6050
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Mon Apr 1 15:50:03 2019 +0530

    HDDS-1295. Add MiniOzoneChaosCluster to mimic long running workload in a unit test environment. Contributed by Mukul Kumar Singh.
---
 .../apache/hadoop/ozone/MiniOzoneChaosCluster.java | 224 +++++++++++++++++++++
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |   2 +-
 .../hadoop/ozone/MiniOzoneLoadGenerator.java       | 160 +++++++++++++++
 .../hadoop/ozone/TestMiniChaosOzoneCluster.java    | 116 +++++++++++
 4 files changed, 501 insertions(+), 1 deletion(-)
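
At a glance, the three new classes compose as follows: MiniOzoneChaosCluster
schedules random datanode restarts, MiniOzoneLoadGenerator drives concurrent
read/write traffic against a bucket, and TestMiniChaosOzoneCluster wires the
two together. A minimal usage sketch (not part of the commit, exception
handling elided) built only from APIs that appear in this diff:

    OzoneConfiguration conf = new OzoneConfiguration();
    MiniOzoneChaosCluster cluster =
        new MiniOzoneChaosCluster.Builder(conf).setNumDatanodes(20).build();
    cluster.waitForClusterToBeReady();

    // Create a volume/bucket pair for the load generator to write into.
    ObjectStore store = cluster.getRpcClient().getObjectStore();
    store.createVolume("vol1");
    store.getVolume("vol1").createBucket("bucket1");
    OzoneBucket bucket = store.getVolume("vol1").getBucket("bucket1");

    // Restart one or two random datanodes every 10 seconds while 10 writer
    // threads cycle through 16 random buffers for up to 30 minutes.
    cluster.startChaos(5, 10, TimeUnit.SECONDS);
    MiniOzoneLoadGenerator loadGen =
        new MiniOzoneLoadGenerator(bucket, 10, 16);
    loadGen.startIO(30, TimeUnit.MINUTES);

    loadGen.shutdownLoadGenerator();
    cluster.shutdown();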

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
new file mode 100644
index 0000000..8e25d48
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Level;
+import org.apache.ratis.grpc.client.GrpcClientProtocolClient;
+import org.apache.ratis.util.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class causes random failures in the chaos cluster.
+ */
+public class MiniOzoneChaosCluster extends MiniOzoneClusterImpl {
+
+  static final Logger LOG =
+      LoggerFactory.getLogger(MiniOzoneChaosCluster.class);
+
+  private final int numDatanodes;
+  private final ScheduledExecutorService executorService;
+
+  private ScheduledFuture<?> scheduledFuture;
+
+  private enum FailureMode {
+    NODES
+  }
+
+  public MiniOzoneChaosCluster(OzoneConfiguration conf,
+      OzoneManager ozoneManager, StorageContainerManager scm,
+      List<HddsDatanodeService> hddsDatanodes) {
+    super(conf, ozoneManager, scm, hddsDatanodes);
+
+    this.executorService = Executors.newSingleThreadScheduledExecutor();
+    this.numDatanodes = getHddsDatanodes().size();
+    LogUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.WARN);
+  }
+
+  // Get the number of datanodes to fail in the cluster.
+  private int getNumberOfNodesToFail() {
+    return RandomUtils.nextBoolean() ? 1 : 2;
+  }
+
+  // Should the failed node wait for SCM to register the event before
+  // restarting, i.e. fast restart or not.
+  private boolean isFastRestart() {
+    return RandomUtils.nextBoolean();
+  }
+
+  // Get the datanode index of the datanode to fail.
+  private int getNodeToFail() {
+    return RandomUtils.nextInt() % numDatanodes;
+  }
+
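+  // Restart a randomly chosen set of one or two datanodes, each with a
+  // randomly chosen fast or slow restart.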
+  private void failNodes() {
+    for (int i = 0; i < getNumberOfNodesToFail(); i++) {
+      boolean failureMode = isFastRestart();
+      int failedNodeIndex = getNodeToFail();
+      try {
+        restartHddsDatanode(failedNodeIndex, failureMode);
+      } catch (Exception e) {
+        // Restart failures are themselves part of the chaos being injected;
+        // log and continue so that the remaining nodes can still be failed.
+        LOG.error("Failed to restart datanode {}", failedNodeIndex, e);
+      }
+    }
+  }
+
+  private FailureMode getFailureMode() {
+    return FailureMode.
+        values()[RandomUtils.nextInt() % FailureMode.values().length];
+  }
+
+  // Fail nodes randomly at the configured time interval.
+  private void fail() {
+    FailureMode mode = getFailureMode();
+    switch (mode) {
+    case NODES:
+      failNodes();
+      break;
+
+    default:
+      LOG.error("Invalid failure mode: {}", mode);
+      break;
+    }
+  }
+
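+  // Schedule periodic failure injection on the single-threaded executor.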
+  void startChaos(long initialDelay, long period, TimeUnit timeUnit) {
+    scheduledFuture = executorService.scheduleAtFixedRate(this::fail,
+        initialDelay, period, timeUnit);
+  }
+
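+  // Stop the periodic failure task and wait for it to wind down.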
+  void stopChaos() throws Exception {
+    scheduledFuture.cancel(false);
+    try {
+      scheduledFuture.get();
+    } catch (CancellationException e) {
+      // Expected: get() throws once the periodic task has been cancelled.
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    try {
+      // Stop injecting failures before tearing the cluster down.
+      stopChaos();
+      executorService.shutdown();
+      executorService.awaitTermination(1, TimeUnit.DAYS);
+    } catch (Exception e) {
+      LOG.error("failed to shutdown MiniOzoneChaosCluster", e);
+    }
+    super.shutdown();
+  }
+
+  /**
+   * Builder for configuring the MiniOzoneChaosCluster to run.
+   */
+  public static class Builder extends MiniOzoneClusterImpl.Builder {
+
+    /**
+     * Creates a new Builder.
+     *
+     * @param conf configuration
+     */
+    public Builder(OzoneConfiguration conf) {
+      super(conf);
+    }
+
+    /**
+     * Sets the number of HddsDatanodes to be started as part of
+     * MiniOzoneChaosCluster.
+     *
+     * @param val number of datanodes
+     *
+     * @return MiniOzoneChaosCluster.Builder
+     */
+    public Builder setNumDatanodes(int val) {
+      super.setNumDatanodes(val);
+      return this;
+    }
+
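+    // Use small chunk/block/container sizes and aggressive heartbeat and
+    // report intervals so that state transitions happen quickly under chaos.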
+    @Override
+    void initializeConfiguration() throws IOException {
+      super.initializeConfiguration();
+      conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
+          2, StorageUnit.KB);
+      conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
+          16, StorageUnit.KB);
+      conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
+          4, StorageUnit.KB);
+      conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
+          8, StorageUnit.KB);
+      conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+          1, StorageUnit.MB);
+      conf.setTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, 1000,
+          TimeUnit.MILLISECONDS);
+      conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL, 5,
+          TimeUnit.SECONDS);
+      conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
+          TimeUnit.SECONDS);
+      conf.setTimeDuration(HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, 1,
+          TimeUnit.SECONDS);
+      conf.setTimeDuration(
+          ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, 5,
+          TimeUnit.SECONDS);
+      conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+          1, TimeUnit.SECONDS);
+      conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1,
+          TimeUnit.SECONDS);
+    }
+
+    @Override
+    public MiniOzoneChaosCluster build() throws IOException {
+      DefaultMetricsSystem.setMiniClusterMode(true);
+      initializeConfiguration();
+      StorageContainerManager scm;
+      OzoneManager om;
+      try {
+        scm = createSCM();
+        scm.start();
+        om = createOM();
+        if (certClient != null) {
+          om.setCertClient(certClient);
+        }
+      } catch (AuthenticationException ex) {
+        throw new IOException("Unable to build MiniOzoneCluster. ", ex);
+      }
+
+      om.start();
+      final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
+      MiniOzoneChaosCluster cluster =
+          new MiniOzoneChaosCluster(conf, om, scm, hddsDatanodes);
+      if (startDataNodes) {
+        cluster.startHddsDatanodes();
+      }
+      return cluster;
+    }
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index e746f33..5cd0841 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -510,7 +510,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
      *
      * @throws IOException
      */
-    private OzoneManager createOM()
+    OzoneManager createOM()
         throws IOException, AuthenticationException {
       configureOM();
       OMStorage omStore = new OMStorage(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java
new file mode 100644
index 0000000..efb3b66
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A simple load generator for testing.
+ */
+public class MiniOzoneLoadGenerator {
+
+  static final Logger LOG =
+      LoggerFactory.getLogger(MiniOzoneLoadGenerator.class);
+
+  private ThreadPoolExecutor writeExecutor;
+  private int numWriteThreads;
+  // Number of buffers to allocate. Buffer i has length (1 << i) KB, so the
+  // sizes are successive powers of two (1 KB, 2 KB, 4 KB, ...); each buffer
+  // is filled with random data.
+  private int numBuffers;
+  private List<ByteBuffer> buffers;
+
+  private AtomicBoolean isWriteThreadRunning;
+
+  private final OzoneBucket ozoneBucket;
+
+  MiniOzoneLoadGenerator(OzoneBucket bucket, int numThreads, int numBuffers) {
+    this.ozoneBucket = bucket;
+    this.numWriteThreads = numThreads;
+    this.numBuffers = numBuffers;
+    this.writeExecutor = new ThreadPoolExecutor(numThreads, numThreads, 100,
+        TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+    this.writeExecutor.prestartAllCoreThreads();
+
+    this.isWriteThreadRunning = new AtomicBoolean(false);
+
+    // allocate buffers and populate random data.
+    buffers = new ArrayList<>();
+    for (int i = 0; i < numBuffers; i++) {
+      int size = (int) StorageUnit.KB.toBytes(1 << i);
+      ByteBuffer buffer = ByteBuffer.allocate(size);
+      buffer.put(RandomUtils.nextBytes(size));
+      buffers.add(buffer);
+    }
+  }
+
+  // Start IO load on an Ozone bucket.
+  private void load(long runTimeMillis) {
+    LOG.info("Started IO Thread: {}", Thread.currentThread().getId());
+    String threadName = Thread.currentThread().getName();
+    long startTime = Time.monotonicNow();
+
+    while (isWriteThreadRunning.get() &&
+        (Time.monotonicNow() < startTime + runTimeMillis)) {
+      // choose a random buffer.
+      int index = RandomUtils.nextInt();
+      ByteBuffer buffer = buffers.get(index % numBuffers);
+      int bufferCapacity = buffer.capacity();
+
+      String keyName = threadName + "-" + index;
+      try (OzoneOutputStream stream = ozoneBucket.createKey(keyName,
+          bufferCapacity, ReplicationType.RATIS, ReplicationFactor.THREE,
+          new HashMap<>())) {
+        stream.write(buffer.array());
+      } catch (Exception e) {
+        LOG.error("LOADGEN: Create key:{} failed with exception", keyName, e);
+        break;
+      }
+
+      try (OzoneInputStream stream = ozoneBucket.readKey(keyName)) {
+        byte[] readBuffer = new byte[bufferCapacity];
+        int readLen = stream.read(readBuffer);
+
+        if (readLen < bufferCapacity) {
+          LOG.error("LOADGEN: Read mismatch, key:{} read data length:{} is " +
+              "smaller than expected:{}", keyName, readLen, bufferCapacity);
+          break;
+        }
+
+        if (!Arrays.equals(readBuffer, buffer.array())) {
+          LOG.error("LOADGEN: Read mismatch, key:{} Read data does not match " +
+              "the written data", keyName);
+          break;
+        }
+
+      } catch (Exception e) {
+        LOG.error("LOADGEN: Read key:{} failed with exception", keyName, e);
+        break;
+      }
+
+    }
+    // This will terminate other threads too.
+    isWriteThreadRunning.set(false);
+  }
+
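+  // Start the configured number of writer threads and block until they have
+  // run for the given time or stopped on an error.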
+  public void startIO(long time, TimeUnit timeUnit) {
+    List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
+    if (isWriteThreadRunning.compareAndSet(false, true)) {
+      // Start the IO thread
+      for (int i = 0; i < numWriteThreads; i++) {
+        writeFutures.add(
+            CompletableFuture.runAsync(() -> load(timeUnit.toMillis(time)),
+                writeExecutor));
+      }
+
+      // Wait for IO to complete
+      for (CompletableFuture<Void> f : writeFutures) {
+        try {
+          f.get();
+        } catch (Throwable t) {
+          LOG.error("startIO failed with exception", t);
+        }
+      }
+    }
+  }
+
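+  // Shut down the writer pool and wait for in-flight writes to finish.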
+  public void shutdownLoadGenerator() {
+    try {
+      writeExecutor.shutdown();
+      writeExecutor.awaitTermination(1, TimeUnit.DAYS);
+    } catch (Exception e) {
+      LOG.error("Error while shutting down the load generator", e);
+    }
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java
new file mode 100644
index 0000000..0438351
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+import picocli.CommandLine;
+
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test read/write with MiniOzoneChaosCluster.
+ */
+@Command(description = "Starts IO with MiniOzoneChaosCluster",
+    name = "chaos", mixinStandardHelpOptions = true)
+public class TestMiniChaosOzoneCluster implements Runnable {
+
+  @Option(names = {"-d", "--numDatanodes"},
+      description = "num of datanodes")
+  private static int numDatanodes = 20;
+
+  @Option(names = {"-t", "--numThreads"},
+      description = "num of IO threads")
+  private static int numThreads = 10;
+
+  @Option(names = {"-b", "--numBuffers"},
+      description = "num of IO buffers")
+  private static int numBuffers = 16;
+
+  @Option(names = {"-m", "--numMinutes"},
+      description = "total run time in minutes")
+  private static int numMinutes = 1440; // 1 day by default
+
+  @Option(names = {"-i", "--failureInterval"},
+      description = "time between failure events in seconds")
+  private static int failureInterval = 5; // 5 second period between failures.
+
+  private static MiniOzoneChaosCluster cluster;
+  private static MiniOzoneLoadGenerator loadGenerator;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    cluster = new MiniOzoneChaosCluster.Builder(new OzoneConfiguration())
+        .setNumDatanodes(numDatanodes).build();
+    cluster.waitForClusterToBeReady();
+
+    String volumeName = RandomStringUtils.randomAlphabetic(10).toLowerCase();
+    String bucketName = RandomStringUtils.randomAlphabetic(10).toLowerCase();
+    ObjectStore store = cluster.getRpcClient().getObjectStore();
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket ozoneBucket = volume.getBucket(bucketName);
+    loadGenerator =
+        new MiniOzoneLoadGenerator(ozoneBucket, numThreads, numBuffers);
+  }
+
+  /**
+   * Shutdown the MiniOzoneChaosCluster and the load generator.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (loadGenerator != null) {
+      loadGenerator.shutdownLoadGenerator();
+    }
+
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
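+  // Entry point when launched as a standalone picocli command (see main)
+  // rather than as a JUnit test.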
+  @Override
+  public void run() {
+    try {
+      init();
+      cluster.startChaos(5, failureInterval, TimeUnit.SECONDS);
+      loadGenerator.startIO(numMinutes, TimeUnit.MINUTES);
+    } catch (Exception e) {
+      // Surface setup or IO failures when run as a standalone tool.
+      e.printStackTrace();
+    } finally {
+      shutdown();
+    }
+  }
+
+  public static void main(String... args) {
+    CommandLine.run(new TestMiniChaosOzoneCluster(), System.err, args);
+  }
+
+  @Test
+  public void testReadWriteWithChaosCluster() throws Exception {
+    cluster.startChaos(5, 1, TimeUnit.SECONDS);
+    loadGenerator.startIO(1, TimeUnit.MINUTES);
+  }
+}
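
When launched standalone, main() hands the class to picocli, which binds the
@Option fields above. A hypothetical invocation (the integration-test
classpath placeholder is not part of the commit):

    java -cp <integration-test-classpath> \
        org.apache.hadoop.ozone.TestMiniChaosOzoneCluster \
        --numDatanodes 20 --numThreads 10 --numMinutes 60 --failureInterval 10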

