You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/08/02 16:20:46 UTC
[hadoop] branch trunk updated: HDDS-1782. Add an option to
MiniOzoneChaosCluster to read files multiple times. Contributed by Mukul
Kumar Singh. (#1076)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new b5c74d4 HDDS-1782. Add an option to MiniOzoneChaosCluster to read files multiple times. Contributed by Mukul Kumar Singh. (#1076)
b5c74d4 is described below
commit b5c74d4ab88e2437c4a4f0464d0e2ea172c68367
Author: Mukul Kumar Singh <ms...@apache.org>
AuthorDate: Fri Aug 2 21:50:39 2019 +0530
HDDS-1782. Add an option to MiniOzoneChaosCluster to read files multiple times. Contributed by Mukul Kumar Singh. (#1076)
---
.../integration-test/src/test/bin/start-chaos.sh | 2 +-
.../apache/hadoop/ozone/MiniOzoneChaosCluster.java | 6 +-
.../hadoop/ozone/MiniOzoneLoadGenerator.java | 153 ++++++++++++++++-----
.../hadoop/ozone/TestMiniChaosOzoneCluster.java | 9 +-
.../apache/hadoop/ozone/chaos/TestProbability.java | 43 ++++++
5 files changed, 173 insertions(+), 40 deletions(-)
diff --git a/hadoop-ozone/integration-test/src/test/bin/start-chaos.sh b/hadoop-ozone/integration-test/src/test/bin/start-chaos.sh
index 5de6013..002fe94 100755
--- a/hadoop-ozone/integration-test/src/test/bin/start-chaos.sh
+++ b/hadoop-ozone/integration-test/src/test/bin/start-chaos.sh
@@ -22,7 +22,7 @@ current="/tmp/"
filename="${current}${date}${fileformat}"
heapdumpfile="${current}${date}${heapformat}"
-export MAVEN_OPTS="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${heapdumpfile}"
+export MAVEN_OPTS="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${heapdumpfile} -Dorg.apache.ratis.thirdparty.io.netty.allocator.useCacheForAllThreads=false"
echo "logging to ${filename}"
echo "heapdump to ${heapdumpfile}"
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
index ce29417..75911df 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
@@ -108,7 +108,7 @@ public class MiniOzoneChaosCluster extends MiniOzoneClusterImpl {
LOG.info("{} Completed restarting Datanode: {}", failString,
dn.getUuid());
} catch (Exception e) {
-
+ LOG.error("Failed to restartNodes Datanode", dn.getUuid());
}
}
}
@@ -133,7 +133,7 @@ public class MiniOzoneChaosCluster extends MiniOzoneClusterImpl {
LOG.info("Completed {} DataNode {}", stopString, dn.getUuid());
} catch (Exception e) {
-
+ LOG.error("Failed to shutdown Datanode", dn.getUuid());
}
}
}
@@ -247,6 +247,8 @@ public class MiniOzoneChaosCluster extends MiniOzoneClusterImpl {
conf.setInt(OzoneConfigKeys.OZONE_CONTAINER_CACHE_SIZE, 2);
conf.setInt("hdds.scm.replication.thread.interval", 10 * 1000);
conf.setInt("hdds.scm.replication.event.timeout", 20 * 1000);
+ conf.setInt(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 100);
+ conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP, 100);
}
@Override
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java
index 67edb15..b942447 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.ozone.chaos.TestProbability;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -28,6 +29,7 @@ import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -37,7 +39,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* A Simple Load generator for testing.
@@ -47,6 +52,8 @@ public class MiniOzoneLoadGenerator {
static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneLoadGenerator.class);
+ private static String keyNameDelimiter = "_";
+
private ThreadPoolExecutor writeExecutor;
private int numWriteThreads;
// number of buffer to be allocated, each is allocated with length which
@@ -58,7 +65,13 @@ public class MiniOzoneLoadGenerator {
private final List<OzoneBucket> ozoneBuckets;
- MiniOzoneLoadGenerator(List<OzoneBucket> bucket, int numThreads,
+ private final AtomicInteger agedFileWrittenIndex;
+ private final ExecutorService agedFileExecutor;
+ private final OzoneBucket agedLoadBucket;
+ private final TestProbability agedWriteProbability;
+
+ MiniOzoneLoadGenerator(List<OzoneBucket> bucket,
+ OzoneBucket agedLoadBucket, int numThreads,
int numBuffers) {
this.ozoneBuckets = bucket;
this.numWriteThreads = numThreads;
@@ -68,6 +81,11 @@ public class MiniOzoneLoadGenerator {
new ThreadPoolExecutor.CallerRunsPolicy());
this.writeExecutor.prestartAllCoreThreads();
+ this.agedFileWrittenIndex = new AtomicInteger(0);
+ this.agedFileExecutor = Executors.newSingleThreadExecutor();
+ this.agedLoadBucket = agedLoadBucket;
+ this.agedWriteProbability = TestProbability.valueOf(10);
+
this.isWriteThreadRunning = new AtomicBoolean(false);
// allocate buffers and populate random data.
@@ -89,51 +107,111 @@ public class MiniOzoneLoadGenerator {
while (isWriteThreadRunning.get() &&
(Time.monotonicNow() < startTime + runTimeMillis)) {
- // choose a random buffer.
- int index = RandomUtils.nextInt();
- ByteBuffer buffer = buffers.get(index % numBuffers);
- int bufferCapacity = buffer.capacity();
-
- String keyName = threadName + "-" + index;
OzoneBucket bucket =
ozoneBuckets.get((int) (Math.random() * ozoneBuckets.size()));
- try (OzoneOutputStream stream = bucket.createKey(keyName,
- bufferCapacity, ReplicationType.RATIS, ReplicationFactor.THREE,
- new HashMap<>())) {
- stream.write(buffer.array());
+ try {
+ int index = RandomUtils.nextInt();
+ String keyName = writeData(index, bucket, threadName);
+
+ readData(bucket, keyName);
+
+ deleteKey(bucket, keyName);
} catch (Exception e) {
- LOG.error("LOADGEN: Create key:{} failed with exception, skipping",
- keyName, e);
- continue;
- // TODO: HDDS-1403.A key write can fail after multiple block writes
- // to closed container. add a break here once that is fixed.
+ LOG.error("LOADGEN: Exiting due to exception", e);
+ break;
}
+ }
+ // This will terminate other threads too.
+ isWriteThreadRunning.set(false);
+ LOG.info("Terminating IO thread:{}.", threadID);
+ }
- try (OzoneInputStream stream = bucket.readKey(keyName)) {
- byte[] readBuffer = new byte[bufferCapacity];
- int readLen = stream.read(readBuffer);
- if (readLen < bufferCapacity) {
- LOG.error("LOADGEN: Read mismatch, key:{} read data length:{} is " +
- "smaller than excepted:{}", keyName, readLen, bufferCapacity);
- break;
- }
+ private String writeData(int keyIndex, OzoneBucket bucket, String threadName)
+ throws Exception {
+ // choose a random buffer.
+ ByteBuffer buffer = buffers.get(keyIndex % numBuffers);
+ int bufferCapacity = buffer.capacity();
- if (!Arrays.equals(readBuffer, buffer.array())) {
- LOG.error("LOADGEN: Read mismatch, key:{} Read data does not match " +
- "the written data", keyName);
- break;
- }
+ String keyName = threadName + keyNameDelimiter + keyIndex;
+ try (OzoneOutputStream stream = bucket.createKey(keyName,
+ bufferCapacity, ReplicationType.RATIS, ReplicationFactor.THREE,
+ new HashMap<>())) {
+ stream.write(buffer.array());
+ } catch (Throwable t) {
+ LOG.error("LOADGEN: Create key:{} failed with exception, skipping",
+ keyName, t);
+ throw t;
+ }
- } catch (Exception e) {
- LOG.error("LOADGEN: Read key:{} failed with exception", keyName, e);
- break;
+ return keyName;
+ }
+
+ private void readData(OzoneBucket bucket, String keyName) throws Exception {
+ int index = Integer.valueOf(keyName.split(keyNameDelimiter)[1]);
+ // Reads keyName from bucket and checks it against buffers[index % numBuffers].
+ // NOTE(review): a keyName without the delimiter makes [1] above throw.
+ ByteBuffer buffer = buffers.get(index % numBuffers);
+ int bufferCapacity = buffer.capacity();
+
+ try (OzoneInputStream stream = bucket.readKey(keyName)) {
+ byte[] readBuffer = new byte[bufferCapacity];
+ int readLen = stream.read(readBuffer);
+ // NOTE(review): presumably one read() may return a short count - confirm.
+ if (readLen < bufferCapacity) {
+ throw new IOException("Read mismatch, key:" + keyName +
+ " read data length:" + readLen +
+ " is smaller than excepted:" + bufferCapacity);
+ }
+ // Length is sufficient; now compare the bytes with the expected buffer.
+ if (!Arrays.equals(readBuffer, buffer.array())) {
+ throw new IOException("Read mismatch, key:" + keyName +
+ " read data does not match the written data");
}
+ } catch (Throwable t) {
+ LOG.error("LOADGEN: Read key:{} failed with exception", keyName, t);
+ throw t;
+ }
+ }
+
+ private void deleteKey(OzoneBucket bucket, String keyName) throws Exception {
+ try { // deletes keyName from the given bucket
+ bucket.deleteKey(keyName);
+ } catch (Throwable t) { // log with the key name for context, then rethrow
+ LOG.error("LOADGEN: Unable to delete key:{}", keyName, t);
+ throw t;
+ }
+ }
+
+  private String getKeyToRead() { // mirrors writeData's "<thread>_<index>"
+    int written = agedFileWrittenIndex.get(); // indices in use: 1..written
+    return written == 0 ? null : Thread.currentThread().getName()
+        + keyNameDelimiter + RandomUtils.nextInt(1, written + 1);
+  }
+ private void startAgedFilesLoad(long runTimeMillis) {
+ long threadID = Thread.currentThread().getId();
+ LOG.info("AGED LOADGEN: Started Aged IO Thread:{}.", threadID);
+ String threadName = Thread.currentThread().getName();
+ long startTime = Time.monotonicNow();
+
+ while (isWriteThreadRunning.get() &&
+ (Time.monotonicNow() < startTime + runTimeMillis)) {
+
+ String keyName = null;
try {
- bucket.deleteKey(keyName);
- } catch (Exception e) {
- LOG.error("LOADGEN: Unable to delete key:{}", keyName, e);
+ if (agedWriteProbability.isTrue()) {
+ keyName = writeData(agedFileWrittenIndex.incrementAndGet(),
+ agedLoadBucket, threadName);
+ } else {
+ keyName = getKeyToRead();
+ if (keyName != null) {
+ readData(agedLoadBucket, keyName);
+ }
+ }
+ } catch (Throwable t) {
+ LOG.error("AGED LOADGEN: {} Exiting due to exception", keyName, t);
+ break;
}
}
// This will terminate other threads too.
@@ -141,7 +219,7 @@ public class MiniOzoneLoadGenerator {
LOG.info("Terminating IO thread:{}.", threadID);
}
- public void startIO(long time, TimeUnit timeUnit) {
+ void startIO(long time, TimeUnit timeUnit) {
List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
LOG.info("Starting MiniOzoneLoadGenerator for time {}:{} with {} buffers " +
"and {} threads", time, timeUnit, numBuffers, numWriteThreads);
@@ -153,6 +231,9 @@ public class MiniOzoneLoadGenerator {
writeExecutor));
}
+ writeFutures.add(CompletableFuture.runAsync(() ->
+ startAgedFilesLoad(timeUnit.toMillis(time)), agedFileExecutor));
+
// Wait for IO to complete
for (CompletableFuture<Void> f : writeFutures) {
try {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java
index 0cb7f81..bb66474 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java
@@ -83,8 +83,15 @@ public class TestMiniChaosOzoneCluster implements Runnable {
for (int i = 0; i < numClients; i++) {
ozoneBuckets.add(volume.getBucket(bucketName));
}
+
+ String agedBucketName =
+ RandomStringUtils.randomAlphabetic(10).toLowerCase();
+
+ volume.createBucket(agedBucketName);
+ OzoneBucket agedLoadBucket = volume.getBucket(agedBucketName);
loadGenerator =
- new MiniOzoneLoadGenerator(ozoneBuckets, numThreads, numBuffers);
+ new MiniOzoneLoadGenerator(ozoneBuckets, agedLoadBucket, numThreads,
+ numBuffers);
}
/**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/chaos/TestProbability.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/chaos/TestProbability.java
new file mode 100644
index 0000000..41b8e56
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/chaos/TestProbability.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.chaos;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.RandomUtils;
+
+/**
+ * Represents an event that should occur with a fixed percentage probability.
+ * Each call to {@link #isTrue()} draws a fresh random number, so the event is
+ * reported as true roughly {@code pct} percent of the time.
+ */
+public final class TestProbability {
+  private final int pct;
+  /** @param pct chance of the event in percent; must be in (0, 100]. */
+  private TestProbability(int pct) {
+    Preconditions.checkArgument(pct <= 100 && pct > 0);
+    this.pct = pct;
+  }
+  /** @return true for pct of the 100 equally likely draws 0..99. */
+  public boolean isTrue() {
+    return (RandomUtils.nextInt(0, 100) < pct);
+  }
+  /** @return a probability of {@code pct} percent; pct must be in (0, 100]. */
+  public static TestProbability valueOf(int pct) {
+    return new TestProbability(pct);
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org