You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/08/02 16:20:46 UTC

[hadoop] branch trunk updated: HDDS-1782. Add an option to MiniOzoneChaosCluster to read files multiple times. Contributed by Mukul Kumar Singh. (#1076)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b5c74d4  HDDS-1782. Add an option to MiniOzoneChaosCluster to read files multiple times. Contributed by Mukul Kumar Singh. (#1076)
b5c74d4 is described below

commit b5c74d4ab88e2437c4a4f0464d0e2ea172c68367
Author: Mukul Kumar Singh <ms...@apache.org>
AuthorDate: Fri Aug 2 21:50:39 2019 +0530

    HDDS-1782. Add an option to MiniOzoneChaosCluster to read files multiple times. Contributed by Mukul Kumar Singh. (#1076)
---
 .../integration-test/src/test/bin/start-chaos.sh   |   2 +-
 .../apache/hadoop/ozone/MiniOzoneChaosCluster.java |   6 +-
 .../hadoop/ozone/MiniOzoneLoadGenerator.java       | 153 ++++++++++++++++-----
 .../hadoop/ozone/TestMiniChaosOzoneCluster.java    |   9 +-
 .../apache/hadoop/ozone/chaos/TestProbability.java |  43 ++++++
 5 files changed, 173 insertions(+), 40 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/bin/start-chaos.sh b/hadoop-ozone/integration-test/src/test/bin/start-chaos.sh
index 5de6013..002fe94 100755
--- a/hadoop-ozone/integration-test/src/test/bin/start-chaos.sh
+++ b/hadoop-ozone/integration-test/src/test/bin/start-chaos.sh
@@ -22,7 +22,7 @@ current="/tmp/"
 filename="${current}${date}${fileformat}"
 heapdumpfile="${current}${date}${heapformat}"
 
-export MAVEN_OPTS="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${heapdumpfile}"
+export MAVEN_OPTS="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${heapdumpfile} -Dorg.apache.ratis.thirdparty.io.netty.allocator.useCacheForAllThreads=false"
 
 echo "logging to ${filename}"
 echo "heapdump to ${heapdumpfile}"
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
index ce29417..75911df 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
@@ -108,7 +108,7 @@ public class MiniOzoneChaosCluster extends MiniOzoneClusterImpl {
         LOG.info("{} Completed restarting Datanode: {}", failString,
             dn.getUuid());
       } catch (Exception e) {
-
+        LOG.error("Failed to restartNodes Datanode", dn.getUuid());
       }
     }
   }
@@ -133,7 +133,7 @@ public class MiniOzoneChaosCluster extends MiniOzoneClusterImpl {
         LOG.info("Completed {} DataNode {}", stopString, dn.getUuid());
 
       } catch (Exception e) {
-
+        LOG.error("Failed to shutdown Datanode", dn.getUuid());
       }
     }
   }
