You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/04/01 10:20:29 UTC
[hadoop] branch trunk updated: HDDS-1295. Add MiniOzoneChaosCluster
to mimic long running workload in a unit test environment. Contributed by
Mukul Kumar Singh.
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 509f31b HDDS-1295. Add MiniOzoneChaosCluster to mimic long running workload in a unit test environment. Contributed by Mukul Kumar Singh.
509f31b is described below
commit 509f31b10990ccb12266e8f4d3f38b7eaf7f6050
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Mon Apr 1 15:50:03 2019 +0530
HDDS-1295. Add MiniOzoneChaosCluster to mimic long running workload in a unit test environment. Contributed by Mukul Kumar Singh.
---
.../apache/hadoop/ozone/MiniOzoneChaosCluster.java | 224 +++++++++++++++++++++
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 2 +-
.../hadoop/ozone/MiniOzoneLoadGenerator.java | 160 +++++++++++++++
.../hadoop/ozone/TestMiniChaosOzoneCluster.java | 116 +++++++++++
4 files changed, 501 insertions(+), 1 deletion(-)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
new file mode 100644
index 0000000..8e25d48
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Level;
+import org.apache.ratis.grpc.client.GrpcClientProtocolClient;
+import org.apache.ratis.util.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+
+/**
+ * A mini Ozone cluster that injects failures while it is running: once
+ * {@link #startChaos(long, long, TimeUnit)} has been called, a single
+ * background thread periodically restarts one or two randomly chosen
+ * datanodes.
+ */
+public class MiniOzoneChaosCluster extends MiniOzoneClusterImpl {
+
+  static final Logger LOG =
+      LoggerFactory.getLogger(MiniOzoneChaosCluster.class);
+
+  private final int numDatanodes;
+  private final ScheduledExecutorService executorService;
+
+  // Handle of the periodic failure task; null while chaos is not running.
+  private ScheduledFuture<?> scheduledFuture;
+
+  private enum FailureMode {
+    NODES
+  }
+
+  public MiniOzoneChaosCluster(OzoneConfiguration conf,
+      OzoneManager ozoneManager,
+      StorageContainerManager scm,
+      List<HddsDatanodeService> hddsDatanodes) {
+    super(conf, ozoneManager, scm, hddsDatanodes);
+
+    this.executorService = Executors.newSingleThreadScheduledExecutor();
+    this.numDatanodes = getHddsDatanodes().size();
+    LogUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.WARN);
+  }
+
+  // Get the number of datanodes to fail in one failure event (1 or 2).
+  private int getNumberOfNodesToFail() {
+    return RandomUtils.nextBoolean() ? 1 : 2;
+  }
+
+  // Randomly decide the boolean flag that failNodes() passes on to
+  // restartHddsDatanode(), i.e. fast restart or not.
+  private boolean isFastRestart() {
+    return RandomUtils.nextBoolean();
+  }
+
+  // Get the index, in [0, numDatanodes), of the datanode to fail.
+  private int getNodeToFail() {
+    return RandomUtils.nextInt(0, numDatanodes);
+  }
+
+  // Restart a random set of datanodes. The number of nodes is drawn once
+  // per call. A failed restart is logged and must not escape: an exception
+  // thrown by a scheduleAtFixedRate task suppresses all later executions.
+  private void failNodes() {
+    final int numNodesToFail = getNumberOfNodesToFail();
+    for (int i = 0; i < numNodesToFail; i++) {
+      boolean fastRestart = isFastRestart();
+      int failedNodeIndex = getNodeToFail();
+      try {
+        restartHddsDatanode(failedNodeIndex, fastRestart);
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          // Keep the interrupt status visible to the executor thread.
+          Thread.currentThread().interrupt();
+        }
+        LOG.error("failed to restart datanode:{}", failedNodeIndex, e);
+      }
+    }
+  }
+
+  private FailureMode getFailureMode() {
+    FailureMode[] modes = FailureMode.values();
+    return modes[RandomUtils.nextInt(0, modes.length)];
+  }
+
+  // Pick a failure mode at random and inject it; run by the scheduler.
+  private void fail() {
+    FailureMode mode = getFailureMode();
+    switch (mode) {
+    case NODES:
+      failNodes();
+      break;
+
+    default:
+      LOG.error("invalid failure mode:{}", mode);
+      break;
+    }
+  }
+
+  /**
+   * Starts injecting failures periodically on the background thread.
+   *
+   * @param initialDelay delay before the first failure event
+   * @param period time between the starts of two failure events
+   * @param timeUnit unit of initialDelay and period
+   */
+  void startChaos(long initialDelay, long period, TimeUnit timeUnit) {
+    scheduledFuture = executorService.scheduleAtFixedRate(this::fail,
+        initialDelay, period, timeUnit);
+  }
+
+  /**
+   * Stops scheduling further failure events. A failure event that is
+   * already running is not interrupted. This is a no-op when chaos was
+   * never started.
+   *
+   * @throws Exception kept for compatibility with existing callers
+   */
+  void stopChaos() throws Exception {
+    if (scheduledFuture != null) {
+      // Do not call get() here: on a cancelled future it always throws
+      // CancellationException instead of waiting for the running task.
+      scheduledFuture.cancel(false);
+      scheduledFuture = null;
+    }
+  }
+
+  /**
+   * Stops the failure injection, waits for the background thread to
+   * finish and then shuts down the cluster itself.
+   */
+  @Override
+  public void shutdown() {
+    try {
+      stopChaos();
+      executorService.shutdown();
+      executorService.awaitTermination(1, TimeUnit.DAYS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("failed to shutdown MiniOzoneChaosCluster", e);
+    } catch (Exception e) {
+      LOG.error("failed to shutdown MiniOzoneChaosCluster", e);
+    } finally {
+      // Shut the cluster down only after the chaos thread is stopped so
+      // that no datanode gets restarted during the shutdown.
+      super.shutdown();
+    }
+  }
+
+  /**
+   * Builder for configuring the MiniOzoneChaosCluster to run.
+   */
+  public static class Builder extends MiniOzoneClusterImpl.Builder {
+
+    /**
+     * Creates a new Builder.
+     *
+     * @param conf configuration
+     */
+    public Builder(OzoneConfiguration conf) {
+      super(conf);
+    }
+
+    /**
+     * Sets the number of HddsDatanodes to be started as part of
+     * MiniOzoneChaosCluster.
+     *
+     * @param val number of datanodes
+     *
+     * @return MiniOzoneChaosCluster.Builder
+     */
+    public Builder setNumDatanodes(int val) {
+      super.setNumDatanodes(val);
+      return this;
+    }
+
+    /**
+     * Applies the parent configuration and then overrides sizes and
+     * intervals with small values (KB-sized chunks and blocks, 1 MB
+     * containers, intervals of a few seconds).
+     */
+    @Override
+    void initializeConfiguration() throws IOException {
+      super.initializeConfiguration();
+      conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
+          2, StorageUnit.KB);
+      conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
+          16, StorageUnit.KB);
+      conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
+          4, StorageUnit.KB);
+      conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
+          8, StorageUnit.KB);
+      conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+          1, StorageUnit.MB);
+      conf.setTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, 1000,
+          TimeUnit.MILLISECONDS);
+      conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL, 5,
+          TimeUnit.SECONDS);
+      conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
+          TimeUnit.SECONDS);
+      conf.setTimeDuration(HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, 1,
+          TimeUnit.SECONDS);
+      conf.setTimeDuration(
+          ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, 5,
+          TimeUnit.SECONDS);
+      conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+          1, TimeUnit.SECONDS);
+      conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1,
+          TimeUnit.SECONDS);
+    }
+
+    /**
+     * Starts SCM and OM, creates the datanodes and wraps them in a
+     * MiniOzoneChaosCluster; the datanodes are started only when
+     * startDataNodes is set.
+     *
+     * @return the new cluster
+     * @throws IOException if the cluster could not be built
+     */
+    @Override
+    public MiniOzoneChaosCluster build() throws IOException {
+      DefaultMetricsSystem.setMiniClusterMode(true);
+      initializeConfiguration();
+      StorageContainerManager scm;
+      OzoneManager om;
+      try {
+        scm = createSCM();
+        scm.start();
+        om = createOM();
+        if (certClient != null) {
+          om.setCertClient(certClient);
+        }
+      } catch (AuthenticationException ex) {
+        throw new IOException("Unable to build MiniOzoneCluster. ", ex);
+      }
+
+      om.start();
+      final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
+      MiniOzoneChaosCluster cluster =
+          new MiniOzoneChaosCluster(conf, om, scm, hddsDatanodes);
+      if (startDataNodes) {
+        cluster.startHddsDatanodes();
+      }
+      return cluster;
+    }
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index e746f33..5cd0841 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -510,7 +510,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
*
* @throws IOException
*/
- private OzoneManager createOM()
+ OzoneManager createOM()
throws IOException, AuthenticationException {
configureOM();
OMStorage omStore = new OMStorage(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java
new file mode 100644
index 0000000..efb3b66
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A simple load generator for testing: every IO thread repeatedly writes
+ * a key from a randomly chosen buffer, reads the key back and compares
+ * the data with what was written.
+ */
+public class MiniOzoneLoadGenerator {
+
+  static final Logger LOG =
+      LoggerFactory.getLogger(MiniOzoneLoadGenerator.class);
+
+  // Buffer i holds 2^i KB, so buffer 20 (1 GB) is the largest one whose
+  // size in bytes still fits in an int.
+  private static final int MAX_NUM_BUFFERS = 21;
+
+  private final ThreadPoolExecutor writeExecutor;
+  private final int numWriteThreads;
+  // number of buffers to be allocated, buffer i has a length of 2^i KB
+  // and is populated with random data.
+  private final int numBuffers;
+  private final List<ByteBuffer> buffers;
+
+  // true while a load is running; clearing it stops all IO threads.
+  private final AtomicBoolean isWriteThreadRunning;
+
+  private final OzoneBucket ozoneBucket;
+
+  /**
+   * Creates the IO thread pool and the random data buffers.
+   *
+   * @param bucket bucket that the keys are written to and read from
+   * @param numThreads number of IO threads
+   * @param numBuffers number of buffers, between 1 and 21
+   * @throws IllegalArgumentException if numBuffers is out of range
+   */
+  MiniOzoneLoadGenerator(OzoneBucket bucket, int numThreads, int numBuffers) {
+    if (numBuffers < 1 || numBuffers > MAX_NUM_BUFFERS) {
+      throw new IllegalArgumentException("numBuffers must be between 1 and "
+          + MAX_NUM_BUFFERS + ", got " + numBuffers);
+    }
+    this.ozoneBucket = bucket;
+    this.numWriteThreads = numThreads;
+    this.numBuffers = numBuffers;
+    this.writeExecutor = new ThreadPoolExecutor(numThreads, numThreads, 100,
+        TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+    this.writeExecutor.prestartAllCoreThreads();
+
+    this.isWriteThreadRunning = new AtomicBoolean(false);
+
+    // allocate buffers and populate random data.
+    this.buffers = new ArrayList<>();
+    for (int i = 0; i < numBuffers; i++) {
+      int size = (int) StorageUnit.KB.toBytes(1 << i);
+      ByteBuffer buffer = ByteBuffer.allocate(size);
+      buffer.put(RandomUtils.nextBytes(size));
+      buffers.add(buffer);
+    }
+  }
+
+  // Write, read back and verify keys on the calling thread until the run
+  // time is over, an error occurs or another thread has stopped the load.
+  private void load(long runTimeMillis) {
+    LOG.info("Started IO Thread{}", Thread.currentThread().getId());
+    String threadName = Thread.currentThread().getName();
+    long startTime = Time.monotonicNow();
+
+    while (isWriteThreadRunning.get() &&
+        (Time.monotonicNow() < startTime + runTimeMillis)) {
+      // choose a random buffer.
+      int index = RandomUtils.nextInt();
+      ByteBuffer buffer = buffers.get(index % numBuffers);
+      int bufferCapacity = buffer.capacity();
+
+      String keyName = threadName + "-" + index;
+      try (OzoneOutputStream stream = ozoneBucket.createKey(keyName,
+          bufferCapacity, ReplicationType.RATIS, ReplicationFactor.THREE,
+          new HashMap<>())) {
+        stream.write(buffer.array());
+      } catch (Exception e) {
+        LOG.error("LOADGEN: Create key:{} failed with exception", keyName, e);
+        break;
+      }
+
+      try (OzoneInputStream stream = ozoneBucket.readKey(keyName)) {
+        byte[] readBuffer = new byte[bufferCapacity];
+        // A single read() may return fewer bytes than requested, so keep
+        // reading until the buffer is full or the stream ends.
+        int readLen = 0;
+        while (readLen < bufferCapacity) {
+          int n = stream.read(readBuffer, readLen, bufferCapacity - readLen);
+          if (n <= 0) {
+            break;
+          }
+          readLen += n;
+        }
+
+        if (readLen < bufferCapacity) {
+          LOG.error("LOADGEN: Read mismatch, key:{} read data length:{} is " +
+              "smaller than excepted:{}", keyName, readLen, bufferCapacity);
+          break;
+        }
+
+        if (!Arrays.equals(readBuffer, buffer.array())) {
+          LOG.error("LOADGEN: Read mismatch, key:{} Read data does not match " +
+              "the written data", keyName);
+          break;
+        }
+
+      } catch (Exception e) {
+        LOG.error("Read key:{} failed with exception", keyName, e);
+        break;
+      }
+
+    }
+    // This will terminate other threads too.
+    isWriteThreadRunning.set(false);
+  }
+
+  /**
+   * Runs the IO load on all IO threads and blocks until every thread has
+   * finished. Does nothing if a load is already running.
+   *
+   * @param time maximum run time of the load
+   * @param timeUnit unit of time
+   */
+  public void startIO(long time, TimeUnit timeUnit) {
+    if (!isWriteThreadRunning.compareAndSet(false, true)) {
+      return;
+    }
+
+    final long runTimeMillis = timeUnit.toMillis(time);
+    List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
+    // Start the IO threads
+    for (int i = 0; i < numWriteThreads; i++) {
+      writeFutures.add(
+          CompletableFuture.runAsync(() -> load(runTimeMillis),
+              writeExecutor));
+    }
+
+    // Wait for IO to complete
+    for (CompletableFuture<Void> f : writeFutures) {
+      try {
+        f.get();
+      } catch (InterruptedException e) {
+        // Stop the IO threads and hand the interrupt on to the caller.
+        isWriteThreadRunning.set(false);
+        Thread.currentThread().interrupt();
+        LOG.error("startIO failed with exception", e);
+        return;
+      } catch (Throwable t) {
+        LOG.error("startIO failed with exception", t);
+      }
+    }
+  }
+
+  /**
+   * Shuts down the IO thread pool and waits for its threads to finish.
+   */
+  public void shutdownLoadGenerator() {
+    try {
+      writeExecutor.shutdown();
+      writeExecutor.awaitTermination(1, TimeUnit.DAYS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("error while closing ", e);
+    } catch (Exception e) {
+      LOG.error("error while closing ", e);
+    }
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java
new file mode 100644
index 0000000..0438351
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+import picocli.CommandLine;
+
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test Read Write with Mini Ozone Chaos Cluster. The class runs either as
+ * a JUnit test or, through main(), as a picocli command whose options set
+ * the cluster size, the load and the run time.
+ */
+@Command(description = "Starts IO with MiniOzoneChaosCluster",
+    name = "chaos", mixinStandardHelpOptions = true)
+public class TestMiniChaosOzoneCluster implements Runnable {
+
+  @Option(names = {"-d", "--numDatanodes"},
+      description = "num of datanodes")
+  private static int numDatanodes = 20;
+
+  @Option(names = {"-t", "--numThreads"},
+      description = "num of IO threads")
+  private static int numThreads = 10;
+
+  @Option(names = {"-b", "--numBuffers"},
+      description = "num of IO buffers")
+  private static int numBuffers = 16;
+
+  @Option(names = {"-m", "--numMinutes"},
+      description = "total run time")
+  private static int numMinutes = 1440; // 1 day by default
+
+  @Option(names = {"-i", "--failureInterval"},
+      description = "time between failure events in seconds")
+  private static int failureInterval = 5; // 5 second period between failures.
+
+  private static MiniOzoneChaosCluster cluster;
+  private static MiniOzoneLoadGenerator loadGenerator;
+
+  /**
+   * Starts the chaos cluster, creates a volume and a bucket with random
+   * names and sets up the load generator on that bucket.
+   *
+   * @throws Exception if the cluster or the bucket could not be set up
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    cluster = new MiniOzoneChaosCluster.Builder(new OzoneConfiguration())
+        .setNumDatanodes(numDatanodes).build();
+    cluster.waitForClusterToBeReady();
+
+    String volumeName = RandomStringUtils.randomAlphabetic(10).toLowerCase();
+    String bucketName = RandomStringUtils.randomAlphabetic(10).toLowerCase();
+    ObjectStore store = cluster.getRpcClient().getObjectStore();
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket ozoneBucket = volume.getBucket(bucketName);
+    loadGenerator =
+        new MiniOzoneLoadGenerator(ozoneBucket, numThreads, numBuffers);
+  }
+
+  /**
+   * Shutdown the load generator and the MiniOzoneChaosCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (loadGenerator != null) {
+      loadGenerator.shutdownLoadGenerator();
+    }
+
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Command line entry point: sets up the cluster, runs the load under
+   * failure injection for numMinutes and always shuts everything down.
+   *
+   * @throws IllegalStateException if the setup or the run failed
+   */
+  @Override
+  public void run() {
+    try {
+      init();
+      cluster.startChaos(5, failureInterval, TimeUnit.SECONDS);
+      loadGenerator.startIO(numMinutes, TimeUnit.MINUTES);
+    } catch (Exception e) {
+      // Report the failure instead of silently dropping it.
+      throw new IllegalStateException("chaos run failed", e);
+    } finally {
+      shutdown();
+    }
+  }
+
+  public static void main(String... args) {
+    CommandLine.run(new TestMiniChaosOzoneCluster(), System.err, args);
+  }
+
+  /**
+   * Runs one minute of IO with failure events scheduled every second
+   * after an initial delay of five seconds.
+   */
+  @Test
+  public void testReadWriteWithChaosCluster() throws Exception {
+    cluster.startChaos(5, 1, TimeUnit.SECONDS);
+    loadGenerator.startIO(1, TimeUnit.MINUTES);
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org