You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/07/27 05:51:51 UTC

hadoop git commit: HDFS-12180. Ozone: Corona: Add stats and progress bar to corona. Contributed by Nandakumar.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 c651868c4 -> 5b3cac765


HDFS-12180. Ozone: Corona: Add stats and progress bar to corona. Contributed by Nandakumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5b3cac76
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5b3cac76
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5b3cac76

Branch: refs/heads/HDFS-7240
Commit: 5b3cac765731a9ae1936303706b880547b65eedd
Parents: c651868
Author: Anu Engineer <ae...@apache.org>
Authored: Wed Jul 26 22:48:06 2017 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Wed Jul 26 22:48:06 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/ozone/OzoneClientImpl.java    |   6 +-
 .../java/org/apache/hadoop/ozone/Corona.java    | 207 +++++++++++++++++--
 2 files changed, 199 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b3cac76/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
index 11e8fff..01807b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
@@ -55,7 +55,11 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b3cac76/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java
index dc63a5f..581ccc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java
@@ -77,6 +77,7 @@ public final class Corona extends Configured implements Tool {
   private static final String HELP = "help";
   private static final String MODE = "mode";
   private static final String SOURCE = "source";
+  private static final String NUM_OF_THREADS = "numOfThreads";
   private static final String NUM_OF_VOLUMES = "numOfVolumes";
   private static final String NUM_OF_BUCKETS = "numOfBuckets";
   private static final String NUM_OF_KEYS = "numOfKeys";
@@ -85,19 +86,21 @@ public final class Corona extends Configured implements Tool {
   private static final String SOURCE_DEFAULT =
       "https://commoncrawl.s3.amazonaws.com/" +
           "crawl-data/CC-MAIN-2017-17/warc.paths.gz";
+  private static final String NUM_OF_THREADS_DEFAULT = "10";
   private static final String NUM_OF_VOLUMES_DEFAULT = "10";
   private static final String NUM_OF_BUCKETS_DEFAULT = "1000";
   private static final String NUM_OF_KEYS_DEFAULT = "500000";
 
-  private static final int NUM_OF_THREADS_DEFAULT = 10;
-
   private static final Logger LOG =
       LoggerFactory.getLogger(Corona.class);
 
   private boolean printUsage = false;
+  private boolean completed = false;
+  private boolean exception = false;
 
   private String mode;
   private String source;
+  private String numOfThreads;
   private String numOfVolumes;
   private String numOfBuckets;
   private String numOfKeys;
@@ -107,18 +110,29 @@ public final class Corona extends Configured implements Tool {
 
   private long startTime;
 
+  private AtomicLong volumeCreationTime;
+  private AtomicLong bucketCreationTime;
+  private AtomicLong keyCreationTime;
+  private AtomicLong keyWriteTime;
+
+  private AtomicLong totalBytesWritten;
+
   private AtomicInteger numberOfVolumesCreated;
   private AtomicInteger numberOfBucketsCreated;
   private AtomicLong numberOfKeysAdded;
 
   private Corona(Configuration conf) throws IOException {
     startTime = System.nanoTime();
+    volumeCreationTime = new AtomicLong();
+    bucketCreationTime = new AtomicLong();
+    keyCreationTime = new AtomicLong();
+    keyWriteTime = new AtomicLong();
+    totalBytesWritten = new AtomicLong();
     numberOfVolumesCreated = new AtomicInteger();
     numberOfBucketsCreated = new AtomicInteger();
     numberOfKeysAdded = new AtomicLong();
     OzoneClientFactory.setConfiguration(conf);
     ozoneClient = OzoneClientFactory.getRpcClient();
-    processor = Executors.newFixedThreadPool(NUM_OF_THREADS_DEFAULT);
   }
 
   @Override
@@ -130,13 +144,15 @@ public final class Corona extends Configured implements Tool {
       usage();
       System.exit(0);
     }
+    LOG.info("Number of Threads: " + numOfThreads);
+    processor = Executors.newFixedThreadPool(Integer.parseInt(numOfThreads));
     addShutdownHook();
     if(mode.equals("online")) {
       LOG.info("Mode: online");
       throw new UnsupportedOperationException("Not yet implemented.");
     } else {
       LOG.info("Mode: offline");
-      LOG.info("Number of Volumes: {}.", numOfBuckets);
+      LOG.info("Number of Volumes: {}.", numOfVolumes);
       LOG.info("Number of Buckets per Volume: {}.", numOfBuckets);
       LOG.info("Number of Keys per Bucket: {}.", numOfKeys);
       for(int i = 0; i < Integer.parseInt(numOfVolumes); i++) {
@@ -144,8 +160,13 @@ public final class Corona extends Configured implements Tool {
             RandomStringUtils.randomNumeric(5);
         processor.submit(new OfflineProcessor(volume));
       }
+      Thread progressbar = getProgressBarThread();
+      LOG.info("Starting progress bar Thread.");
+      progressbar.start();
       processor.shutdown();
       processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
+      completed = true;
+      progressbar.join();
       return 0;
     }
   }
@@ -170,6 +191,12 @@ public final class Corona extends Configured implements Tool {
 
     OptionBuilder.withArgName("value");
     OptionBuilder.hasArg();
+    OptionBuilder.withDescription("number of threads to be launched " +
+        "for the run");
+    Option optNumOfThreads = OptionBuilder.create(NUM_OF_THREADS);
+
+    OptionBuilder.withArgName("value");
+    OptionBuilder.hasArg();
     OptionBuilder.withDescription("specifies number of Volumes to be " +
         "created in offline mode");
     Option optNumOfVolumes = OptionBuilder.create(NUM_OF_VOLUMES);
@@ -189,6 +216,7 @@ public final class Corona extends Configured implements Tool {
     options.addOption(optHelp);
     options.addOption(optMode);
     options.addOption(optSource);
+    options.addOption(optNumOfThreads);
     options.addOption(optNumOfVolumes);
     options.addOption(optNumOfBuckets);
     options.addOption(optNumOfKeys);
@@ -204,6 +232,9 @@ public final class Corona extends Configured implements Tool {
     source = cmdLine.hasOption(SOURCE) ?
         cmdLine.getOptionValue(SOURCE) : SOURCE_DEFAULT;
 
+    numOfThreads = cmdLine.hasOption(NUM_OF_THREADS) ?
+        cmdLine.getOptionValue(NUM_OF_THREADS) : NUM_OF_THREADS_DEFAULT;
+
     numOfVolumes = cmdLine.hasOption(NUM_OF_VOLUMES) ?
         cmdLine.getOptionValue(NUM_OF_VOLUMES) : NUM_OF_VOLUMES_DEFAULT;
 
@@ -216,6 +247,8 @@ public final class Corona extends Configured implements Tool {
 
   private void usage() {
     System.out.println("Options supported are:");
+    System.out.println("-numOfThreads <value>           "
+        + "number of threads to be launched for the run.");
     System.out.println("-mode [online | offline]        "
         + "specifies the mode in which Corona should run.");
     System.out.println("-source <url>                   "
@@ -245,7 +278,9 @@ public final class Corona extends Configured implements Tool {
       this.totalKeys = Integer.parseInt(numOfKeys);
       this.volume = volume;
       LOG.trace("Creating volume: {}", volume);
+      long start = System.nanoTime();
       ozoneClient.createVolume(this.volume);
+      volumeCreationTime.getAndAdd(System.nanoTime() - start);
       numberOfVolumesCreated.getAndIncrement();
     }
 
@@ -256,7 +291,9 @@ public final class Corona extends Configured implements Tool {
             RandomStringUtils.randomNumeric(5);
         try {
           LOG.trace("Creating bucket: {} in volume: {}", bucket, volume);
+          long start = System.nanoTime();
           ozoneClient.createBucket(volume, bucket);
+          bucketCreationTime.getAndAdd(System.nanoTime() - start);
           numberOfBucketsCreated.getAndIncrement();
           for (int k = 0; k < totalKeys; k++) {
             String key = "key-" + k + "-" +
@@ -265,17 +302,24 @@ public final class Corona extends Configured implements Tool {
             try {
               LOG.trace("Adding key: {} in bucket: {} of volume: {}",
                   key, bucket, volume);
+              long keyCreateStart = System.nanoTime();
               OzoneOutputStream os = ozoneClient.createKey(
                   volume, bucket, key, value.length);
+              keyCreationTime.getAndAdd(System.nanoTime() - keyCreateStart);
+              long keyWriteStart = System.nanoTime();
               os.write(value);
               os.close();
+              keyWriteTime.getAndAdd(System.nanoTime() - keyWriteStart);
+              totalBytesWritten.getAndAdd(value.length);
               numberOfKeysAdded.getAndIncrement();
             } catch (Exception e) {
+              exception = true;
               LOG.error("Exception while adding key: {} in bucket: {}" +
                   " of volume: {}.", key, bucket, volume, e);
             }
           }
         } catch (Exception e) {
+          exception = true;
           LOG.error("Exception while creating bucket: {}" +
               " in volume: {}.", bucket, volume, e);
         }
@@ -287,11 +331,82 @@ public final class Corona extends Configured implements Tool {
    * Adds ShutdownHook to print statistics.
    */
   private void addShutdownHook() {
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      public void run() {
-        printStats(System.out);
+    Runtime.getRuntime().addShutdownHook(
+        new Thread(() -> printStats(System.out)));
+  }
+
+  private Thread getProgressBarThread() {
+    long maxValue = Integer.parseInt(numOfVolumes) *
+        Integer.parseInt(numOfBuckets) *
+        Integer.parseInt(numOfKeys);
+    Thread progressBarThread = new Thread(
+        new ProgressBar(System.out, maxValue));
+    progressBarThread.setName("ProgressBar");
+    return progressBarThread;
+  }
+
+  private class ProgressBar implements Runnable {
+
+    private final long refreshInterval = 1000L;
+
+    private PrintStream stream;
+    private long maxValue;
+
+    ProgressBar(PrintStream stream, long maxValue) {
+      this.stream = stream;
+      this.maxValue = maxValue;
+    }
+
+    @Override
+    public void run() {
+      try {
+        stream.println();
+        long keys;
+        while((keys = numberOfKeysAdded.get()) < maxValue) {
+          print(keys);
+          if(completed) {
+            break;
+          }
+          Thread.sleep(refreshInterval);
+        }
+        if(exception) {
+          stream.println();
+          stream.println("Incomplete termination, " +
+              "check log for exception.");
+        } else {
+          print(maxValue);
+        }
+        stream.println();
+      } catch (InterruptedException e) {
+      }
+    }
+
+    /**
+     * Given current value prints the progress bar.
+     *
+     * @param currentValue
+     */
+    private void print(long currentValue) {
+      stream.print('\r');
+      double percent = 100.0 * currentValue / maxValue;
+      StringBuilder sb = new StringBuilder();
+      sb.append(" " + String.format("%.2f", percent) + "% |");
+
+      for (int i = 0; i <= percent; i++) {
+        sb.append('█');
+      }
+      for (int j = 0; j < 100 - percent; j++) {
+        sb.append(' ');
       }
-    });
+      sb.append("|  ");
+      sb.append(currentValue + "/" + maxValue);
+      long timeInSec = TimeUnit.SECONDS.convert(
+          System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+      String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
+          (timeInSec % 3600) / 60, timeInSec % 60);
+      sb.append(" Time: " + timeToPrint);
+      stream.print(sb);
+    }
   }
 
   /**
@@ -300,15 +415,81 @@ public final class Corona extends Configured implements Tool {
    * @param out PrintStream
    */
   private void printStats(PrintStream out) {
-    long timeInSec = TimeUnit.SECONDS.convert(
-        System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-    String timeToPrint = timeInSec < 60 ? timeInSec + " seconds" :
-        TimeUnit.MINUTES.convert(timeInSec, TimeUnit.SECONDS) + " minuites";
+    int threadCount = Integer.parseInt(numOfThreads);
+
+    long endTime = System.nanoTime() - startTime;
+    String execTime = String.format("%02d:%02d:%02d",
+        TimeUnit.NANOSECONDS.toHours(endTime),
+        TimeUnit.NANOSECONDS.toMinutes(endTime) -
+            TimeUnit.HOURS.toMinutes(
+                TimeUnit.NANOSECONDS.toHours(endTime)),
+        TimeUnit.NANOSECONDS.toSeconds(endTime) -
+            TimeUnit.MINUTES.toSeconds(
+                TimeUnit.NANOSECONDS.toMinutes(endTime)));
+
+    long volumeTime = volumeCreationTime.longValue();
+    String prettyVolumeTime = String.format("%02d:%02d:%02d:%02d",
+        TimeUnit.NANOSECONDS.toHours(volumeTime),
+        TimeUnit.NANOSECONDS.toMinutes(volumeTime) -
+            TimeUnit.HOURS.toMinutes(
+                TimeUnit.NANOSECONDS.toHours(volumeTime)),
+        TimeUnit.NANOSECONDS.toSeconds(volumeTime) -
+            TimeUnit.MINUTES.toSeconds(
+                TimeUnit.NANOSECONDS.toMinutes(volumeTime)),
+        TimeUnit.NANOSECONDS.toMillis(volumeTime) -
+            TimeUnit.SECONDS.toMillis(
+                TimeUnit.NANOSECONDS.toSeconds(volumeTime)));
+
+    long bucketTime = bucketCreationTime.longValue() / threadCount;
+    String prettyBucketTime = String.format("%02d:%02d:%02d:%02d",
+        TimeUnit.NANOSECONDS.toHours(bucketTime),
+        TimeUnit.NANOSECONDS.toMinutes(bucketTime) -
+            TimeUnit.HOURS.toMinutes(
+                TimeUnit.NANOSECONDS.toHours(bucketTime)),
+        TimeUnit.NANOSECONDS.toSeconds(bucketTime) -
+            TimeUnit.MINUTES.toSeconds(
+                TimeUnit.NANOSECONDS.toMinutes(bucketTime)),
+        TimeUnit.NANOSECONDS.toMillis(bucketTime) -
+            TimeUnit.SECONDS.toMillis(
+                TimeUnit.NANOSECONDS.toSeconds(bucketTime)));
+
+    long totalKeyCreationTime = keyCreationTime.longValue() / threadCount;
+    String prettyKeyCreationTime = String.format("%02d:%02d:%02d:%02d",
+        TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime),
+        TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime) -
+            TimeUnit.HOURS.toMinutes(
+                TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime)),
+        TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime) -
+            TimeUnit.MINUTES.toSeconds(
+                TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime)),
+        TimeUnit.NANOSECONDS.toMillis(totalKeyCreationTime) -
+            TimeUnit.SECONDS.toMillis(
+                TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime)));
+
+    long totalKeyWriteTime = keyWriteTime.longValue() / threadCount;
+    String prettyKeyWriteTime = String.format("%02d:%02d:%02d:%02d",
+        TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime),
+        TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime) -
+            TimeUnit.HOURS.toMinutes(
+                TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime)),
+        TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime) -
+            TimeUnit.MINUTES.toSeconds(
+                TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime)),
+        TimeUnit.NANOSECONDS.toMillis(totalKeyWriteTime) -
+            TimeUnit.SECONDS.toMillis(
+                TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime)));
+
+    out.println();
     out.println("***************************************************");
     out.println("Number of Volumes created: " + numberOfVolumesCreated);
     out.println("Number of Buckets created: " + numberOfBucketsCreated);
     out.println("Number of Keys added: " + numberOfKeysAdded);
-    out.println("Execution time: " + timeToPrint);
+    out.println("Time spent in volume creation: " + prettyVolumeTime);
+    out.println("Time spent in bucket creation: " + prettyBucketTime);
+    out.println("Time spent in key creation: " + prettyKeyCreationTime);
+    out.println("Time spent in writing keys: " + prettyKeyWriteTime);
+    out.println("Total bytes written: " + totalBytesWritten);
+    out.println("Total Execution time: " + execTime);
     out.println("***************************************************");
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org