@@ -247,6 +247,8 @@ public class MiniOzoneChaosCluster extends MiniOzoneClusterImpl {
       conf.setInt(OzoneConfigKeys.OZONE_CONTAINER_CACHE_SIZE, 2);
       conf.setInt("hdds.scm.replication.thread.interval", 10 * 1000);
       conf.setInt("hdds.scm.replication.event.timeout", 20 * 1000);
+      conf.setInt(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 100);
+      conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP, 100);
     }
 
     @Override
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java
index 67edb15..b942447 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.ozone.chaos.TestProbability;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -28,6 +29,7 @@ import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -37,7 +39,10 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A Simple Load generator for testing.
@@ -47,6 +52,8 @@ public class MiniOzoneLoadGenerator {
   static final Logger LOG =
       LoggerFactory.getLogger(MiniOzoneLoadGenerator.class);
 
+  private static String keyNameDelimiter = "_";
+
   private ThreadPoolExecutor writeExecutor;
   private int numWriteThreads;
   // number of buffer to be allocated, each is allocated with length which
@@ -58,7 +65,13 @@ public class MiniOzoneLoadGenerator {
 
   private final List<OzoneBucket> ozoneBuckets;
 
-  MiniOzoneLoadGenerator(List<OzoneBucket> bucket, int numThreads,
+  private final AtomicInteger agedFileWrittenIndex;
+  private final ExecutorService agedFileExecutor;
+  private final OzoneBucket agedLoadBucket;
+  private final TestProbability agedWriteProbability;
+
+  MiniOzoneLoadGenerator(List<OzoneBucket> bucket,
+                         OzoneBucket agedLoadBucket, int numThreads,
       int numBuffers) {
     this.ozoneBuckets = bucket;
     this.numWriteThreads = numThreads;
@@ -68,6 +81,11 @@ public class MiniOzoneLoadGenerator {
         new ThreadPoolExecutor.CallerRunsPolicy());
     this.writeExecutor.prestartAllCoreThreads();
 
+    this.agedFileWrittenIndex = new AtomicInteger(0);
+    this.agedFileExecutor = Executors.newSingleThreadExecutor();
+    this.agedLoadBucket = agedLoadBucket;
+    this.agedWriteProbability = TestProbability.valueOf(10);
+
     this.isWriteThreadRunning = new AtomicBoolean(false);
 
     // allocate buffers and populate random data.
@@ -89,51 +107,111 @@ public class MiniOzoneLoadGenerator {
 
     while (isWriteThreadRunning.get() &&
         (Time.monotonicNow() < startTime + runTimeMillis)) {
-      // choose a random buffer.
-      int index = RandomUtils.nextInt();
-      ByteBuffer buffer = buffers.get(index % numBuffers);
-      int bufferCapacity = buffer.capacity();
-
-      String keyName = threadName + "-" + index;
       OzoneBucket bucket =
           ozoneBuckets.get((int) (Math.random() * ozoneBuckets.size()));
-      try (OzoneOutputStream stream = bucket.createKey(keyName,
-          bufferCapacity, ReplicationType.RATIS, ReplicationFactor.THREE,
-          new HashMap<>())) {
-        stream.write(buffer.array());
+      try {
+        int index = RandomUtils.nextInt();
+        String keyName = writeData(index, bucket, threadName);
+
+        readData(bucket, keyName);
+
+        deleteKey(bucket, keyName);
       } catch (Exception e) {
-        LOG.error("LOADGEN: Create key:{} failed with exception, skipping",
-            keyName, e);
-        continue;
-        // TODO: HDDS-1403.A key write can fail after multiple block writes
-        //  to closed container. add a break here once that is fixed.
+        LOG.error("LOADGEN: Exiting due to exception", e);
+        break;
       }
+    }
+    // This will terminate other threads too.
+    isWriteThreadRunning.set(false);
+    LOG.info("Terminating IO thread:{}.", threadID);
+  }
 
-      try (OzoneInputStream stream = bucket.readKey(keyName)) {
-        byte[] readBuffer = new byte[bufferCapacity];
-        int readLen = stream.read(readBuffer);
 
-        if (readLen < bufferCapacity) {
-          LOG.error("LOADGEN: Read mismatch, key:{} read data length:{} is " +
-              "smaller than excepted:{}", keyName, readLen, bufferCapacity);
-          break;
-        }
+  private String writeData(int keyIndex, OzoneBucket bucket, String threadName)
+      throws Exception {
+    // choose the buffer for this key index; readData uses the same mapping.
+    ByteBuffer buffer = buffers.get(keyIndex % numBuffers);
+    int bufferCapacity = buffer.capacity();
 
-        if (!Arrays.equals(readBuffer, buffer.array())) {
-          LOG.error("LOADGEN: Read mismatch, key:{} Read data does not match " +
-              "the written data", keyName);
-          break;
-        }
+    String keyName = threadName + keyNameDelimiter + keyIndex;
+    try (OzoneOutputStream stream = bucket.createKey(keyName,
+        bufferCapacity, ReplicationType.RATIS, ReplicationFactor.THREE,
+        new HashMap<>())) {
+      stream.write(buffer.array());
+    } catch (Throwable t) {
+      LOG.error("LOADGEN: Create key:{} failed with exception, skipping",
+          keyName, t);
+      throw t;
+    }
 
-      } catch (Exception e) {
-        LOG.error("LOADGEN: Read key:{} failed with exception", keyName, e);
-        break;
+    return keyName;
+  }
+
+  private void readData(OzoneBucket bucket, String keyName) throws Exception {
+    int index = Integer.valueOf(keyName.split(keyNameDelimiter)[1]);
+
+
+    ByteBuffer buffer = buffers.get(index % numBuffers);
+    int bufferCapacity = buffer.capacity();
+
+    try (OzoneInputStream stream = bucket.readKey(keyName)) {
+      byte[] readBuffer = new byte[bufferCapacity];
+      int readLen = stream.read(readBuffer);
+
+      if (readLen < bufferCapacity) {
+        throw new IOException("Read mismatch, key:" + keyName +
+            " read data length:" + readLen +
+            " is smaller than excepted:" + bufferCapacity);
+      }
+
+      if (!Arrays.equals(readBuffer, buffer.array())) {
+        throw new IOException("Read mismatch, key:" + keyName +
+            " read data does not match the written data");
       }
+    } catch (Throwable t) {
+      LOG.error("LOADGEN: Read key:{} failed with exception", keyName, t);
+      throw t;
+    }
+  }
+
+  private void deleteKey(OzoneBucket bucket, String keyName) throws Exception {
+    try {
+      bucket.deleteKey(keyName);
+    } catch (Throwable t) {
+      LOG.error("LOADGEN: Unable to delete key:{}", keyName, t);
+      throw t;
+    }
+  }
+
+  private String getKeyToRead() {
+    int currentIndex = agedFileWrittenIndex.get();
+    return currentIndex != 0 ?
+        String.valueOf(RandomUtils.nextInt(0, currentIndex)): null;
+  }
 
+  private void startAgedFilesLoad(long runTimeMillis) {
+    long threadID = Thread.currentThread().getId();
+    LOG.info("AGED LOADGEN: Started Aged IO Thread:{}.", threadID);
+    String threadName = Thread.currentThread().getName();
+    long startTime = Time.monotonicNow();
+
+    while (isWriteThreadRunning.get() &&
+        (Time.monotonicNow() < startTime + runTimeMillis)) {
+
+      String keyName = null;
       try {
-        bucket.deleteKey(keyName);
-      } catch (Exception e) {
-        LOG.error("LOADGEN: Unable to delete key:{}", keyName, e);
+        if (agedWriteProbability.isTrue()) {
+          keyName = writeData(agedFileWrittenIndex.incrementAndGet(),
+              agedLoadBucket, threadName);
+        } else {
+          keyName = getKeyToRead();
+          if (keyName != null) {
+            readData(agedLoadBucket, keyName);
+          }
+        }
+      } catch (Throwable t) {
+        LOG.error("AGED LOADGEN: {} Exiting due to exception", keyName, t);
+        break;
       }
     }
     // This will terminate other threads too.
@@ -141,7 +219,7 @@ public class MiniOzoneLoadGenerator {
     LOG.info("Terminating IO thread:{}.", threadID);
   }
 
-  public void startIO(long time, TimeUnit timeUnit) {
+  void startIO(long time, TimeUnit timeUnit) {
     List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
     LOG.info("Starting MiniOzoneLoadGenerator for time {}:{} with {} buffers " +
             "and {} threads", time, timeUnit, numBuffers, numWriteThreads);
@@ -153,6 +231,9 @@ public class MiniOzoneLoadGenerator {
                 writeExecutor));
       }
 
+      writeFutures.add(CompletableFuture.runAsync(() ->
+              startAgedFilesLoad(timeUnit.toMillis(time)), agedFileExecutor));
+
       // Wait for IO to complete
       for (CompletableFuture<Void> f : writeFutures) {
         try {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java
index 0cb7f81..bb66474 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java
@@ -83,8 +83,15 @@ public class TestMiniChaosOzoneCluster implements Runnable {
     for (int i = 0; i < numClients; i++) {
       ozoneBuckets.add(volume.getBucket(bucketName));
     }
+
+    String agedBucketName =
+        RandomStringUtils.randomAlphabetic(10).toLowerCase();
+
+    volume.createBucket(agedBucketName);
+    OzoneBucket agedLoadBucket = volume.getBucket(agedBucketName);
     loadGenerator =
-        new MiniOzoneLoadGenerator(ozoneBuckets, numThreads, numBuffers);
+        new MiniOzoneLoadGenerator(ozoneBuckets, agedLoadBucket, numThreads,
+            numBuffers);
   }
 
   /**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/chaos/TestProbability.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/chaos/TestProbability.java
new file mode 100644
index 0000000..41b8e56
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/chaos/TestProbability.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.chaos;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.RandomUtils;
+
+/**
+ * This class is used to find out if a certain event is true.
+ * Every event is assigned a probability, and the isTrue function returns true
+ * when the probability has been met.
+ */
+final public class TestProbability {
+  private int pct;
+
+  private TestProbability(int pct) {
+    Preconditions.checkArgument(pct <= 100 && pct > 0);
+    this.pct = pct;
+  }
+
+  public boolean isTrue() {
+    return (RandomUtils.nextInt(0, 100) <= pct);
+  }
+
+  public static TestProbability valueOf(int pct) {
+    return new TestProbability(pct);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org