You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2020/03/18 16:16:27 UTC

[GitHub] [hadoop-ozone] xiaoyuyao commented on a change in pull request #438: HDDS-2878. Refactor MiniOzoneLoadGenerator to add more load generators to chaos testing.

xiaoyuyao commented on a change in pull request #438: HDDS-2878. Refactor MiniOzoneLoadGenerator to add more load generators to chaos testing.
URL: https://github.com/apache/hadoop-ozone/pull/438#discussion_r394471359
 
 

 ##########
 File path: hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java
 ##########
 @@ -47,212 +42,61 @@
       LoggerFactory.getLogger(MiniOzoneLoadGenerator.class);
 
   private static String keyNameDelimiter = "_";
-
-  private ThreadPoolExecutor writeExecutor;
-  private int numThreads;
-  // number of buffer to be allocated, each is allocated with length which
-  // is multiple of 2, each buffer is populated with random data.
-  private int numBuffers;
-  private List<ByteBuffer> buffers;
-
-  private AtomicBoolean isIOThreadRunning;
-
-  private final List<LoadBucket> ozoneBuckets;
-
-  private final AtomicInteger agedFileWrittenIndex;
-  private final ExecutorService agedFileExecutor;
-  private final LoadBucket agedLoadBucket;
-  private final TestProbability agedWriteProbability;
-
-  private final ThreadPoolExecutor fsExecutor;
-  private final LoadBucket fsBucket;
-
-  MiniOzoneLoadGenerator(List<LoadBucket> bucket,
-                         LoadBucket agedLoadBucket, LoadBucket fsBucket,
-                         int numThreads, int numBuffers) {
-    this.ozoneBuckets = bucket;
-    this.numThreads = numThreads;
-    this.numBuffers = numBuffers;
-    this.writeExecutor = createExecutor();
-
-    this.agedFileWrittenIndex = new AtomicInteger(0);
-    this.agedFileExecutor = Executors.newSingleThreadExecutor();
-    this.agedLoadBucket = agedLoadBucket;
-    this.agedWriteProbability = TestProbability.valueOf(10);
-
-    this.fsExecutor = createExecutor();
-    this.fsBucket = fsBucket;
-
-    this.isIOThreadRunning = new AtomicBoolean(false);
-
-    // allocate buffers and populate random data.
-    buffers = new ArrayList<>();
-    for (int i = 0; i < numBuffers; i++) {
-      int size = (int) StorageUnit.KB.toBytes(1 << i);
-      ByteBuffer buffer = ByteBuffer.allocate(size);
-      buffer.put(RandomUtils.nextBytes(size));
-      buffers.add(buffer);
-    }
-  }
-
-  private ThreadPoolExecutor createExecutor() {
-    ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads, numThreads,
-        100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
-        new ThreadPoolExecutor.CallerRunsPolicy());
-    executor.prestartAllCoreThreads();
-    return executor;
-
-  }
-
-  // Start IO load on an Ozone bucket.
-  private void load(long runTimeMillis) {
-    long threadID = Thread.currentThread().getId();
-    LOG.info("Started Mixed IO Thread:{}.", threadID);
-    String threadName = Thread.currentThread().getName();
-    long startTime = Time.monotonicNow();
-
-    while (isIOThreadRunning.get() &&
-        (Time.monotonicNow() < startTime + runTimeMillis)) {
-      LoadBucket bucket =
-          ozoneBuckets.get((int) (Math.random() * ozoneBuckets.size()));
-      try {
-        int index = RandomUtils.nextInt();
-        ByteBuffer buffer = getBuffer(index);
-        String keyName = getKeyName(index, threadName);
-        bucket.writeKey(buffer, keyName);
-
-        bucket.readKey(buffer, keyName);
-
-        bucket.deleteKey(keyName);
-      } catch (Exception e) {
-        LOG.error("LOADGEN: Exiting due to exception", e);
-        break;
-      }
-    }
-    // This will terminate other threads too.
-    isIOThreadRunning.set(false);
-    LOG.info("Terminating IO thread:{}.", threadID);
-  }
-
-  private Optional<Integer> randomKeyToRead() {
-    int currentIndex = agedFileWrittenIndex.get();
-    return currentIndex != 0
-      ? Optional.of(RandomUtils.nextInt(0, currentIndex))
-      : Optional.empty();
-  }
-
-  private void startAgedLoad(long runTimeMillis) {
-    long threadID = Thread.currentThread().getId();
-    LOG.info("AGED LOADGEN: Started Aged IO Thread:{}.", threadID);
-    String threadName = Thread.currentThread().getName();
-    long startTime = Time.monotonicNow();
-
-    while (isIOThreadRunning.get() &&
-        (Time.monotonicNow() < startTime + runTimeMillis)) {
-
-      String keyName = null;
-      try {
-        if (agedWriteProbability.isTrue()) {
-          int index = agedFileWrittenIndex.getAndIncrement();
-          ByteBuffer buffer = getBuffer(index);
-          keyName = getKeyName(index, threadName);
-
-          agedLoadBucket.writeKey(buffer, keyName);
-        } else {
-          Optional<Integer> index = randomKeyToRead();
-          if (index.isPresent()) {
-            ByteBuffer buffer = getBuffer(index.get());
-            keyName = getKeyName(index.get(), threadName);
-            agedLoadBucket.readKey(buffer, keyName);
-          }
-        }
-      } catch (Throwable t) {
-        LOG.error("AGED LOADGEN: {} Exiting due to exception", keyName, t);
-        break;
-      }
-    }
-    // This will terminate other threads too.
-    isIOThreadRunning.set(false);
-    LOG.info("Terminating IO thread:{}.", threadID);
-  }
-
-  // Start IO load on an Ozone bucket.
-  private void startFsLoad(long runTimeMillis) {
-    long threadID = Thread.currentThread().getId();
-    LOG.info("Started Filesystem IO Thread:{}.", threadID);
-    String threadName = Thread.currentThread().getName();
-    long startTime = Time.monotonicNow();
-
-    while (isIOThreadRunning.get() &&
-      (Time.monotonicNow() < startTime + runTimeMillis)) {
-      try {
-        int index = RandomUtils.nextInt();
-        ByteBuffer buffer = getBuffer(index);
-        String keyName = getKeyName(index, threadName);
-        fsBucket.writeKey(true, buffer, keyName);
-
-        fsBucket.readKey(true, buffer, keyName);
-
-        fsBucket.deleteKey(true, keyName);
-      } catch (Exception e) {
-        LOG.error("LOADGEN: Exiting due to exception", e);
-        break;
-      }
+  private final List<LoadExecutors> loadExecutors;
+
+  MiniOzoneLoadGenerator(OzoneVolume volume, int numClients, int numThreads,
+                         int numBuffers, OzoneConfiguration conf)
+      throws Exception {
+    DataBuffer buffer = new DataBuffer(numBuffers);
+    loadExecutors = new ArrayList<>();
+
+    // Random Load
+    String mixBucketName = RandomStringUtils.randomAlphabetic(10).toLowerCase();
+    volume.createBucket(mixBucketName);
+    List<LoadBucket> ozoneBuckets = new ArrayList<>(numClients);
+    for (int i = 0; i < numClients; i++) {
+      ozoneBuckets.add(new LoadBucket(volume.getBucket(mixBucketName),
+          conf));
     }
-    // This will terminate other threads too.
-    isIOThreadRunning.set(false);
-    LOG.info("Terminating IO thread:{}.", threadID);
+    RandomLoadGenerator loadGenerator =
+        new RandomLoadGenerator(buffer, ozoneBuckets);
+    loadExecutors.add(new LoadExecutors(numThreads, loadGenerator));
+
+    // Aged Load
 
 Review comment:
   NIT: lines 65-84 can be refactored to deduplicate the repeated logic that creates each load generator and adds it to the load executors.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-issues-help@hadoop.apache.org