You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/01/22 14:28:37 UTC

[hadoop] branch branch-3.3 updated: HADOOP-17272. ABFS Streams to support IOStatistics API (#2604)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new d20b2de  HADOOP-17272. ABFS Streams to support IOStatistics API (#2604)
d20b2de is described below

commit d20b2deac33796ca3e294726bab806069e2fabc0
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Tue Jan 12 21:18:09 2021 +0530

    HADOOP-17272. ABFS Streams to support IOStatistics API (#2604)
    
    Contributed by Mehakmeet Singh.
    
    Change-Id: I3445dec84b9b9e43bb1e41f709944ea05416bd74
---
 .../hadoop/fs/statistics/StreamStatisticNames.java |  72 +++++++++
 .../fs/azurebfs/services/AbfsInputStream.java      |  23 ++-
 .../services/AbfsInputStreamStatistics.java        |  15 +-
 .../services/AbfsInputStreamStatisticsImpl.java    | 162 +++++++++++++--------
 .../fs/azurebfs/services/AbfsOutputStream.java     | 125 ++++++++++------
 .../services/AbfsOutputStreamStatistics.java       |  17 ++-
 .../services/AbfsOutputStreamStatisticsImpl.java   | 130 ++++++++++-------
 .../azurebfs/ITestAbfsInputStreamStatistics.java   |  43 +++++-
 .../azurebfs/ITestAbfsOutputStreamStatistics.java  |  31 ++++
 .../azurebfs/TestAbfsOutputStreamStatistics.java   |  27 +---
 10 files changed, 444 insertions(+), 201 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 02072d4..bbb8517 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -286,6 +286,78 @@ public final class StreamStatisticNames {
   public static final String STREAM_WRITE_TOTAL_DATA
       = "stream_write_total_data";
 
+  /**
+   * Number of bytes to upload from an OutputStream.
+   */
+  public static final String BYTES_TO_UPLOAD
+      = "bytes_upload";
+
+  /**
+   * Number of bytes uploaded successfully to the object store.
+   */
+  public static final String BYTES_UPLOAD_SUCCESSFUL
+      = "bytes_upload_successfully";
+
+  /**
+   * Number of bytes failed to upload to the object store.
+   */
+  public static final String BYTES_UPLOAD_FAILED
+      = "bytes_upload_failed";
+
+  /**
+   * Total time spent on waiting for a task to complete.
+   */
+  public static final String TIME_SPENT_ON_TASK_WAIT
+      = "time_spent_task_wait";
+
+  /**
+   * Number of task queue shrunk operations.
+   */
+  public static final String QUEUE_SHRUNK_OPS
+      = "queue_shrunk_ops";
+
+  /**
+   * Number of times current buffer is written to the service.
+   */
+  public static final String WRITE_CURRENT_BUFFER_OPERATIONS
+      = "write_current_buffer_ops";
+
+  /**
+   * Total time spent on completing a PUT request.
+   */
+  public static final String TIME_SPENT_ON_PUT_REQUEST
+      = "time_spent_on_put_request";
+
+  /**
+   * Number of seeks in buffer.
+   */
+  public static final String SEEK_IN_BUFFER
+      = "seek_in_buffer";
+
+  /**
+   * Number of bytes read from the buffer.
+   */
+  public static final String BYTES_READ_BUFFER
+      = "bytes_read_buffer";
+
+  /**
+   * Total number of remote read operations performed.
+   */
+  public static final String REMOTE_READ_OP
+      = "remote_read_op";
+
+  /**
+   * Total number of bytes read from readAhead.
+   */
+  public static final String READ_AHEAD_BYTES_READ
+      = "read_ahead_bytes_read";
+
+  /**
+   * Total number of bytes read from remote operations.
+   */
+  public static final String REMOTE_BYTES_READ
+      = "remote_bytes_read";
+
   private StreamStatisticNames() {
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 1d109f4..c1de031 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -37,6 +37,11 @@ import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
@@ -48,7 +53,7 @@ import static org.apache.hadoop.util.StringUtils.toLowerCase;
  * The AbfsInputStream for AbfsClient.
  */
 public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
-        StreamCapabilities {
+        StreamCapabilities, IOStatisticsSource {
   private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
   //  Footer size is set to qualify for both ORC and parquet files
   public static final int FOOTER_SIZE = 16 * ONE_KB;
@@ -92,6 +97,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private long bytesFromRemoteRead; // bytes read remotely; for testing
 
   private final AbfsInputStreamContext context;
+  private IOStatistics ioStatistics;
 
   public AbfsInputStream(
           final AbfsClient client,
@@ -120,6 +126,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     // Propagate the config values to ReadBufferManager so that the first instance
     // to initialize can set the readAheadBlockSize
     ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
+    if (streamStatistics != null) {
+      ioStatistics = streamStatistics.getIOStatistics();
+    }
   }
 
   public String getPath() {
@@ -152,7 +161,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     int lastReadBytes;
     int totalReadBytes = 0;
     if (streamStatistics != null) {
-      streamStatistics.readOperationStarted(off, len);
+      streamStatistics.readOperationStarted();
     }
     incrementReadOps();
     do {
@@ -431,7 +440,10 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
       LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
-      op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
+      op = IOStatisticsBinding.trackDuration((IOStatisticsStore) ioStatistics,
+          StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+          () -> client.read(path, position, b, offset, length,
+              tolerateOobAppends ? "*" : eTag, cachedSasToken.get()));
       cachedSasToken.update(op.getSasToken());
       if (streamStatistics != null) {
         streamStatistics.remoteReadOperation();
@@ -694,6 +706,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     return alwaysReadBufferSize;
   }
 
+  @Override
+  public IOStatistics getIOStatistics() {
+    return ioStatistics;
+  }
+
   /**
    * Get the statistics of the stream.
    * @return a string value.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
index c910a1f..0066346 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
@@ -19,12 +19,14 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 
 /**
  * Interface for statistics for the AbfsInputStream.
  */
 @InterfaceStability.Unstable
-public interface AbfsInputStreamStatistics {
+public interface AbfsInputStreamStatistics extends IOStatisticsSource {
   /**
    * Seek backwards, incrementing the seek and backward seek counters.
    *
@@ -73,11 +75,8 @@ public interface AbfsInputStreamStatistics {
 
   /**
    * A {@code read(byte[] buf, int off, int len)} operation has started.
-   *
-   * @param pos starting position of the read.
-   * @param len length of bytes to read.
    */
-  void readOperationStarted(long pos, long len);
+  void readOperationStarted();
 
   /**
    * Records a successful remote read operation.
@@ -97,6 +96,12 @@ public interface AbfsInputStreamStatistics {
   void remoteBytesRead(long bytes);
 
   /**
+   * Get the IOStatisticsStore instance from AbfsInputStreamStatistics.
+   * @return instance of IOStatisticsStore which extends IOStatistics.
+   */
+  IOStatistics getIOStatistics();
+
+  /**
    * Makes the string of all the AbfsInputStream statistics.
    * @return the string with all the statistics.
    */
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
index 12cc407..bd09762 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
@@ -18,23 +18,50 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+
 /**
  * Stats for the AbfsInputStream.
  */
 public class AbfsInputStreamStatisticsImpl
     implements AbfsInputStreamStatistics {
-  private long seekOperations;
-  private long forwardSeekOperations;
-  private long backwardSeekOperations;
-  private long bytesRead;
-  private long bytesSkippedOnSeek;
-  private long bytesBackwardsOnSeek;
-  private long seekInBuffer;
-  private long readOperations;
-  private long bytesReadFromBuffer;
-  private long remoteReadOperations;
-  private long readAheadBytesRead;
-  private long remoteBytesRead;
+
+  private final IOStatisticsStore ioStatisticsStore = iostatisticsStore()
+      .withCounters(
+          StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS,
+          StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS,
+          StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS,
+          StreamStatisticNames.STREAM_READ_BYTES,
+          StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED,
+          StreamStatisticNames.STREAM_READ_OPERATIONS,
+          StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS,
+          StreamStatisticNames.SEEK_IN_BUFFER,
+          StreamStatisticNames.BYTES_READ_BUFFER,
+          StreamStatisticNames.REMOTE_READ_OP,
+          StreamStatisticNames.READ_AHEAD_BYTES_READ,
+          StreamStatisticNames.REMOTE_BYTES_READ
+          )
+      .withDurationTracking(ACTION_HTTP_GET_REQUEST)
+      .build();
+
+  /* Reference to the atomic counter for frequently updated counters to avoid
+   * cost of the map lookup on every increment.
+   */
+  private final AtomicLong bytesRead =
+      ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_BYTES);
+  private final AtomicLong readOps =
+      ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_OPERATIONS);
+  private final AtomicLong seekOps =
+      ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS);
 
   /**
    * Seek backwards, incrementing the seek and backward seek counters.
@@ -44,9 +71,9 @@ public class AbfsInputStreamStatisticsImpl
    */
   @Override
   public void seekBackwards(long negativeOffset) {
-    seekOperations++;
-    backwardSeekOperations++;
-    bytesBackwardsOnSeek -= negativeOffset;
+    seekOps.incrementAndGet();
+    ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS);
+    ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS, negativeOffset);
   }
 
   /**
@@ -58,11 +85,9 @@ public class AbfsInputStreamStatisticsImpl
    */
   @Override
   public void seekForwards(long skipped) {
-    seekOperations++;
-    forwardSeekOperations++;
-    if (skipped > 0) {
-      bytesSkippedOnSeek += skipped;
-    }
+    seekOps.incrementAndGet();
+    ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS);
+    ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED, skipped);
   }
 
   /**
@@ -90,9 +115,7 @@ public class AbfsInputStreamStatisticsImpl
    */
   @Override
   public void bytesRead(long bytes) {
-    if (bytes > 0) {
-      bytesRead += bytes;
-    }
+    bytesRead.addAndGet(bytes);
   }
 
   /**
@@ -104,9 +127,7 @@ public class AbfsInputStreamStatisticsImpl
    */
   @Override
   public void bytesReadFromBuffer(long bytes) {
-    if (bytes > 0) {
-      bytesReadFromBuffer += bytes;
-    }
+    ioStatisticsStore.incrementCounter(StreamStatisticNames.BYTES_READ_BUFFER, bytes);
   }
 
   /**
@@ -116,18 +137,15 @@ public class AbfsInputStreamStatisticsImpl
    */
   @Override
   public void seekInBuffer() {
-    seekInBuffer++;
+    ioStatisticsStore.incrementCounter(StreamStatisticNames.SEEK_IN_BUFFER);
   }
 
   /**
    * A {@code read(byte[] buf, int off, int len)} operation has started.
-   *
-   * @param pos starting position of the read.
-   * @param len length of bytes to read.
    */
   @Override
-  public void readOperationStarted(long pos, long len) {
-    readOperations++;
+  public void readOperationStarted() {
+    readOps.incrementAndGet();
   }
 
   /**
@@ -137,9 +155,7 @@ public class AbfsInputStreamStatisticsImpl
    */
   @Override
   public void readAheadBytesRead(long bytes) {
-    if (bytes > 0) {
-      readAheadBytesRead += bytes;
-    }
+    ioStatisticsStore.incrementCounter(StreamStatisticNames.READ_AHEAD_BYTES_READ, bytes);
   }
 
   /**
@@ -149,9 +165,7 @@ public class AbfsInputStreamStatisticsImpl
    */
   @Override
   public void remoteBytesRead(long bytes) {
-    if (bytes > 0) {
-      remoteBytesRead += bytes;
-    }
+    ioStatisticsStore.incrementCounter(StreamStatisticNames.REMOTE_BYTES_READ, bytes);
   }
 
   /**
@@ -161,55 +175,88 @@ public class AbfsInputStreamStatisticsImpl
    */
   @Override
   public void remoteReadOperation() {
-    remoteReadOperations++;
+    ioStatisticsStore.incrementCounter(StreamStatisticNames.REMOTE_READ_OP);
+  }
+
+  /**
+   * Getter for IOStatistics instance used.
+   * @return IOStatisticsStore instance which extends IOStatistics.
+   */
+  @Override
+  public IOStatistics getIOStatistics() {
+    return ioStatisticsStore;
   }
 
+  @VisibleForTesting
   public long getSeekOperations() {
-    return seekOperations;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS);
   }
 
+  @VisibleForTesting
   public long getForwardSeekOperations() {
-    return forwardSeekOperations;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS);
   }
 
+  @VisibleForTesting
   public long getBackwardSeekOperations() {
-    return backwardSeekOperations;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS);
   }
 
+  @VisibleForTesting
   public long getBytesRead() {
-    return bytesRead;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_BYTES);
   }
 
+  @VisibleForTesting
   public long getBytesSkippedOnSeek() {
-    return bytesSkippedOnSeek;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED);
   }
 
