Posted to commits@ozone.apache.org by ad...@apache.org on 2023/04/17 13:01:50 UTC

[ozone] branch master updated: HDDS-7908. Support OM Metadata operation Generator in `Ozone freon` (#4251)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new cffa3864b9 HDDS-7908. Support OM Metadata operation Generator in `Ozone freon` (#4251)
cffa3864b9 is described below

commit cffa3864b974499078fbdb37506ed992319b09c5
Author: XiChen <32...@users.noreply.github.com>
AuthorDate: Mon Apr 17 21:01:41 2023 +0800

    HDDS-7908. Support OM Metadata operation Generator in `Ozone freon` (#4251)
---
 .../hadoop/ozone/freon/BaseFreonGenerator.java     | 118 +++++-
 .../java/org/apache/hadoop/ozone/freon/Freon.java  |   3 +-
 .../hadoop/ozone/freon/OmMetadataGenerator.java    | 452 +++++++++++++++++++++
 .../org/apache/hadoop/ozone/freon/ProgressBar.java |  25 +-
 .../apache/hadoop/ozone/freon/TestProgressBar.java |   2 +-
 5 files changed, 580 insertions(+), 20 deletions(-)

diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
index 90ee15e61f..35e753b41d 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
@@ -18,18 +18,24 @@ package org.apache.hadoop.ozone.freon;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.TimeDurationUtil;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.utils.HAUtils;
@@ -60,6 +66,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
 import org.apache.ratis.protocol.ClientId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import picocli.CommandLine;
 import picocli.CommandLine.Option;
 import picocli.CommandLine.ParentCommand;
 
@@ -92,6 +99,11 @@ public class BaseFreonGenerator {
       defaultValue = "10")
   private int threadNo;
 
+  @Option(names = {"--duration"},
+      description = "Duration to run the test. "
+          + " Can be '30s', '5m', '1h', '7d' etc..")
+  private String duration;
+
   @Option(names = {"-f", "--fail-at-end"},
       description = "If turned on, all the tasks will be executed even if "
           + "there are failures.")
@@ -104,6 +116,14 @@ public class BaseFreonGenerator {
       defaultValue = "")
   private String prefix = "";
 
+  @Option(names = {"--verbose"},
+          description = "More verbose output. "
+              + "Show the values of all command line options.")
+  private boolean verbose;
+
+  @CommandLine.Spec
+  private CommandLine.Model.CommandSpec spec;
+
   private MetricRegistry metrics = new MetricRegistry();
 
   private AtomicLong successCounter;
@@ -111,12 +131,19 @@ public class BaseFreonGenerator {
   private AtomicLong attemptCounter;
 
   private long startTime;
+  private long durationInSecond;
+  private boolean timebase;
 
   private PathSchema pathSchema;
   private String spanName;
   private ExecutorService executor;
   private ProgressBar progressBar;
 
+  private final ThreadLocal<Long> threadSequenceId = new ThreadLocal<>();
+  private final AtomicLong id = new AtomicLong(0);
+
+  private final AtomicBoolean completed = new AtomicBoolean(false);
+
   /**
    * The main logic to execute a test generator.
    *
@@ -153,15 +180,24 @@ public class BaseFreonGenerator {
    * concurrently in {@code executor}.
    */
   private void taskLoop(TaskProvider provider) {
-    while (true) {
+    threadSequenceId.set(id.getAndIncrement());
+    while (!completed.get()) {
       long counter = attemptCounter.getAndIncrement();
-
-      //in case of an other failed test, we shouldn't execute more tasks.
-      if (counter >= testNo || (!failAtEnd && failureCounter.get() > 0)) {
-        break;
+      if (timebase) {
+        if (System.currentTimeMillis()
+            > startTime + TimeUnit.SECONDS.toMillis(durationInSecond)) {
+          completed.set(true);
+          break;
+        }
+      } else {
+        // in case of another failed test, we shouldn't execute more tasks.
+        if (counter >= testNo || (!failAtEnd && failureCounter.get() > 0)) {
+          completed.set(true);
+          break;
+        }
       }
 
-      tryNextTask(provider, counter);
+      tryNextTask(provider, counter % testNo);
     }
 
     taskLoopCompleted();
@@ -198,8 +234,7 @@ public class BaseFreonGenerator {
    * thread.
    */
   private void waitForCompletion() {
-    while (successCounter.get() + failureCounter.get() < testNo && (
-        failureCounter.get() == 0 || failAtEnd)) {
+    while (!completed.get() && (failureCounter.get() == 0 || failAtEnd)) {
       try {
         Thread.sleep(CHECK_INTERVAL_MILLIS);
       } catch (InterruptedException e) {
@@ -215,6 +250,7 @@ public class BaseFreonGenerator {
     } else {
       progressBar.shutdown();
     }
+    threadSequenceId.remove();
     executor.shutdown();
     try {
       executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
@@ -250,6 +286,22 @@ public class BaseFreonGenerator {
       //replace environment variables to support multi-node execution
       prefix = resolvePrefix(prefix);
     }
+    if (duration != null) {
+      durationInSecond = TimeDurationUtil.getTimeDurationHelper(
+          "--runtime", duration, TimeUnit.SECONDS);
+      if (durationInSecond <= 0) {
+        throw new IllegalArgumentException(
+            "Incomplete command, "
+                + "the runtime must be given, and must not be negative");
+      }
+      timebase = true;
+    }
+
+    if (testNo <= 0) {
+      throw new IllegalArgumentException(
+              "Invalid command, "
+                      + "the testNo must be a positive integer");
+    }
     LOG.info("Executing test with prefix {} " +
         "and number-of-tests {}", prefix, testNo);
 
@@ -263,17 +315,33 @@ public class BaseFreonGenerator {
             LOG.error("HTTP server can't be stopped.", ex);
           }
           printReport();
+          if (verbose) {
+            printOption();
+          }
         }, 10);
 
     executor = Executors.newFixedThreadPool(threadNo);
-
-    progressBar = new ProgressBar(System.out, testNo, successCounter::get,
-        freonCommand.isInteractive());
+    long maxValue;
+    LongSupplier supplier;
+    if (duration != null) {
+      maxValue = durationInSecond;
+      supplier = () -> Duration.between(
+          Instant.ofEpochMilli(startTime), Instant.now()).getSeconds();
+    } else {
+      maxValue = testNo;
+      supplier = successCounter::get;
+    }
+    progressBar = new ProgressBar(System.out, maxValue, supplier,
+        freonCommand.isInteractive(), realTimeStatusSupplier());
     progressBar.start();
 
     startTime = System.currentTimeMillis();
   }
 
+  public Supplier<String> realTimeStatusSupplier() {
+    return () -> "";
+  }
+
   /**
    * Resolve environment variables in the prefixes.
    */
@@ -314,6 +382,23 @@ public class BaseFreonGenerator {
     messages.forEach(print);
   }
 
+  /**
+   * Print the values of the command line options used for the test.
+   */
+  public void printOption() {
+    List<String> messages = new LinkedList<>();
+    messages.add("\nOption:");
+    for (CommandLine.Model.OptionSpec option : spec.options()) {
+      String name = option.longestName();
+      messages.add(name + "=" + option.getValue());
+    }
+
+    Consumer<String> print = freonCommand.isInteractive()
+            ? System.out::println
+            : LOG::info;
+    messages.forEach(print);
+  }
+
   /**
    * Print out reports with the given message.
    */
@@ -484,6 +569,17 @@ public class BaseFreonGenerator {
     this.threadNo = threadNo;
   }
 
+  /**
+   * Get current Thread sequence ID.
+   * Similar to Thread tid, but the sequence ID starts at 0 and is
+   * consecutive: each thread is assigned the next ID when it starts
+   * its task loop.
+   * @return Current Thread sequence ID
+   */
+  public long getThreadSequenceId() {
+    return threadSequenceId.get();
+  }
+
   protected OzoneClient createOzoneClient(String omServiceID,
       OzoneConfiguration conf) throws Exception {
     if (omServiceID != null) {
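
The change above replaces the fixed-count task loop with a shared completion
flag that can also be tripped by elapsed time when --duration is given. A
minimal standalone sketch of that termination pattern (class and method names
here are illustrative, not the actual Freon classes):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;

    public class TimeBoundedLoop {
      // shared across worker threads, mirroring the 'completed' flag in the patch
      private static final AtomicBoolean COMPLETED = new AtomicBoolean(false);
      private static final AtomicLong ATTEMPTS = new AtomicLong();

      public static void main(String[] args) {
        long testNo = 1000;           // -n
        long durationInSecond = 3;    // --duration 3s, already parsed to seconds
        long startTime = System.currentTimeMillis();

        while (!COMPLETED.get()) {
          long counter = ATTEMPTS.getAndIncrement();
          if (System.currentTimeMillis()
              > startTime + TimeUnit.SECONDS.toMillis(durationInSecond)) {
            COMPLETED.set(true);      // time-based run: stop once the duration elapsed
            break;
          }
          // counter % testNo keeps generated names within the -n range, as in the patch
          doWork(counter % testNo);
        }
      }

      private static void doWork(long counter) {
        // placeholder for the per-task work
      }
    }
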
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
index fcb9a5b27e..d346866352 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
@@ -70,7 +70,8 @@ import picocli.CommandLine.Option;
         OmRPCLoadGenerator.class,
         OzoneClientKeyReadWriteOps.class,
         RangeKeysGenerator.class,
-        DatanodeSimulator.class
+        DatanodeSimulator.class,
+        OmMetadataGenerator.class
     },
     versionProvider = HddsVersionProvider.class,
     mixinStandardHelpOptions = true)
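
Registering OmMetadataGenerator.class in the subcommands list is what makes the
generator reachable as a freon subcommand. A hedged, minimal picocli sketch of
the same registration pattern (the tool and subcommand names below are made up
for illustration):

    import picocli.CommandLine;
    import picocli.CommandLine.Command;

    @Command(name = "demo-tool",
        subcommands = {HelloCommand.class},  // analogous to adding OmMetadataGenerator.class
        mixinStandardHelpOptions = true)
    public class DemoTool implements Runnable {
      @Override
      public void run() {
        // parent command: print usage when no subcommand is given
        new CommandLine(this).usage(System.out);
      }

      public static void main(String[] args) {
        System.exit(new CommandLine(new DemoTool()).execute(args));
      }
    }

    @Command(name = "hello", description = "Example subcommand.")
    class HelloCommand implements Runnable {
      @Override
      public void run() {
        System.out.println("hello");
      }
    }
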
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java
new file mode 100644
index 0000000000..6ce460563f
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.NoSuchFileException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricFilter;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs.Builder;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.security.UserGroupInformation;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+
+/**
+ * Data generator tool to test OM metadata performance.
+ */
+@Command(name = "ommg",
+    aliases = "om-metadata-generator",
+    description = "Create metadata operation to the OM.",
+    versionProvider = HddsVersionProvider.class,
+    mixinStandardHelpOptions = true,
+    showDefaultValues = true)
+public class OmMetadataGenerator extends BaseFreonGenerator
+    implements Callable<Void> {
+
+  enum Operation {
+    CREATE_FILE,
+    LOOKUP_FILE,
+    READ_FILE,
+    LIST_STATUS,
+    CREATE_KEY,
+    LOOKUP_KEY,
+    GET_KEYINFO,
+    HEAD_KEY,
+    READ_KEY,
+    LIST_KEYS,
+    INFO_BUCKET,
+    INFO_VOLUME,
+    MIXED,
+  }
+
+  @Option(names = {"-v", "--volume"},
+      description = "Name of the volume which contains the test data. Will be"
+          + " created if missing.",
+      defaultValue = "vol1")
+  private String volumeName;
+
+  @Option(names = {"-b", "--bucket"},
+      description = "Name of the bucket which contains the test data. Will be"
+          + " created if missing.",
+      defaultValue = "bucket1")
+  private String bucketName;
+
+  @Option(names = {"-s", "--size"},
+      description = "The size in byte of a file for the Create File/Key op.",
+      defaultValue = "0")
+  private int dataSize;
+
+  @Option(names = {"--buffer"},
+      description = "Size of buffer used to generated the key content.",
+      defaultValue = "4096")
+  private int bufferSize;
+
+  @Option(
+      names = "--batch-size",
+      description = "The number of key/file requests per LIST_KEY/LIST_STATUS"
+          + " request.",
+      defaultValue = "1000")
+  private int batchSize;
+
+  @Option(
+      names = "--random",
+      description = "random read/write if given. This means that it"
+          + " is possible to read/write the same file at the same time",
+      defaultValue = "false")
+  private boolean randomOp;
+
+  @Option(names = {"-o", "--operation"},
+      description = "The operation to perform, --ophelp Print detail")
+  private Operation operation;
+
+  @Option(names = {"--ops"},
+      description = "The operations list to perform, --ophelp Print detail")
+  private String operationsList;
+
+  @Option(names = {"--opsnum"},
+      description = "The number of threads per operations, the values sum"
+          + " must equal the number of threads, --ophelp Print detail")
+  private String operationsNum;
+
+  @Option(names = {"--ophelp"},
+      description = "Print operation help, or list available operation")
+  private boolean opHelp;
+
+  @Option(
+      names = "--om-service-id",
+      description = "OM Service ID"
+  )
+  private String omServiceID;
+
+  private OzoneManagerProtocol ozoneManagerClient;
+
+  private ThreadLocal<OmKeyArgs.Builder> omKeyArgsBuilder;
+
+  private OzoneBucket bucket;
+
+  private ContentGenerator contentGenerator;
+  private final byte[] readBuffer = new byte[4096];
+  private ReplicationConfig replicationConfig;
+  private Operation[] operations;
+  private boolean mixedOperation = false;
+
+  @Override
+  public Void call() throws Exception {
+    if (opHelp || operation == null) {
+      System.out.println(getUsage());
+      return null;
+    }
+    if (operation.equals(Operation.MIXED)) {
+      initMixedOperation();
+      mixedOperation = true;
+    }
+    init();
+    contentGenerator = new ContentGenerator(dataSize, bufferSize);
+    omKeyArgsBuilder = ThreadLocal.withInitial(this::createKeyArgsBuilder);
+    OzoneConfiguration conf = createOzoneConfiguration();
+    replicationConfig = ReplicationConfig.getDefault(conf);
+
+    try (OzoneClient rpcClient = createOzoneClient(omServiceID, conf)) {
+      ensureVolumeAndBucketExist(rpcClient, volumeName, bucketName);
+      ozoneManagerClient = createOmClient(conf, omServiceID);
+      bucket = rpcClient.getObjectStore().getVolume(volumeName)
+          .getBucket(bucketName);
+      runTests(this::applyOperation);
+    } finally {
+      if (ozoneManagerClient != null) {
+        ozoneManagerClient.close();
+        omKeyArgsBuilder.remove();
+      }
+    }
+    return null;
+  }
+
+  private void initMixedOperation() {
+    if (operationsList == null || operationsNum == null) {
+      throw new IllegalArgumentException(
+          "--ops and --opsnum must be given, if --operation is MIXED");
+    }
+    List<Operation> ops =
+        Arrays.stream(operationsList.split(",")).map(Operation::valueOf)
+            .collect(Collectors.toList());
+    List<Integer> opsNum =
+        Arrays.stream(operationsNum.split(",")).map(Integer::valueOf)
+            .collect(Collectors.toList());
+    if (ops.size() != opsNum.size()
+        || opsNum.stream().mapToInt(x -> x).sum() != getThreadNo()) {
+      throw new IllegalArgumentException(
+          "the --opsnum values sum must equal the number of threads");
+    }
+
+    int index = 0;
+    // if --ops is A,B,C and --opsnum is 3,2,1,
+    // the operations array will be [A, A, A, B, B, C]:
+    // threads with seq id [0, 2] will execute A,
+    // threads with seq id [3, 4] will execute B,
+    // the thread with seq id [5, 5] will execute C
+    operations = new Operation[getThreadNo()];
+    for (int i = 0; i < ops.size(); i++) {
+      Operation op = ops.get(i);
+      int num = opsNum.get(i);
+      for (int j = 0; j < num; j++) {
+        operations[index] = op;
+        index++;
+      }
+    }
+  }
+
+  public static String getUsage() {
+    return String.join("\n", ImmutableList.of(
+        "A tool to measure the Ozone om performance",
+        "support Operation: ",
+        "  " + EnumSet.allOf(Operation.class).stream().map(Enum::toString)
+            .collect(Collectors.joining(", ")),
+        "\nExample: ",
+        "# create 25000 keys, run time 180s",
+        "$ bin/ozone freon ommg --operation CREATE_KEY -n 25000"
+            + " --duration  180s\n",
+        "# read 25000 keys, run time 180s",
+        "$ bin/ozone freon ommg --operation READ_KEY -n 25000"
+            + " --duration 180s\n",
+        "# 20 threads, list 1000 keys each request, and run time 180s",
+        "$ bin/ozone freon ommg --operation LIST_KEYS -t 20 --batch-size 1000"
+            + " --duration 180s\n",
+        "# 10 threads, 1 threads list keys, 5 threads create file,"
+            + " 4 threads lookup file and run time 180s",
+        "$ bin/ozone freon ommg"
+            + " --operation MIXED --ops CREATE_FILE,LOOKUP_FILE,LIST_STATUS"
+            + " --opsnum 5,4,1 -t 10 -n 1000 --duration 180s\n",
+        "Note that: You must create a sufficient number of "
+            + "objects before executing read-related tests\n"
+    ));
+  }
+
+  private OmKeyArgs.Builder createKeyArgsBuilder() {
+    UserGroupInformation ugi = null;
+    try {
+      ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return new Builder()
+        .setBucketName(bucketName)
+        .setVolumeName(volumeName)
+        .setReplicationConfig(replicationConfig)
+        .setLocationInfoList(new ArrayList<>())
+        .setAcls(OzoneAclUtil.getAclList(ugi.getUserName(), ugi.getGroupNames(),
+            ALL, ALL));
+  }
+
+  private String getPath(long counter) {
+    // Ensure that the lexicographic order of the path strings
+    // matches the numeric order of the counters.
+    // This is useful for LIST_KEYS/LIST_STATUS.
+    // E.g. the file "0..0001" plus 1000 becomes "0..1001".
+    //
+    // The width is 19 because a decimal long can have up to 19 digits.
+    return StringUtils.leftPad(String.valueOf(counter), 19, '0');
+  }
+
+  @Override
+  public Supplier<String> realTimeStatusSupplier() {
+    final Map<String, Long> maxValueRecorder = new HashMap<>();
+    final Map<String, Long> valueRecorder = new HashMap<>();
+    final Map<String, Instant> instantsRecorder = new HashMap<>();
+    return () -> {
+      StringBuilder sb = new StringBuilder();
+      for (Map.Entry<String, Timer> entry
+          : getMetrics().getTimers(MetricFilter.ALL).entrySet()) {
+        String name = entry.getKey();
+        long maxValue = maxValueRecorder.getOrDefault(name, -1L);
+        long preValue = valueRecorder.getOrDefault(name, 0L);
+        Instant preInstant = instantsRecorder.getOrDefault(name, Instant.now());
+
+        long curValue = entry.getValue().getCount();
+        Instant now = Instant.now();
+        long duration = Duration.between(preInstant, now).getSeconds();
+        long rate = ((curValue - preValue) / (duration == 0 ? 1 : duration));
+        maxValue = Math.max(rate, maxValue);
+
+        maxValueRecorder.put(name, maxValue);
+        valueRecorder.put(name, curValue);
+        instantsRecorder.put(name, now);
+        sb.append(" ")
+            .append(name)
+            .append(": rate ")
+            .append(rate)
+            .append(" max ")
+            .append(maxValue);
+      }
+      sb.append("  ");
+      return sb.toString();
+    };
+  }
+
+  @SuppressWarnings("checkstyle:EmptyBlock")
+  private void applyOperation(long counter) throws Exception {
+    OmKeyArgs keyArgs;
+    String keyName;
+    long threadSeqId;
+    String startKeyName;
+    if (mixedOperation) {
+      threadSeqId = getThreadSequenceId();
+      operation = operations[(int)threadSeqId];
+    }
+    if (randomOp) {
+      counter = ThreadLocalRandom.current().nextLong(getTestNo());
+    }
+    switch (operation) {
+    case CREATE_KEY:
+      keyName = getPath(counter);
+      getMetrics().timer(operation.name()).time(() -> {
+        try (OutputStream stream = bucket.createKey(keyName, dataSize)) {
+          contentGenerator.write(stream);
+        }
+        return null;
+      });
+      break;
+    case LOOKUP_KEY:
+      keyName = getPath(counter);
+      keyArgs = omKeyArgsBuilder.get().setKeyName(keyName).build();
+      getMetrics().timer(operation.name()).time(() -> {
+        ozoneManagerClient.lookupKey(keyArgs);
+        return null;
+      });
+      break;
+    case GET_KEYINFO:
+      keyName = getPath(counter);
+      keyArgs = omKeyArgsBuilder.get().setKeyName(keyName).build();
+      getMetrics().timer(operation.name()).time(() -> {
+        ozoneManagerClient.getKeyInfo(keyArgs, false);
+        return null;
+      });
+      break;
+    case HEAD_KEY:
+      keyName = getPath(counter);
+      keyArgs = omKeyArgsBuilder.get()
+          .setKeyName(keyName).setHeadOp(true).build();
+      getMetrics().timer(operation.name()).time(() -> {
+        ozoneManagerClient.getKeyInfo(keyArgs, false);
+        return null;
+      });
+      break;
+    case READ_KEY:
+      keyName = getPath(counter);
+      getMetrics().timer(operation.name()).time(() -> {
+        try (OzoneInputStream stream = bucket.readKey(keyName)) {
+          while ((stream.read(readBuffer)) >= 0) {
+          }
+        }
+        return null;
+      });
+      break;
+    case READ_FILE:
+      keyName = getPath(counter);
+      getMetrics().timer(operation.name()).time(() -> {
+        try (OzoneInputStream stream = bucket.readFile(keyName)) {
+          while ((stream.read(readBuffer)) >= 0) {
+          }
+        }
+        return null;
+      });
+      break;
+    case CREATE_FILE:
+      keyName = getPath(counter);
+      getMetrics().timer(operation.name()).time(() -> {
+        try (OutputStream stream = bucket.createFile(
+            keyName, dataSize, replicationConfig, true, false)) {
+          contentGenerator.write(stream);
+        }
+        return null;
+      });
+      break;
+    case LOOKUP_FILE:
+      keyName = getPath(counter);
+      keyArgs = omKeyArgsBuilder.get().setKeyName(keyName).build();
+      getMetrics().timer(operation.name()).time(() -> {
+        ozoneManagerClient.lookupFile(keyArgs);
+        return null;
+      });
+      break;
+    case LIST_KEYS:
+      threadSeqId = getThreadSequenceId();
+      startKeyName = getPath(threadSeqId * batchSize);
+      getMetrics().timer(operation.name()).time(() -> {
+        List<OmKeyInfo> keyInfoList = ozoneManagerClient.listKeys(
+            volumeName, bucketName, startKeyName, "", batchSize);
+        if (keyInfoList.size() + 1 < batchSize) {
+          throw new NoSuchFileException(
+              "There are not enough files for testing you should use "
+                  + "CREATE_FILE to create at least batch-size * threads = "
+                  + batchSize * getThreadNo());
+        }
+        return null;
+      });
+      break;
+    case LIST_STATUS:
+      threadSeqId = getThreadSequenceId();
+      startKeyName = getPath(threadSeqId * batchSize);
+      keyArgs = omKeyArgsBuilder.get().setKeyName("").build();
+      getMetrics().timer(operation.name()).time(() -> {
+        List<OzoneFileStatus> fileStatusList = ozoneManagerClient.listStatus(
+            keyArgs, false, startKeyName, batchSize);
+        if (fileStatusList.size() + 1 < batchSize) {
+          throw new NoSuchFileException(
+              "There are not enough files for testing you should use "
+                  + "CREATE_FILE to create at least batch-size * threads = "
+                  + batchSize * getThreadNo());
+        }
+        return null;
+      });
+      break;
+    case INFO_BUCKET:
+      getMetrics().timer(operation.name()).time(() -> {
+            try {
+              ozoneManagerClient.getBucketInfo(volumeName, bucketName);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+      );
+      break;
+    case INFO_VOLUME:
+      getMetrics().timer(operation.name()).time(() -> {
+            try {
+              ozoneManagerClient.getVolumeInfo(volumeName);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+      );
+      break;
+    default:
+      throw new IllegalStateException("Unrecognized write command " +
+          "type request " + operation);
+    }
+  }
+
+}
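
The MIXED mode above expands --ops and --opsnum into one operation per thread
sequence id. A small self-contained sketch of that expansion (plain strings
stand in for the Operation enum; the flag values mirror the example in
getUsage() and are only illustrative):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class OpsExpansionDemo {
      public static void main(String[] args) {
        String operationsList = "CREATE_FILE,LOOKUP_FILE,LIST_STATUS";  // --ops
        String operationsNum = "5,4,1";                                 // --opsnum
        int threadNo = 10;                                              // -t

        List<String> ops = Arrays.asList(operationsList.split(","));
        List<Integer> opsNum = Arrays.stream(operationsNum.split(","))
            .map(Integer::valueOf).collect(Collectors.toList());

        if (ops.size() != opsNum.size()
            || opsNum.stream().mapToInt(Integer::intValue).sum() != threadNo) {
          throw new IllegalArgumentException(
              "the --opsnum values must sum to the number of threads");
        }

        // one entry per thread: seq ids 0-4 create files, 5-8 lookup files, 9 lists status
        String[] operations = new String[threadNo];
        int index = 0;
        for (int i = 0; i < ops.size(); i++) {
          for (int j = 0; j < opsNum.get(i); j++) {
            operations[index++] = ops.get(i);
          }
        }
        System.out.println(Arrays.toString(operations));
      }
    }
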
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java
index cc450aa12f..f4addf7401 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 import java.io.PrintStream;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 
 /**
  * Creates and runs a ProgressBar in new Thread which gets printed on
@@ -44,6 +45,7 @@ public class ProgressBar {
    * True if the progress bar is used from an interactive environment (shell).
    */
   private boolean interactive;
+  private Supplier<String> supplier;
 
   /**
    * Creates a new ProgressBar instance which prints the progress on the given
@@ -55,7 +57,7 @@ public class ProgressBar {
    */
   public ProgressBar(final PrintStream stream, final long maxValue,
                      final LongSupplier currentValue) {
-    this(stream, maxValue, currentValue, System.console() != null);
+    this(stream, maxValue, currentValue, System.console() != null, () -> "");
   }
 
   /**
@@ -64,15 +66,18 @@ public class ProgressBar {
    *
    * @param stream       to display the progress
    * @param maxValue     Maximum value of the progress
-   * @param currentValue Supplier that provides the current value
+   * @param currentValue Supplier that provides the current progressbar value
    * @param interactive  Print progressbar for interactive environments.
+   * @param supplier     Supplier that provides the real-time status message
    */
   public ProgressBar(final PrintStream stream, final long maxValue,
-      final LongSupplier currentValue, boolean interactive) {
+      final LongSupplier currentValue, boolean interactive,
+      final Supplier<String> supplier) {
     this.maxValue = maxValue;
     this.currentValue = currentValue;
     this.thread = new Thread(getProgressBar(stream));
     this.interactive = interactive;
+    this.supplier = supplier;
   }
 
   /**
@@ -133,7 +138,7 @@ public class ProgressBar {
           Thread.currentThread().interrupt();
         }
       }
-      print(stream, maxValue);
+      print(stream, currentValue.getAsLong());
       println(stream);
       running = false;
     };
@@ -168,12 +173,16 @@ public class ProgressBar {
     stream.print('\r');
     double percent = 100.0 * value / maxValue;
     StringBuilder sb = new StringBuilder();
+    String realTimeMessage = supplier.get();
+    int shrinkTimes = 1;
+    if (realTimeMessage.length() != 0) {
+      shrinkTimes = 3;
+    }
     sb.append(" ").append(String.format("%.2f", percent)).append("% |");
-
-    for (int i = 0; i <= percent; i++) {
+    for (int i = 0; i <= percent / shrinkTimes; i++) {
       sb.append('█');
     }
-    for (int j = 0; j < 100 - percent; j++) {
+    for (int j = 0; j < (100 - percent) / shrinkTimes; j++) {
       sb.append(' ');
     }
     sb.append("|  ");
@@ -183,6 +192,8 @@ public class ProgressBar {
     String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
         (timeInSec % 3600) / 60, timeInSec % 60);
     sb.append(" Time: ").append(timeToPrint);
+    sb.append("|  ");
+    sb.append(realTimeMessage);
     stream.print(sb.toString());
   }
 }
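
With the extended constructor, a caller can attach a live status message that
is appended to each rendered progress line. A usage sketch assuming only the
constructor signature and the start()/shutdown() methods shown in this patch
(the counter and the status text are illustrative):

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.hadoop.ozone.freon.ProgressBar;

    public class ProgressBarStatusDemo {
      public static void main(String[] args) throws InterruptedException {
        AtomicLong done = new AtomicLong();
        long maxValue = 100;

        // last argument is the new Supplier<String> for the real-time status message
        ProgressBar bar = new ProgressBar(System.out, maxValue, done::get,
            true, () -> " CREATE_KEY: count " + done.get());
        bar.start();

        for (int i = 0; i < maxValue; i++) {
          Thread.sleep(20);  // pretend to do some work
          done.incrementAndGet();
        }
        bar.shutdown();
      }
    }
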
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java
index 25b19456b0..50233a454d 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java
@@ -55,7 +55,7 @@ public class TestProgressBar {
     long maxValue = 10L;
 
     ProgressBar progressbar =
-        new ProgressBar(stream, maxValue, currentValue, true);
+        new ProgressBar(stream, maxValue, currentValue, true, () -> "");
 
     Runnable task = () -> LongStream.range(0, maxValue)
         .forEach(counter -> numberOfKeysAdded.getAndIncrement());

