You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/07/27 05:51:51 UTC
hadoop git commit: HDFS-12180. Ozone: Corona: Add stats and progress
bar to corona. Contributed by Nandakumar.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 c651868c4 -> 5b3cac765
HDFS-12180. Ozone: Corona: Add stats and progress bar to corona. Contributed by Nandakumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5b3cac76
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5b3cac76
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5b3cac76
Branch: refs/heads/HDFS-7240
Commit: 5b3cac765731a9ae1936303706b880547b65eedd
Parents: c651868
Author: Anu Engineer <ae...@apache.org>
Authored: Wed Jul 26 22:48:06 2017 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Wed Jul 26 22:48:06 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/ozone/OzoneClientImpl.java | 6 +-
.../java/org/apache/hadoop/ozone/Corona.java | 207 +++++++++++++++++--
2 files changed, 199 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b3cac76/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
index 11e8fff..01807b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
@@ -55,7 +55,11 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
import java.util.stream.Collectors;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b3cac76/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java
index dc63a5f..581ccc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java
@@ -77,6 +77,7 @@ public final class Corona extends Configured implements Tool {
private static final String HELP = "help";
private static final String MODE = "mode";
private static final String SOURCE = "source";
+ private static final String NUM_OF_THREADS = "numOfThreads";
private static final String NUM_OF_VOLUMES = "numOfVolumes";
private static final String NUM_OF_BUCKETS = "numOfBuckets";
private static final String NUM_OF_KEYS = "numOfKeys";
@@ -85,19 +86,21 @@ public final class Corona extends Configured implements Tool {
private static final String SOURCE_DEFAULT =
"https://commoncrawl.s3.amazonaws.com/" +
"crawl-data/CC-MAIN-2017-17/warc.paths.gz";
+ private static final String NUM_OF_THREADS_DEFAULT = "10";
private static final String NUM_OF_VOLUMES_DEFAULT = "10";
private static final String NUM_OF_BUCKETS_DEFAULT = "1000";
private static final String NUM_OF_KEYS_DEFAULT = "500000";
- private static final int NUM_OF_THREADS_DEFAULT = 10;
-
private static final Logger LOG =
LoggerFactory.getLogger(Corona.class);
private boolean printUsage = false;
+ private volatile boolean completed = false; // written by the main thread, polled by the ProgressBar thread
+ private volatile boolean exception = false; // written by pool workers, read by the ProgressBar thread
private String mode;
private String source;
+ private String numOfThreads;
private String numOfVolumes;
private String numOfBuckets;
private String numOfKeys;
@@ -107,18 +110,29 @@ public final class Corona extends Configured implements Tool {
private long startTime;
+ private AtomicLong volumeCreationTime;
+ private AtomicLong bucketCreationTime;
+ private AtomicLong keyCreationTime;
+ private AtomicLong keyWriteTime;
+
+ private AtomicLong totalBytesWritten;
+
private AtomicInteger numberOfVolumesCreated;
private AtomicInteger numberOfBucketsCreated;
private AtomicLong numberOfKeysAdded;
private Corona(Configuration conf) throws IOException {
startTime = System.nanoTime();
+ volumeCreationTime = new AtomicLong(); // per-phase timers: cumulative nanoseconds accumulated via getAndAdd
+ bucketCreationTime = new AtomicLong();
+ keyCreationTime = new AtomicLong();
+ keyWriteTime = new AtomicLong();
+ totalBytesWritten = new AtomicLong(); // sum of value.length over every key successfully written
numberOfVolumesCreated = new AtomicInteger();
numberOfBucketsCreated = new AtomicInteger();
numberOfKeysAdded = new AtomicLong();
OzoneClientFactory.setConfiguration(conf);
ozoneClient = OzoneClientFactory.getRpcClient();
- processor = Executors.newFixedThreadPool(NUM_OF_THREADS_DEFAULT);
}
@Override
@@ -130,13 +144,15 @@ public final class Corona extends Configured implements Tool {
usage();
System.exit(0);
}
+ LOG.info("Number of Threads: " + numOfThreads);
+ processor = Executors.newFixedThreadPool(Integer.parseInt(numOfThreads));
addShutdownHook();
if(mode.equals("online")) {
LOG.info("Mode: online");
throw new UnsupportedOperationException("Not yet implemented.");
} else {
LOG.info("Mode: offline");
- LOG.info("Number of Volumes: {}.", numOfBuckets);
+ LOG.info("Number of Volumes: {}.", numOfVolumes);
LOG.info("Number of Buckets per Volume: {}.", numOfBuckets);
LOG.info("Number of Keys per Bucket: {}.", numOfKeys);
for(int i = 0; i < Integer.parseInt(numOfVolumes); i++) {
@@ -144,8 +160,13 @@ public final class Corona extends Configured implements Tool {
RandomStringUtils.randomNumeric(5);
processor.submit(new OfflineProcessor(volume));
}
+ Thread progressbar = getProgressBarThread();
+ LOG.info("Starting progress bar Thread.");
+ progressbar.start();
processor.shutdown();
processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
+ completed = true;
+ progressbar.join();
return 0;
}
}
@@ -170,6 +191,12 @@ public final class Corona extends Configured implements Tool {
OptionBuilder.withArgName("value");
OptionBuilder.hasArg();
+ OptionBuilder.withDescription("number of threads to be launched " +
+ "for the run");
+ Option optNumOfThreads = OptionBuilder.create(NUM_OF_THREADS);
+
+ OptionBuilder.withArgName("value");
+ OptionBuilder.hasArg();
OptionBuilder.withDescription("specifies number of Volumes to be " +
"created in offline mode");
Option optNumOfVolumes = OptionBuilder.create(NUM_OF_VOLUMES);
@@ -189,6 +216,7 @@ public final class Corona extends Configured implements Tool {
options.addOption(optHelp);
options.addOption(optMode);
options.addOption(optSource);
+ options.addOption(optNumOfThreads);
options.addOption(optNumOfVolumes);
options.addOption(optNumOfBuckets);
options.addOption(optNumOfKeys);
@@ -204,6 +232,9 @@ public final class Corona extends Configured implements Tool {
source = cmdLine.hasOption(SOURCE) ?
cmdLine.getOptionValue(SOURCE) : SOURCE_DEFAULT;
+ numOfThreads = cmdLine.hasOption(NUM_OF_THREADS) ?
+ cmdLine.getOptionValue(NUM_OF_THREADS) : NUM_OF_THREADS_DEFAULT;
+
numOfVolumes = cmdLine.hasOption(NUM_OF_VOLUMES) ?
cmdLine.getOptionValue(NUM_OF_VOLUMES) : NUM_OF_VOLUMES_DEFAULT;
@@ -216,6 +247,8 @@ public final class Corona extends Configured implements Tool {
private void usage() {
System.out.println("Options supported are:");
+ System.out.println("-numOfThreads <value> "
+ + "number of threads to be launched for the run.");
System.out.println("-mode [online | offline] "
+ "specifies the mode in which Corona should run.");
System.out.println("-source <url> "
@@ -245,7 +278,9 @@ public final class Corona extends Configured implements Tool {
this.totalKeys = Integer.parseInt(numOfKeys);
this.volume = volume;
LOG.trace("Creating volume: {}", volume);
+ long start = System.nanoTime();
ozoneClient.createVolume(this.volume);
+ volumeCreationTime.getAndAdd(System.nanoTime() - start);
numberOfVolumesCreated.getAndIncrement();
}
@@ -256,7 +291,9 @@ public final class Corona extends Configured implements Tool {
RandomStringUtils.randomNumeric(5);
try {
LOG.trace("Creating bucket: {} in volume: {}", bucket, volume);
+ long start = System.nanoTime();
ozoneClient.createBucket(volume, bucket);
+ bucketCreationTime.getAndAdd(System.nanoTime() - start);
numberOfBucketsCreated.getAndIncrement();
for (int k = 0; k < totalKeys; k++) {
String key = "key-" + k + "-" +
@@ -265,17 +302,24 @@ public final class Corona extends Configured implements Tool {
try {
LOG.trace("Adding key: {} in bucket: {} of volume: {}",
key, bucket, volume);
+ long keyCreateStart = System.nanoTime();
OzoneOutputStream os = ozoneClient.createKey(
volume, bucket, key, value.length);
+ keyCreationTime.getAndAdd(System.nanoTime() - keyCreateStart);
+ long keyWriteStart = System.nanoTime();
os.write(value);
os.close();
+ keyWriteTime.getAndAdd(System.nanoTime() - keyWriteStart);
+ totalBytesWritten.getAndAdd(value.length);
numberOfKeysAdded.getAndIncrement();
} catch (Exception e) {
+ exception = true;
LOG.error("Exception while adding key: {} in bucket: {}" +
" of volume: {}.", key, bucket, volume, e);
}
}
} catch (Exception e) {
+ exception = true;
LOG.error("Exception while creating bucket: {}" +
" in volume: {}.", bucket, volume, e);
}
@@ -287,11 +331,82 @@ public final class Corona extends Configured implements Tool {
* Adds ShutdownHook to print statistics.
*/
private void addShutdownHook() {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- printStats(System.out);
+ Thread statsHook = new Thread(() -> printStats(System.out)); // dumps run statistics when the JVM exits
+ Runtime.getRuntime().addShutdownHook(statsHook);
+ }
+
+ private Thread getProgressBarThread() { // builds (does not start) the named progress-bar thread
+ long maxValue = (long) Integer.parseInt(numOfVolumes) * // widen first: defaults 10*1000*500000 overflow int
+ Integer.parseInt(numOfBuckets) *
+ Integer.parseInt(numOfKeys);
+ Thread progressBarThread = new Thread(
+ new ProgressBar(System.out, maxValue));
+ progressBarThread.setName("ProgressBar");
+ return progressBarThread;
+ }
+
+ private class ProgressBar implements Runnable { // redraws a console bar from numberOfKeysAdded until done
+
+ private final long refreshInterval = 1000L; // milliseconds between redraws (Thread.sleep argument)
+ private final PrintStream stream;
+ private final long maxValue;
+
+ ProgressBar(PrintStream stream, long maxValue) {
+ this.stream = stream;
+ this.maxValue = maxValue;
+ }
+
+ @Override
+ public void run() {
+ try {
+ stream.println();
+ long keys;
+ while((keys = numberOfKeysAdded.get()) < maxValue) {
+ print(keys);
+ if(completed) { // set by another thread; the field must be volatile for this read to be reliable
+ break;
+ }
+ Thread.sleep(refreshInterval);
+ }
+ if(exception) {
+ stream.println();
+ stream.println("Incomplete termination, " +
+ "check log for exception.");
+ } else {
+ print(maxValue);
+ }
+ stream.println();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // stop redrawing but keep the interrupt visible to the owner
+ }
+ }
+
+ /**
+ * Redraws the progress bar in place (carriage return, no newline).
+ *
+ * @param currentValue number of keys added so far
+ */
+ private void print(long currentValue) {
+ stream.print('\r');
+ double percent = maxValue == 0 ? 100.0 : 100.0 * currentValue / maxValue;
+ StringBuilder sb = new StringBuilder();
+ sb.append(" " + String.format("%.2f", percent) + "% |");
+ int filled = (int) Math.min(100.0, percent); // whole cells to fill; the bar is always 100 cells wide
+ for (int i = 0; i < filled; i++) {
+ sb.append('█');
+ }
+ for (int j = filled; j < 100; j++) {
+ sb.append(' ');
}
- });
+ sb.append("| ");
+ sb.append(currentValue + "/" + maxValue);
+ long timeInSec = TimeUnit.SECONDS.convert(
+ System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
+ (timeInSec % 3600) / 60, timeInSec % 60);
+ sb.append(" Time: " + timeToPrint);
+ stream.print(sb);
+ }
}
/**
@@ -300,15 +415,81 @@ public final class Corona extends Configured implements Tool {
* @param out PrintStream
*/
private void printStats(PrintStream out) {
- long timeInSec = TimeUnit.SECONDS.convert(
- System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
- String timeToPrint = timeInSec < 60 ? timeInSec + " seconds" :
- TimeUnit.MINUTES.convert(timeInSec, TimeUnit.SECONDS) + " minuites";
+ int threadCount = Integer.parseInt(numOfThreads);
+
+ long endTime = System.nanoTime() - startTime; // total elapsed wall-clock nanoseconds since construction
+ String execTime = String.format("%02d:%02d:%02d",
+ TimeUnit.NANOSECONDS.toHours(endTime),
+ TimeUnit.NANOSECONDS.toMinutes(endTime) -
+ TimeUnit.HOURS.toMinutes(
+ TimeUnit.NANOSECONDS.toHours(endTime)),
+ TimeUnit.NANOSECONDS.toSeconds(endTime) -
+ TimeUnit.MINUTES.toSeconds(
+ TimeUnit.NANOSECONDS.toMinutes(endTime)));
+
+ long volumeTime = volumeCreationTime.longValue(); // not divided: volumes appear to be created on the submitting thread (OfflineProcessor ctor) - confirm
+ String prettyVolumeTime = String.format("%02d:%02d:%02d.%03d", // HH:MM:SS.mmm - millis need 3 digits
+ TimeUnit.NANOSECONDS.toHours(volumeTime),
+ TimeUnit.NANOSECONDS.toMinutes(volumeTime) -
+ TimeUnit.HOURS.toMinutes(
+ TimeUnit.NANOSECONDS.toHours(volumeTime)),
+ TimeUnit.NANOSECONDS.toSeconds(volumeTime) -
+ TimeUnit.MINUTES.toSeconds(
+ TimeUnit.NANOSECONDS.toMinutes(volumeTime)),
+ TimeUnit.NANOSECONDS.toMillis(volumeTime) -
+ TimeUnit.SECONDS.toMillis(
+ TimeUnit.NANOSECONDS.toSeconds(volumeTime)));
+
+ long bucketTime = bucketCreationTime.longValue() / threadCount; // summed over pool threads; per-thread average
+ String prettyBucketTime = String.format("%02d:%02d:%02d.%03d",
+ TimeUnit.NANOSECONDS.toHours(bucketTime),
+ TimeUnit.NANOSECONDS.toMinutes(bucketTime) -
+ TimeUnit.HOURS.toMinutes(
+ TimeUnit.NANOSECONDS.toHours(bucketTime)),
+ TimeUnit.NANOSECONDS.toSeconds(bucketTime) -
+ TimeUnit.MINUTES.toSeconds(
+ TimeUnit.NANOSECONDS.toMinutes(bucketTime)),
+ TimeUnit.NANOSECONDS.toMillis(bucketTime) -
+ TimeUnit.SECONDS.toMillis(
+ TimeUnit.NANOSECONDS.toSeconds(bucketTime)));
+
+ long totalKeyCreationTime = keyCreationTime.longValue() / threadCount; // per-thread average, as above
+ String prettyKeyCreationTime = String.format("%02d:%02d:%02d.%03d",
+ TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime),
+ TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime) -
+ TimeUnit.HOURS.toMinutes(
+ TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime)),
+ TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime) -
+ TimeUnit.MINUTES.toSeconds(
+ TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime)),
+ TimeUnit.NANOSECONDS.toMillis(totalKeyCreationTime) -
+ TimeUnit.SECONDS.toMillis(
+ TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime)));
+
+ long totalKeyWriteTime = keyWriteTime.longValue() / threadCount; // per-thread average, as above
+ String prettyKeyWriteTime = String.format("%02d:%02d:%02d.%03d",
+ TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime),
+ TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime) -
+ TimeUnit.HOURS.toMinutes(
+ TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime)),
+ TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime) -
+ TimeUnit.MINUTES.toSeconds(
+ TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime)),
+ TimeUnit.NANOSECONDS.toMillis(totalKeyWriteTime) -
+ TimeUnit.SECONDS.toMillis(
+ TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime)));
+
+ out.println(); // move off the progress-bar line before the summary
out.println("***************************************************");
out.println("Number of Volumes created: " + numberOfVolumesCreated);
out.println("Number of Buckets created: " + numberOfBucketsCreated);
out.println("Number of Keys added: " + numberOfKeysAdded);
- out.println("Execution time: " + timeToPrint);
+ out.println("Time spent in volume creation: " + prettyVolumeTime);
+ out.println("Time spent in bucket creation: " + prettyBucketTime);
+ out.println("Time spent in key creation: " + prettyKeyCreationTime);
+ out.println("Time spent in writing keys: " + prettyKeyWriteTime);
+ out.println("Total bytes written: " + totalBytesWritten);
+ out.println("Total Execution time: " + execTime);
out.println("***************************************************");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org