+  @VisibleForTesting
   public long getBytesBackwardsOnSeek() {
-    return bytesBackwardsOnSeek;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS);
   }
 
+  @VisibleForTesting
   public long getSeekInBuffer() {
-    return seekInBuffer;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.SEEK_IN_BUFFER);
+
   }
 
+  @VisibleForTesting
   public long getReadOperations() {
-    return readOperations;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_OPERATIONS);
   }
 
+  @VisibleForTesting
   public long getBytesReadFromBuffer() {
-    return bytesReadFromBuffer;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_READ_BUFFER);
   }
 
+  @VisibleForTesting
   public long getRemoteReadOperations() {
-    return remoteReadOperations;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.REMOTE_READ_OP);
   }
 
+  @VisibleForTesting
   public long getReadAheadBytesRead() {
-    return readAheadBytesRead;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.READ_AHEAD_BYTES_READ);
   }
 
+  @VisibleForTesting
   public long getRemoteBytesRead() {
-    return remoteBytesRead;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.REMOTE_BYTES_READ);
+  }
+
+  /**
+   * Getter for the mean value of the time taken to complete a HTTP GET
+   * request by AbfsInputStream.
+   * @return mean value.
+   */
+  @VisibleForTesting
+  public double getActionHttpGetRequest() {
+    return ioStatisticsStore.meanStatistics().
+        get(ACTION_HTTP_GET_REQUEST + SUFFIX_MEAN).mean();
   }
 
   /**
@@ -223,18 +270,7 @@ public class AbfsInputStreamStatisticsImpl
   public String toString() {
     final StringBuilder sb = new StringBuilder(
         "StreamStatistics{");
-    sb.append(", SeekOperations=").append(seekOperations);
-    sb.append(", ForwardSeekOperations=").append(forwardSeekOperations);
-    sb.append(", BackwardSeekOperations=").append(backwardSeekOperations);
-    sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek);
-    sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek);
-    sb.append(", seekInBuffer=").append(seekInBuffer);
-    sb.append(", BytesRead=").append(bytesRead);
-    sb.append(", ReadOperations=").append(readOperations);
-    sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
-    sb.append(", remoteReadOperations=").append(remoteReadOperations);
-    sb.append(", readAheadBytesRead=").append(readAheadBytesRead);
-    sb.append(", remoteBytesRead=").append(remoteBytesRead);
+    sb.append(ioStatisticsStore.toString());
     sb.append('}');
     return sb.toString();
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 402fdda..53bdfe9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -43,6 +42,12 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationExcep
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FSExceptionMessages;
@@ -57,7 +62,8 @@ import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestPara
 /**
  * The BlobFsOutputStream for Rest AbfsClient.
  */
