You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2023/04/17 13:01:50 UTC
[ozone] branch master updated: HDDS-7908. Support OM Metadata operation Generator in `Ozone freon` (#4251)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new cffa3864b9 HDDS-7908. Support OM Metadata operation Generator in `Ozone freon` (#4251)
cffa3864b9 is described below
commit cffa3864b974499078fbdb37506ed992319b09c5
Author: XiChen <32...@users.noreply.github.com>
AuthorDate: Mon Apr 17 21:01:41 2023 +0800
HDDS-7908. Support OM Metadata operation Generator in `Ozone freon` (#4251)
---
.../hadoop/ozone/freon/BaseFreonGenerator.java | 118 +++++-
.../java/org/apache/hadoop/ozone/freon/Freon.java | 3 +-
.../hadoop/ozone/freon/OmMetadataGenerator.java | 452 +++++++++++++++++++++
.../org/apache/hadoop/ozone/freon/ProgressBar.java | 25 +-
.../apache/hadoop/ozone/freon/TestProgressBar.java | 2 +-
5 files changed, 580 insertions(+), 20 deletions(-)
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
index 90ee15e61f..35e753b41d 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
@@ -18,18 +18,24 @@ package org.apache.hadoop.ozone.freon;
import java.io.IOException;
import java.io.InputStream;
+import java.time.Duration;
+import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.TimeDurationUtil;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.utils.HAUtils;
@@ -60,6 +66,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import picocli.CommandLine;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand;
@@ -92,6 +99,11 @@ public class BaseFreonGenerator {
defaultValue = "10")
private int threadNo;
+ @Option(names = {"--duration"},
+ description = "Duration to run the test. "
+ + " Can be '30s', '5m', '1h', '7d' etc..")
+ private String duration;
+
@Option(names = {"-f", "--fail-at-end"},
description = "If turned on, all the tasks will be executed even if "
+ "there are failures.")
@@ -104,6 +116,14 @@ public class BaseFreonGenerator {
defaultValue = "")
private String prefix = "";
+ @Option(names = {"--verbose"},
+ description = "More verbose output. "
+ + "Show all the command line Option info.")
+ private boolean verbose;
+
+ @CommandLine.Spec
+ private CommandLine.Model.CommandSpec spec;
+
private MetricRegistry metrics = new MetricRegistry();
private AtomicLong successCounter;
@@ -111,12 +131,19 @@ public class BaseFreonGenerator {
private AtomicLong attemptCounter;
private long startTime;
+ private long durationInSecond;
+ private boolean timebase;
private PathSchema pathSchema;
private String spanName;
private ExecutorService executor;
private ProgressBar progressBar;
+ private final ThreadLocal<Long> threadSequenceId = new ThreadLocal<>();
+ private final AtomicLong id = new AtomicLong(0);
+
+ private final AtomicBoolean completed = new AtomicBoolean(false);
+
/**
* The main logic to execute a test generator.
*
@@ -153,15 +180,24 @@ public class BaseFreonGenerator {
* concurrently in {@code executor}.
*/
private void taskLoop(TaskProvider provider) {
- while (true) {
+ threadSequenceId.set(id.getAndIncrement());
+ while (!completed.get()) {
long counter = attemptCounter.getAndIncrement();
-
- //in case of an other failed test, we shouldn't execute more tasks.
- if (counter >= testNo || (!failAtEnd && failureCounter.get() > 0)) {
- break;
+ if (timebase) {
+ if (System.currentTimeMillis()
+ > startTime + TimeUnit.SECONDS.toMillis(durationInSecond)) {
+ completed.set(true);
+ break;
+ }
+ } else {
+ //in case of an other failed test, we shouldn't execute more tasks.
+ if (counter >= testNo || (!failAtEnd && failureCounter.get() > 0)) {
+ completed.set(true);
+ break;
+ }
}
- tryNextTask(provider, counter);
+ tryNextTask(provider, counter % testNo);
}
taskLoopCompleted();
@@ -198,8 +234,7 @@ public class BaseFreonGenerator {
* thread.
*/
private void waitForCompletion() {
- while (successCounter.get() + failureCounter.get() < testNo && (
- failureCounter.get() == 0 || failAtEnd)) {
+ while (!completed.get() && (failureCounter.get() == 0 || failAtEnd)) {
try {
Thread.sleep(CHECK_INTERVAL_MILLIS);
} catch (InterruptedException e) {
@@ -215,6 +250,7 @@ public class BaseFreonGenerator {
} else {
progressBar.shutdown();
}
+ threadSequenceId.remove();
executor.shutdown();
try {
executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
@@ -250,6 +286,22 @@ public class BaseFreonGenerator {
//replace environment variables to support multi-node execution
prefix = resolvePrefix(prefix);
}
+ if (duration != null) {
+ durationInSecond = TimeDurationUtil.getTimeDurationHelper(
+ "--runtime", duration, TimeUnit.SECONDS);
+ if (durationInSecond <= 0) {
+ throw new IllegalArgumentException(
+ "Incomplete command, "
+ + "the runtime must be given, and must not be negative");
+ }
+ timebase = true;
+ }
+
+ if (testNo <= 0) {
+ throw new IllegalArgumentException(
+ "Invalid command, "
+ + "the testNo must be a positive integer");
+ }
LOG.info("Executing test with prefix {} " +
"and number-of-tests {}", prefix, testNo);
@@ -263,17 +315,33 @@ public class BaseFreonGenerator {
LOG.error("HTTP server can't be stopped.", ex);
}
printReport();
+ if (verbose) {
+ printOption();
+ }
}, 10);
executor = Executors.newFixedThreadPool(threadNo);
-
- progressBar = new ProgressBar(System.out, testNo, successCounter::get,
- freonCommand.isInteractive());
+ long maxValue;
+ LongSupplier supplier;
+ if (duration != null) {
+ maxValue = durationInSecond;
+ supplier = () -> Duration.between(
+ Instant.ofEpochMilli(startTime), Instant.now()).getSeconds();
+ } else {
+ maxValue = testNo;
+ supplier = successCounter::get;
+ }
+ progressBar = new ProgressBar(System.out, maxValue, supplier,
+ freonCommand.isInteractive(), realTimeStatusSupplier());
progressBar.start();
startTime = System.currentTimeMillis();
}
+ /**
+  * Provides the supplier of the free-form status text that is handed to the
+  * progress bar together with the progress value.
+  * <p>
+  * This default implementation contributes no text; generators that want to
+  * show live figures next to the bar override it.
+  *
+  * @return supplier that always yields the empty string
+  */
+ public Supplier<String> realTimeStatusSupplier() {
+   final String noStatus = "";
+   return () -> noStatus;
+ }
+
/**
* Resolve environment variables in the prefixes.
*/
@@ -314,6 +382,23 @@ public class BaseFreonGenerator {
messages.forEach(print);
}
+ /**
+  * Prints every command line option of the executed command together with
+  * its current value, one {@code name=value} pair per line, preceded by an
+  * "Option:" header.
+  * <p>
+  * The lines go to standard output when freon runs interactively and to the
+  * logger otherwise.
+  */
+ public void printOption() {
+   // Collect everything first so that nothing is emitted if reading an
+   // option value fails half way through.
+   final List<String> lines = new LinkedList<>();
+   lines.add("\nOption:");
+   spec.options().forEach(
+       opt -> lines.add(opt.longestName() + "=" + opt.getValue()));
+
+   final Consumer<String> sink;
+   if (freonCommand.isInteractive()) {
+     sink = System.out::println;
+   } else {
+     sink = LOG::info;
+   }
+   for (String line : lines) {
+     sink.accept(line);
+   }
+ }
+
+
/**
* Print out reports with the given message.
*/
@@ -484,6 +569,17 @@ public class BaseFreonGenerator {
this.threadNo = threadNo;
}
+ /**
+  * Returns the sequence id of the calling worker thread.
+  * <p>
+  * Unlike a JVM thread id, sequence ids start at 0 and are consecutive: each
+  * worker takes the next value of a shared counter when it enters the task
+  * loop and keeps it in a thread-local slot.
+  * <p>
+  * The slot has no initial value, so calling this from a thread that never
+  * entered the task loop fails with a {@link NullPointerException} when the
+  * missing value is unboxed.
+  *
+  * @return sequence id of the current thread
+  */
+ public long getThreadSequenceId() {
+   final Long sequenceId = threadSequenceId.get();
+   return sequenceId;
+ }
+
+
protected OzoneClient createOzoneClient(String omServiceID,
OzoneConfiguration conf) throws Exception {
if (omServiceID != null) {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
index fcb9a5b27e..d346866352 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
@@ -70,7 +70,8 @@ import picocli.CommandLine.Option;
OmRPCLoadGenerator.class,
OzoneClientKeyReadWriteOps.class,
RangeKeysGenerator.class,
- DatanodeSimulator.class
+ DatanodeSimulator.class,
+ OmMetadataGenerator.class
},
versionProvider = HddsVersionProvider.class,
mixinStandardHelpOptions = true)
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java
new file mode 100644
index 0000000000..6ce460563f
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.NoSuchFileException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricFilter;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs.Builder;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.security.UserGroupInformation;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+
+/**
+ * Data generator tool to test OM performance.
+ */
+@Command(name = "ommg",
+ aliases = "om-metadata-generator",
+ description = "Create metadata operation to the OM.",
+ versionProvider = HddsVersionProvider.class,
+ mixinStandardHelpOptions = true,
+ showDefaultValues = true)
+public class OmMetadataGenerator extends BaseFreonGenerator
+ implements Callable<Void> {
+
+ /**
+  * Metadata operations this generator can issue; one is selected with the
+  * --operation option.
+  * <p>
+  * MIXED is not an operation of its own: it makes the generator run several
+  * of the other operations at once, each on its own share of the worker
+  * threads, as configured by --ops and --opsnum.
+  */
+ enum Operation {
+ CREATE_FILE,
+ LOOKUP_FILE,
+ READ_FILE,
+ LIST_STATUS,
+ CREATE_KEY,
+ LOOKUP_KEY,
+ GET_KEYINFO,
+ HEAD_KEY,
+ READ_KEY,
+ LIST_KEYS,
+ INFO_BUCKET,
+ INFO_VOLUME,
+ MIXED,
+ }
+
+ @Option(names = {"-v", "--volume"},
+ description = "Name of the volume which contains the test data. Will be"
+ + " created if missing.",
+ defaultValue = "vol1")
+ private String volumeName;
+
+ @Option(names = {"-b", "--bucket"},
+ description = "Name of the bucket which contains the test data. Will be"
+ + " created if missing.",
+ defaultValue = "bucket1")
+ private String bucketName;
+
+ @Option(names = {"-s", "--size"},
+ description = "The size in byte of a file for the Create File/Key op.",
+ defaultValue = "0")
+ private int dataSize;
+
+ @Option(names = {"--buffer"},
+ description = "Size of buffer used to generated the key content.",
+ defaultValue = "4096")
+ private int bufferSize;
+
+ @Option(
+ names = "--batch-size",
+ description = "The number of key/file requests per LIST_KEY/LIST_STATUS"
+ + " request.",
+ defaultValue = "1000")
+ private int batchSize;
+
+ @Option(
+ names = "--random",
+ description = "random read/write if given. This means that it"
+ + " is possible to read/write the same file at the same time",
+ defaultValue = "false")
+ private boolean randomOp;
+
+ @Option(names = {"-o", "--operation"},
+ description = "The operation to perform, --ophelp Print detail")
+ private Operation operation;
+
+ @Option(names = {"--ops"},
+ description = "The operations list to perform, --ophelp Print detail")
+ private String operationsList;
+
+ @Option(names = {"--opsnum"},
+ description = "The number of threads per operations, the values sum"
+ + " must equal the number of threads, --ophelp Print detail")
+ private String operationsNum;
+
+ @Option(names = {"--ophelp"},
+ description = "Print operation help, or list available operation")
+ private boolean opHelp;
+
+ @Option(
+ names = "--om-service-id",
+ description = "OM Service ID"
+ )
+ private String omServiceID;
+
+ private OzoneManagerProtocol ozoneManagerClient;
+
+ private ThreadLocal<OmKeyArgs.Builder> omKeyArgsBuilder;
+
+ private OzoneBucket bucket;
+
+ private ContentGenerator contentGenerator;
+ private final byte[] readBuffer = new byte[4096];
+ private ReplicationConfig replicationConfig;
+ private Operation[] operations;
+ private boolean mixedOperation = false;
+
+ /**
+  * Entry point of the {@code ommg} command.
+  * <p>
+  * Prints the usage text and stops when --ophelp is given or no operation
+  * was selected. Otherwise it prepares the generator (including the
+  * per-thread operation table for MIXED), makes sure the target volume and
+  * bucket exist, runs the tests and finally closes the OM client.
+  *
+  * @return always {@code null}
+  * @throws Exception if initialization or any client call fails
+  */
+ @Override
+ public Void call() throws Exception {
+   final boolean usageOnly = opHelp || operation == null;
+   if (usageOnly) {
+     System.out.println(getUsage());
+     return null;
+   }
+
+   if (Operation.MIXED == operation) {
+     // The flag is only raised once the --ops/--opsnum pair was accepted.
+     initMixedOperation();
+     mixedOperation = true;
+   }
+
+   init();
+   contentGenerator = new ContentGenerator(dataSize, bufferSize);
+   omKeyArgsBuilder = ThreadLocal.withInitial(() -> createKeyArgsBuilder());
+   final OzoneConfiguration ozoneConf = createOzoneConfiguration();
+   replicationConfig = ReplicationConfig.getDefault(ozoneConf);
+
+   try (OzoneClient client = createOzoneClient(omServiceID, ozoneConf)) {
+     ensureVolumeAndBucketExist(client, volumeName, bucketName);
+     ozoneManagerClient = createOmClient(ozoneConf, omServiceID);
+     bucket = client.getObjectStore()
+         .getVolume(volumeName)
+         .getBucket(bucketName);
+     runTests(counter -> applyOperation(counter));
+   } finally {
+     final OzoneManagerProtocol omClient = ozoneManagerClient;
+     if (omClient != null) {
+       omClient.close();
+       omKeyArgsBuilder.remove();
+     }
+   }
+   return null;
+ }
+
+
+ /**
+  * Parses --ops and --opsnum into the per-thread operation table that is
+  * used when --operation is MIXED.
+  * <p>
+  * Both options are comma separated lists of equal length; entry i of
+  * --opsnum is the number of threads that run entry i of --ops. The
+  * resulting array is indexed by thread sequence id.
+  *
+  * @throws IllegalArgumentException if either option is missing, a token is
+  *     not a known operation or not a number, MIXED itself is listed, a
+  *     thread count is negative, the lists differ in length, or the counts
+  *     do not add up to the number of threads
+  */
+ private void initMixedOperation() {
+   if (operationsList == null || operationsNum == null) {
+     throw new IllegalArgumentException(
+         "--ops and --opsnum must be given, if --operation is MIXED");
+   }
+   // Tokens are trimmed so that "A, B" is accepted as well as "A,B".
+   List<Operation> ops =
+       Arrays.stream(operationsList.split(",")).map(String::trim)
+           .map(Operation::valueOf).collect(Collectors.toList());
+   List<Integer> opsNum =
+       Arrays.stream(operationsNum.split(",")).map(String::trim)
+           .map(Integer::valueOf).collect(Collectors.toList());
+   if (ops.contains(Operation.MIXED)) {
+     // MIXED has no handler of its own; fail here instead of on every task.
+     throw new IllegalArgumentException(
+         "--ops must not contain " + Operation.MIXED);
+   }
+   if (opsNum.stream().anyMatch(n -> n < 0)) {
+     // A negative entry could still satisfy the sum check below and would
+     // then make the fill loop run past the end of the table.
+     throw new IllegalArgumentException(
+         "the --opsnum values must not be negative");
+   }
+   if (ops.size() != opsNum.size()
+       || opsNum.stream().mapToInt(x -> x).sum() != getThreadNo()) {
+     throw new IllegalArgumentException(
+         "the --opsnum values sum must equal the number of threads");
+   }
+
+   // if --ops is A,B,C and --opsnum is 3,2,1
+   // the operations will be [A, A, A, B, B, C], so
+   // the threads with seq id [0, 2] will execute A,
+   // the threads with seq id [3, 4] will execute B,
+   // the thread with seq id [5, 5] will execute C
+   operations = new Operation[getThreadNo()];
+   int index = 0;
+   for (int i = 0; i < ops.size(); i++) {
+     int num = opsNum.get(i);
+     Arrays.fill(operations, index, index + num, ops.get(i));
+     index += num;
+   }
+ }
+
+
+ /**
+  * Builds the help text of the command: the list of supported operations
+  * followed by a few example invocations.
+  *
+  * @return the usage text, lines separated by a newline character
+  */
+ public static String getUsage() {
+   // The enum does not override toString(), so name() yields the same text;
+   // values() lists the constants in declaration order.
+   final String supported = Arrays.stream(Operation.values())
+       .map(Operation::name)
+       .collect(Collectors.joining(", "));
+   final String[] lines = {
+       "A tool to measure the Ozone om performance",
+       "support Operation: ",
+       " " + supported,
+       "\nExample: ",
+       "# create 25000 keys, run time 180s",
+       "$ bin/ozone freon ommg --operation CREATE_KEY -n 25000"
+           + " --duration 180s\n",
+       "# read 25000 keys, run time 180s",
+       "$ bin/ozone freon ommg --operation READ_KEY -n 25000"
+           + " --duration 180s\n",
+       "# 20 threads, list 1000 keys each request, and run time 180s",
+       "$ bin/ozone freon ommg --operation LIST_KEYS -t 20 --batch-size 1000"
+           + " --duration 180s\n",
+       "# 10 threads, 1 threads list keys, 5 threads create file,"
+           + " 4 threads lookup file and run time 180s",
+       "$ bin/ozone freon ommg"
+           + " --operation MIXED --ops CREATE_FILE,LOOKUP_FILE,LIST_STATUS"
+           + " --opsnum 5,4,1 -t 10 -n 1000 --duration 180s\n",
+       "Note that: You must create a sufficient number of "
+           + "objects before executing read-related tests\n"
+   };
+   return String.join("\n", lines);
+ }
+
+
+ private OmKeyArgs.Builder createKeyArgsBuilder() {
+ UserGroupInformation ugi = null;
+ try {
+ ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return new Builder()
+ .setBucketName(bucketName)
+ .setVolumeName(volumeName)
+ .setReplicationConfig(replicationConfig)
+ .setLocationInfoList(new ArrayList<>())
+ .setAcls(OzoneAclUtil.getAclList(ugi.getUserName(), ugi.getGroupNames(),
+ ALL, ALL));
+ }
+
+ private String getPath(long counter) {
+ // Ensure that the dictionary order of path String
+ // is the same as the order of numeric.
+ // This is useful for LIST_KEYS/LIST_STATUS.
+ // The file "0..0001" + 1000 will be "0..1001"
+ //
+ // The size is 19, because the decimal long type can have up to 19 digits
+ return StringUtils.leftPad(String.valueOf(counter), 19, '0');
+ }
+
+ /**
+  * Supplies a one-line summary of the current and the peak rate of every
+  * registered timer.
+  * <p>
+  * The supplier keeps, per timer name, the count and the instant of the
+  * previous call; the rate is the count difference divided by the whole
+  * seconds elapsed since then (a zero interval is treated as one second).
+  *
+  * @return supplier producing " name: rate R max M" for each timer
+  */
+ @Override
+ public Supplier<String> realTimeStatusSupplier() {
+   final Map<String, Long> peakRates = new HashMap<>();
+   final Map<String, Long> lastCounts = new HashMap<>();
+   final Map<String, Instant> lastSamples = new HashMap<>();
+   return () -> {
+     final StringBuilder status = new StringBuilder();
+     getMetrics().getTimers(MetricFilter.ALL).forEach((name, timer) -> {
+       final long previousCount =
+           lastCounts.containsKey(name) ? lastCounts.get(name) : 0L;
+       final Instant previousSample = lastSamples.containsKey(name)
+           ? lastSamples.get(name) : Instant.now();
+
+       final long currentCount = timer.getCount();
+       final Instant currentSample = Instant.now();
+       final long elapsed =
+           Duration.between(previousSample, currentSample).getSeconds();
+       final long divisor = elapsed == 0 ? 1 : elapsed;
+       final long rate = (currentCount - previousCount) / divisor;
+
+       final long knownPeak =
+           peakRates.containsKey(name) ? peakRates.get(name) : -1L;
+       final long peak = Math.max(rate, knownPeak);
+
+       peakRates.put(name, peak);
+       lastCounts.put(name, currentCount);
+       lastSamples.put(name, currentSample);
+
+       status.append(" ").append(name);
+       status.append(": rate ").append(rate);
+       status.append(" max ").append(peak);
+     });
+     status.append(" ");
+     return status.toString();
+   };
+ }
+
+
+ @SuppressWarnings("checkstyle:EmptyBlock")
+ private void applyOperation(long counter) throws Exception {
+ OmKeyArgs keyArgs;
+ String keyName;
+ long threadSeqId;
+ String startKeyName;
+ if (mixedOperation) {
+ threadSeqId = getThreadSequenceId();
+ operation = operations[(int)threadSeqId];
+ }
+ if (randomOp) {
+ counter = ThreadLocalRandom.current().nextLong(getTestNo());
+ }
+ switch (operation) {
+ case CREATE_KEY:
+ keyName = getPath(counter);
+ getMetrics().timer(operation.name()).time(() -> {
+ try (OutputStream stream = bucket.createKey(keyName, dataSize)) {
+ contentGenerator.write(stream);
+ }
+ return null;
+ });
+ break;
+ case LOOKUP_KEY:
+ keyName = getPath(counter);
+ keyArgs = omKeyArgsBuilder.get().setKeyName(keyName).build();
+ getMetrics().timer(operation.name()).time(() -> {
+ ozoneManagerClient.lookupKey(keyArgs);
+ return null;
+ });
+ break;
+ case GET_KEYINFO:
+ keyName = getPath(counter);
+ keyArgs = omKeyArgsBuilder.get().setKeyName(keyName).build();
+ getMetrics().timer(operation.name()).time(() -> {
+ ozoneManagerClient.getKeyInfo(keyArgs, false);
+ return null;
+ });
+ break;
+ case HEAD_KEY:
+ keyName = getPath(counter);
+ keyArgs = omKeyArgsBuilder.get()
+ .setKeyName(keyName).setHeadOp(true).build();
+ getMetrics().timer(operation.name()).time(() -> {
+ ozoneManagerClient.getKeyInfo(keyArgs, false);
+ return null;
+ });
+ break;
+ case READ_KEY:
+ keyName = getPath(counter);
+ getMetrics().timer(operation.name()).time(() -> {
+ try (OzoneInputStream stream = bucket.readKey(keyName)) {
+ while ((stream.read(readBuffer)) >= 0) {
+ }
+ }
+ return null;
+ });
+ break;
+ case READ_FILE:
+ keyName = getPath(counter);
+ getMetrics().timer(operation.name()).time(() -> {
+ try (OzoneInputStream stream = bucket.readFile(keyName)) {
+ while ((stream.read(readBuffer)) >= 0) {
+ }
+ }
+ return null;
+ });
+ break;
+ case CREATE_FILE:
+ keyName = getPath(counter);
+ getMetrics().timer(operation.name()).time(() -> {
+ try (OutputStream stream = bucket.createFile(
+ keyName, dataSize, replicationConfig, true, false)) {
+ contentGenerator.write(stream);
+ }
+ return null;
+ });
+ break;
+ case LOOKUP_FILE:
+ keyName = getPath(counter);
+ keyArgs = omKeyArgsBuilder.get().setKeyName(keyName).build();
+ getMetrics().timer(operation.name()).time(() -> {
+ ozoneManagerClient.lookupFile(keyArgs);
+ return null;
+ });
+ break;
+ case LIST_KEYS:
+ threadSeqId = getThreadSequenceId();
+ startKeyName = getPath(threadSeqId * batchSize);
+ getMetrics().timer(operation.name()).time(() -> {
+ List<OmKeyInfo> keyInfoList = ozoneManagerClient.listKeys(
+ volumeName, bucketName, startKeyName, "", batchSize);
+ if (keyInfoList.size() + 1 < batchSize) {
+ throw new NoSuchFileException(
+ "There are not enough files for testing you should use "
+ + "CREATE_FILE to create at least batch-size * threads = "
+ + batchSize * getThreadNo());
+ }
+ return null;
+ });
+ break;
+ case LIST_STATUS:
+ threadSeqId = getThreadSequenceId();
+ startKeyName = getPath(threadSeqId * batchSize);
+ keyArgs = omKeyArgsBuilder.get().setKeyName("").build();
+ getMetrics().timer(operation.name()).time(() -> {
+ List<OzoneFileStatus> fileStatusList = ozoneManagerClient.listStatus(
+ keyArgs, false, startKeyName, batchSize);
+ if (fileStatusList.size() + 1 < batchSize) {
+ throw new NoSuchFileException(
+ "There are not enough files for testing you should use "
+ + "CREATE_FILE to create at least batch-size * threads = "
+ + batchSize * getThreadNo());
+ }
+ return null;
+ });
+ break;
+ case INFO_BUCKET:
+ getMetrics().timer(operation.name()).time(() -> {
+ try {
+ ozoneManagerClient.getBucketInfo(volumeName, bucketName);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ );
+ break;
+ case INFO_VOLUME:
+ getMetrics().timer(operation.name()).time(() -> {
+ try {
+ ozoneManagerClient.getVolumeInfo(volumeName);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ );
+ break;
+ default:
+ throw new IllegalStateException("Unrecognized write command " +
+ "type request " + operation);
+ }
+ }
+
+}
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java
index cc450aa12f..f4addf7401 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import java.io.PrintStream;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
+import java.util.function.Supplier;
/**
* Creates and runs a ProgressBar in new Thread which gets printed on
@@ -44,6 +45,7 @@ public class ProgressBar {
* True if the progress bar is used from an interactive environment (shell).
*/
private boolean interactive;
+ private Supplier<String> supplier;
/**
* Creates a new ProgressBar instance which prints the progress on the given
@@ -55,7 +57,7 @@ public class ProgressBar {
*/
public ProgressBar(final PrintStream stream, final long maxValue,
final LongSupplier currentValue) {
- this(stream, maxValue, currentValue, System.console() != null);
+ this(stream, maxValue, currentValue, System.console() != null, () -> "");
}
/**
@@ -64,15 +66,18 @@ public class ProgressBar {
*
* @param stream to display the progress
* @param maxValue Maximum value of the progress
- * @param currentValue Supplier that provides the current value
+ * @param currentValue Supplier that provides the current progressbar value
* @param interactive Print progressbar for interactive environments.
+ * @param supplier Supplier that provides the real time message
*/
public ProgressBar(final PrintStream stream, final long maxValue,
- final LongSupplier currentValue, boolean interactive) {
+ final LongSupplier currentValue, boolean interactive,
+ final Supplier<String> supplier) {
this.maxValue = maxValue;
this.currentValue = currentValue;
this.thread = new Thread(getProgressBar(stream));
this.interactive = interactive;
+ this.supplier = supplier;
}
/**
@@ -133,7 +138,7 @@ public class ProgressBar {
Thread.currentThread().interrupt();
}
}
- print(stream, maxValue);
+ print(stream, currentValue.getAsLong());
println(stream);
running = false;
};
@@ -168,12 +173,16 @@ public class ProgressBar {
stream.print('\r');
double percent = 100.0 * value / maxValue;
StringBuilder sb = new StringBuilder();
+ String realTimeMessage = supplier.get();
+ int shrinkTimes = 1;
+ if (realTimeMessage.length() != 0) {
+ shrinkTimes = 3;
+ }
sb.append(" ").append(String.format("%.2f", percent)).append("% |");
-
- for (int i = 0; i <= percent; i++) {
+ for (int i = 0; i <= percent / shrinkTimes; i++) {
sb.append('█');
}
- for (int j = 0; j < 100 - percent; j++) {
+ for (int j = 0; j < (100 - percent) / shrinkTimes; j++) {
sb.append(' ');
}
sb.append("| ");
@@ -183,6 +192,8 @@ public class ProgressBar {
String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
(timeInSec % 3600) / 60, timeInSec % 60);
sb.append(" Time: ").append(timeToPrint);
+ sb.append("| ");
+ sb.append(realTimeMessage);
stream.print(sb.toString());
}
}
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java
index 25b19456b0..50233a454d 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java
@@ -55,7 +55,7 @@ public class TestProgressBar {
long maxValue = 10L;
ProgressBar progressbar =
- new ProgressBar(stream, maxValue, currentValue, true);
+ new ProgressBar(stream, maxValue, currentValue, true, () -> "");
Runnable task = () -> LongStream.range(0, maxValue)
.forEach(counter -> numberOfKeysAdded.getAndIncrement());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org