-public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities {
+public class AbfsOutputStream extends OutputStream implements Syncable,
+    StreamCapabilities, IOStatisticsSource {
 
   private final AbfsClient client;
   private final String path;
@@ -97,6 +103,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
 
   private final Statistics statistics;
   private final AbfsOutputStreamStatistics outputStreamStatistics;
+  private IOStatistics ioStatistics;
 
   private static final Logger LOG =
       LoggerFactory.getLogger(AbfsOutputStream.class);
@@ -144,6 +151,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
     this.cachedSasToken = new CachedSASToken(
         abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
+    if (outputStreamStatistics != null) {
+      this.ioStatistics = outputStreamStatistics.getIOStatistics();
+    }
   }
 
   /**
@@ -354,11 +364,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     if (bufferIndex == 0) {
       return;
     }
-    outputStreamStatistics.writeCurrentBuffer();
-
     final byte[] bytes = buffer;
     final int bytesLength = bufferIndex;
-    outputStreamStatistics.bytesToUpload(bytesLength);
+    if (outputStreamStatistics != null) {
+      outputStreamStatistics.writeCurrentBuffer();
+      outputStreamStatistics.bytesToUpload(bytesLength);
+    }
     buffer = byteBufferPool.getBuffer(false, bufferSize).array();
     bufferIndex = 0;
     final long offset = position;
@@ -370,7 +381,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
           bytesLength, APPEND_MODE, true);
       AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get());
       cachedSasToken.update(op.getSasToken());
-      outputStreamStatistics.uploadSuccessful(bytesLength);
+      if (outputStreamStatistics != null) {
+        outputStreamStatistics.uploadSuccessful(bytesLength);
+      }
       perfInfo.registerResult(op.getResult());
       byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
       perfInfo.registerSuccess(true);
@@ -402,55 +415,63 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     if (bufferIndex == 0) {
       return;
     }
-    outputStreamStatistics.writeCurrentBuffer();
     numOfAppendsToServerSinceLastFlush++;
 
     final byte[] bytes = buffer;
     final int bytesLength = bufferIndex;
-    outputStreamStatistics.bytesToUpload(bytesLength);
+    if (outputStreamStatistics != null) {
+      outputStreamStatistics.writeCurrentBuffer();
+      outputStreamStatistics.bytesToUpload(bytesLength);
+    }
     buffer = byteBufferPool.getBuffer(false, bufferSize).array();
     bufferIndex = 0;
     final long offset = position;
     position += bytesLength;
 
     if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
-      long start = System.currentTimeMillis();
-      waitForTaskToComplete();
-      outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
-    }
-
-    final Future<Void> job = completionService.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        AbfsPerfTracker tracker = client.getAbfsPerfTracker();
-        try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
-                "writeCurrentBufferToService", "append")) {
-          AppendRequestParameters.Mode
-              mode = APPEND_MODE;
-          if (isFlush & isClose) {
-            mode = FLUSH_CLOSE_MODE;
-          } else if (isFlush) {
-            mode = FLUSH_MODE;
-          }
-
-          AppendRequestParameters reqParams = new AppendRequestParameters(
-              offset, 0, bytesLength, mode, false);
-          AbfsRestOperation op = client.append(path, bytes, reqParams,
-              cachedSasToken.get());
-
-          cachedSasToken.update(op.getSasToken());
-          perfInfo.registerResult(op.getResult());
-          byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
-          perfInfo.registerSuccess(true);
-          return null;
+      //Tracking time spent on waiting for task to complete.
+      if (outputStreamStatistics != null) {
+        try (DurationTracker ignored = outputStreamStatistics.timeSpentTaskWait()) {
+          waitForTaskToComplete();
         }
+      } else {
+        waitForTaskToComplete();
+      }
+    }
+    final Future<Void> job =
+        completionService.submit(IOStatisticsBinding
+            .trackDurationOfCallable((IOStatisticsStore) ioStatistics,
+                StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST,
+                () -> {
+                  AbfsPerfTracker tracker = client.getAbfsPerfTracker();
+                  try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
+                      "writeCurrentBufferToService", "append")) {
+                    AppendRequestParameters.Mode
+                        mode = APPEND_MODE;
+                    if (isFlush & isClose) {
+                      mode = FLUSH_CLOSE_MODE;
+                    } else if (isFlush) {
+                      mode = FLUSH_MODE;
+                    }
+                    AppendRequestParameters reqParams = new AppendRequestParameters(
+                        offset, 0, bytesLength, mode, false);
+                    AbfsRestOperation op = client.append(path, bytes, reqParams,
+                        cachedSasToken.get());
+                    cachedSasToken.update(op.getSasToken());
+                    perfInfo.registerResult(op.getResult());
+                    byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
+                    perfInfo.registerSuccess(true);
+                    return null;
+                  }
+                })
+        );
+
+    if (outputStreamStatistics != null) {
+      if (job.isCancelled()) {
+        outputStreamStatistics.uploadFailed(bytesLength);
+      } else {
+        outputStreamStatistics.uploadSuccessful(bytesLength);
       }
-    });
-
-    if (job.isCancelled()) {
-      outputStreamStatistics.uploadFailed(bytesLength);
-    } else {
-      outputStreamStatistics.uploadSuccessful(bytesLength);
     }
     writeOperations.add(new WriteOperation(job, offset, bytesLength));
 
@@ -527,7 +548,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
         lastTotalAppendOffset += writeOperations.peek().length;
         writeOperations.remove();
         // Incrementing statistics to indicate queue has been shrunk.
-        outputStreamStatistics.queueShrunk();
+        if (outputStreamStatistics != null) {
+          outputStreamStatistics.queueShrunk();
+        }
       }
     } catch (Exception e) {
       if (e.getCause() instanceof AzureBlobFileSystemException) {
@@ -615,6 +638,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     return isAppendBlob;
   }
 
+  @Override
+  public IOStatistics getIOStatistics() {
+    return ioStatistics;
+  }
+
   /**
    * Appending AbfsOutputStream statistics to base toString().
    *
@@ -623,9 +651,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(super.toString());
-    sb.append("AbfsOuputStream@").append(this.hashCode()).append("){");
-    sb.append(outputStreamStatistics.toString());
-    sb.append("}");
+    if (outputStreamStatistics != null) {
+      sb.append("AbfsOutputStream@").append(this.hashCode());
+      sb.append("){");
+      sb.append(outputStreamStatistics.toString());
+      sb.append("}");
+    }
     return sb.toString();
   }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java
index c9fe0dd..c57d5d9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java
@@ -19,12 +19,15 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 
 /**
  * Interface for {@link AbfsOutputStream} statistics.
  */
 @InterfaceStability.Unstable
-public interface AbfsOutputStreamStatistics {
+public interface AbfsOutputStreamStatistics extends IOStatisticsSource {
 
   /**
    * Number of bytes to be uploaded.
@@ -49,11 +52,9 @@ public interface AbfsOutputStreamStatistics {
 
   /**
    * Time spent in waiting for tasks to be completed in the blocking queue.
-   *
-   * @param start millisecond at which the wait for task to be complete begins.
-   * @param end   millisecond at which the wait is completed for the task.
+   * @return instance of the DurationTracker that tracks the time for waiting.
    */
-  void timeSpentTaskWait(long start, long end);
+  DurationTracker timeSpentTaskWait();
 
   /**
    * Number of times task queue is shrunk.
@@ -66,6 +67,12 @@ public interface AbfsOutputStreamStatistics {
   void writeCurrentBuffer();
 
   /**
+   * Get the IOStatisticsStore instance from AbfsOutputStreamStatistics.
+   * @return instance of IOStatisticsStore which extends IOStatistics.
+   */
+  IOStatistics getIOStatistics();
+
+  /**
    * Method to form a string of all AbfsOutputStream statistics and their
    * values.
    *
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java
index cd5a29e..b07cf28 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java
@@ -18,32 +18,47 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+
 /**
  * OutputStream statistics implementation for Abfs.
  */
 public class AbfsOutputStreamStatisticsImpl
     implements AbfsOutputStreamStatistics {
-  private long bytesToUpload;
-  private long bytesUploadSuccessful;
-  private long bytesUploadFailed;
-  /**
-   * Counter to get the total time spent while waiting for tasks to complete
-   * in the blocking queue inside the thread executor.
-   */
-  private long timeSpentOnTaskWait;
-  /**
-   * Counter to get the total number of queue shrink operations done {@code
-   * AbfsOutputStream#shrinkWriteOperationQueue()} by AbfsOutputStream to
-   * remove the write operations which were successfully done by
-   * AbfsOutputStream from the task queue.
-   */
-  private long queueShrunkOps;
-  /**
-   * Counter to get the total number of times the current buffer is written
-   * to the service {@code AbfsOutputStream#writeCurrentBufferToService()} via
-   * AbfsClient and appended to the data store by AbfsRestOperation.
+
+  private final IOStatisticsStore ioStatisticsStore = iostatisticsStore()
+      .withCounters(
+          StreamStatisticNames.BYTES_TO_UPLOAD,
+          StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL,
+          StreamStatisticNames.BYTES_UPLOAD_FAILED,
+          StreamStatisticNames.QUEUE_SHRUNK_OPS,
+          StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS
+      )
+      .withDurationTracking(
+          StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST,
+          StreamStatisticNames.TIME_SPENT_ON_TASK_WAIT
+      )
+      .build();
+
+  /* Reference to the atomic counter for frequently updated counters to avoid
+   * cost of the map lookup on every increment.
    */
-  private long writeCurrentBufferOperations;
+  private final AtomicLong bytesUpload =
+      ioStatisticsStore.getCounterReference(StreamStatisticNames.BYTES_TO_UPLOAD);
+  private final AtomicLong bytesUploadedSuccessfully =
+      ioStatisticsStore.getCounterReference(StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL);
+  private final AtomicLong writeCurrentBufferOps =
+      ioStatisticsStore.getCounterReference(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS);
 
   /**
    * Records the need to upload bytes and increments the total bytes that
@@ -53,9 +68,7 @@ public class AbfsOutputStreamStatisticsImpl
    */
   @Override
   public void bytesToUpload(long bytes) {
-    if (bytes > 0) {
-      bytesToUpload += bytes;
-    }
+    bytesUpload.addAndGet(bytes);
   }
 
   /**
@@ -66,9 +79,7 @@ public class AbfsOutputStreamStatisticsImpl
    */
   @Override
   public void uploadSuccessful(long bytes) {
-    if (bytes > 0) {
-      bytesUploadSuccessful += bytes;
-    }
+    bytesUploadedSuccessfully.addAndGet(bytes);
   }
 
   /**
@@ -78,9 +89,7 @@ public class AbfsOutputStreamStatisticsImpl
    */
   @Override
   public void uploadFailed(long bytes) {
-    if (bytes > 0) {
-      bytesUploadFailed += bytes;
-    }
+    ioStatisticsStore.incrementCounter(StreamStatisticNames.BYTES_UPLOAD_FAILED, bytes);
   }
 
   /**
@@ -96,14 +105,10 @@ public class AbfsOutputStreamStatisticsImpl
    * This time spent while waiting for the task to be completed is being
    * recorded in this counter.
    *
-   * @param startTime time(in milliseconds) before the wait for task to be
-   *                  completed is begin.
-   * @param endTime   time(in milliseconds) after the wait for the task to be
-   *                  completed is done.
    */
   @Override
-  public void timeSpentTaskWait(long startTime, long endTime) {
-    timeSpentOnTaskWait += endTime - startTime;
+  public DurationTracker timeSpentTaskWait() {
+    return ioStatisticsStore.trackDuration(StreamStatisticNames.TIME_SPENT_ON_TASK_WAIT);
   }
 
   /**
@@ -114,7 +119,7 @@ public class AbfsOutputStreamStatisticsImpl
    */
   @Override
   public void queueShrunk() {
-    queueShrunkOps++;
+    ioStatisticsStore.incrementCounter(StreamStatisticNames.QUEUE_SHRUNK_OPS);
   }
 
   /**
@@ -125,31 +130,59 @@ public class AbfsOutputStreamStatisticsImpl
    */
   @Override
   public void writeCurrentBuffer() {
-    writeCurrentBufferOperations++;
+    writeCurrentBufferOps.incrementAndGet();
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * A getter for IOStatisticsStore instance which extends IOStatistics.
+   *
+   * @return IOStatisticsStore instance.
+   */
+  @Override
+  public IOStatistics getIOStatistics() {
+    return ioStatisticsStore;
+  }
+
+  @VisibleForTesting
   public long getBytesToUpload() {
-    return bytesToUpload;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_TO_UPLOAD);
   }
 
+  @VisibleForTesting
   public long getBytesUploadSuccessful() {
-    return bytesUploadSuccessful;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL);
   }
 
+  @VisibleForTesting
   public long getBytesUploadFailed() {
-    return bytesUploadFailed;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_UPLOAD_FAILED);
   }
 
+  @VisibleForTesting
   public long getTimeSpentOnTaskWait() {
-    return timeSpentOnTaskWait;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.TIME_SPENT_ON_TASK_WAIT);
   }
 
+  @VisibleForTesting
   public long getQueueShrunkOps() {
-    return queueShrunkOps;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.QUEUE_SHRUNK_OPS);
   }
 
+  @VisibleForTesting
   public long getWriteCurrentBufferOperations() {
-    return writeCurrentBufferOperations;
+    return ioStatisticsStore.counters().get(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS);
+  }
+
+  /**
+   * Getter for mean value of time taken to complete a PUT request by
+   * AbfsOutputStream.
+   * @return mean value.
+   */
+  @VisibleForTesting
+  public double getTimeSpentOnPutRequest() {
+    return ioStatisticsStore.meanStatistics().get(StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST + StoreStatisticNames.SUFFIX_MEAN).mean();
   }
 
   /**
@@ -160,16 +193,7 @@ public class AbfsOutputStreamStatisticsImpl
   @Override public String toString() {
     final StringBuilder outputStreamStats = new StringBuilder(
         "OutputStream Statistics{");
-    outputStreamStats.append(", bytes_upload=").append(bytesToUpload);
-    outputStreamStats.append(", bytes_upload_successfully=")
-        .append(bytesUploadSuccessful);
-    outputStreamStats.append(", bytes_upload_failed=")
-        .append(bytesUploadFailed);
-    outputStreamStats.append(", time_spent_task_wait=")
-        .append(timeSpentOnTaskWait);
-    outputStreamStats.append(", queue_shrunk_ops=").append(queueShrunkOps);
-    outputStreamStats.append(", write_current_buffer_ops=")
-        .append(writeCurrentBufferOperations);
+    outputStreamStats.append(ioStatisticsStore.toString());
     outputStreamStats.append("}");
     return outputStreamStats.toString();
   }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
index 52dfdf2..a33a76e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -139,10 +139,9 @@ public class ITestAbfsInputStreamStatistics
        * forwardSeekOps - Since we are doing a forward seek inside a loop
        * for OPERATION times, total forward seeks would be OPERATIONS.
        *
-       * bytesBackwardsOnSeek - Since we are doing backward seeks from end of
-       * file in a ONE_MB file each time, this would mean the bytes from
-       * backward seek would be OPERATIONS * ONE_MB. Since this is backward
-       * seek this value is expected be to be negative.
+       * negativeBytesBackwardsOnSeek - Since we are doing backward seeks from
+       * end of file in a ONE_MB file each time, this would mean the bytes from
+       * backward seek would be OPERATIONS * ONE_MB.
        *
        * bytesSkippedOnSeek - Since, we move from start to end in seek, but
        * our fCursor(position of cursor) always remain at end of file, this
@@ -160,7 +159,7 @@ public class ITestAbfsInputStreamStatistics
       assertEquals("Mismatch in forwardSeekOps value", OPERATIONS,
           stats.getForwardSeekOperations());
       assertEquals("Mismatch in bytesBackwardsOnSeek value",
-          -1 * OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
+          OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
       assertEquals("Mismatch in bytesSkippedOnSeek value",
           0, stats.getBytesSkippedOnSeek());
       assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,
@@ -367,6 +366,40 @@ public class ITestAbfsInputStreamStatistics
   }
 
   /**
+   * Testing time taken by AbfsInputStream to complete a GET request.
+   */
+  @Test
+  public void testActionHttpGetRequest() throws IOException {
+    describe("Test to check the correct value of Time taken by http get "
+        + "request in AbfsInputStream");
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path actionHttpGetRequestPath = path(getMethodName());
+    AbfsInputStream abfsInputStream = null;
+    AbfsOutputStream abfsOutputStream = null;
+    try {
+      abfsOutputStream = createAbfsOutputStreamWithFlushEnabled(fs,
+          actionHttpGetRequestPath);
+      abfsOutputStream.write('a');
+      abfsOutputStream.hflush();
+
+      abfsInputStream =
+          abfss.openFileForRead(actionHttpGetRequestPath, fs.getFsStatistics());
+      abfsInputStream.read();
+      AbfsInputStreamStatisticsImpl abfsInputStreamStatistics =
+          (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics();
+
+      LOG.info("AbfsInputStreamStats info: {}", abfsInputStreamStatistics.toString());
+      Assertions.assertThat(
+          abfsInputStreamStatistics.getActionHttpGetRequest())
+          .describedAs("Mismatch in time taken by a GET request")
+          .isGreaterThan(0.0);
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, abfsInputStream, abfsOutputStream);
+    }
+  }
+
+  /**
    * Method to assert the initial values of the statistics.
    *
    * @param actualValue the actual value of the statistics.
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java
index c8640dd..392e80a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java
@@ -20,7 +20,10 @@ package org.apache.hadoop.fs.azurebfs;
 
 import java.io.IOException;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
@@ -31,7 +34,10 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
  */
 public class ITestAbfsOutputStreamStatistics
     extends AbstractAbfsIntegrationTest {
+
   private static final int OPERATIONS = 10;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAbfsOutputStreamStatistics.class);
 
   public ITestAbfsOutputStreamStatistics() throws Exception {
   }
@@ -220,6 +226,31 @@ public class ITestAbfsOutputStreamStatistics
   }
 
   /**
+   * Test to check correct value of time spent on a PUT request in
+   * AbfsOutputStream.
+   */
+  @Test
+  public void testAbfsOutputStreamDurationTrackerPutRequest() throws IOException {
+    describe("Testing to check if DurationTracker for PUT request is working "
+        + "correctly.");
+    AzureBlobFileSystem fs = getFileSystem();
+    Path pathForPutRequest = path(getMethodName());
+
+    try(AbfsOutputStream outputStream =
+        createAbfsOutputStreamWithFlushEnabled(fs, pathForPutRequest)) {
+      outputStream.write('a');
+      outputStream.hflush();
+
+      AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics =
+          getAbfsOutputStreamStatistics(outputStream);
+      LOG.info("AbfsOutputStreamStats info: {}", abfsOutputStreamStatistics.toString());
+      Assertions.assertThat(abfsOutputStreamStatistics.getTimeSpentOnPutRequest())
+          .describedAs("Mismatch in timeSpentOnPutRequest DurationTracker")
+          .isGreaterThan(0.0);
+    }
+  }
+
+  /**
    * Method to get the AbfsOutputStream statistics.
    *
    * @param out AbfsOutputStream whose statistics is needed.
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java
index 58f0023..5f94043 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java
@@ -94,17 +94,11 @@ public class TestAbfsOutputStreamStatistics
     assertEquals("Mismatch in time spent on waiting for tasks to complete", 0,
         abfsOutputStreamStatistics.getTimeSpentOnTaskWait());
 
-    int smallRandomStartTime =
-        new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE);
-    int smallRandomEndTime =
-        new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE)
-            + smallRandomStartTime;
-    int smallDiff = smallRandomEndTime - smallRandomStartTime;
     abfsOutputStreamStatistics
-        .timeSpentTaskWait(smallRandomStartTime, smallRandomEndTime);
-    //Test for small random value of timeSpentWaitTask.
+        .timeSpentTaskWait();
+    //Test for one op call value of timeSpentWaitTask.
     assertEquals("Mismatch in time spent on waiting for tasks to complete",
-        smallDiff, abfsOutputStreamStatistics.getTimeSpentOnTaskWait());
+        1, abfsOutputStreamStatistics.getTimeSpentOnTaskWait());
 
     //Reset statistics for the next test.
     abfsOutputStreamStatistics = new AbfsOutputStreamStatisticsImpl();
@@ -113,23 +107,16 @@ public class TestAbfsOutputStreamStatistics
      * Entering multiple values for timeSpentTaskWait() to check the
      * summation is happening correctly. Also calculating the expected result.
      */
-    int expectedRandomDiff = 0;
     for (int i = 0; i < OPERATIONS; i++) {
-      int largeRandomStartTime =
-          new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE);
-      int largeRandomEndTime = new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE)
-          + largeRandomStartTime;
-      abfsOutputStreamStatistics
-          .timeSpentTaskWait(largeRandomStartTime, largeRandomEndTime);
-      expectedRandomDiff += largeRandomEndTime - largeRandomStartTime;
+       abfsOutputStreamStatistics.timeSpentTaskWait();
     }
 
     /*
-     * Test to check correct value of timeSpentTaskWait after multiple
-     * random values are passed in it.
+     * Test to check correct value of timeSpentTaskWait after OPERATIONS
+     * number of op calls.
      */
     assertEquals("Mismatch in time spent on waiting for tasks to complete",
-        expectedRandomDiff,
+        OPERATIONS,
         abfsOutputStreamStatistics.getTimeSpentOnTaskWait());
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org