You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/01/22 14:28:37 UTC
[hadoop] branch branch-3.3 updated: HADOOP-17272. ABFS Streams to support IOStatistics API (#2604)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new d20b2de HADOOP-17272. ABFS Streams to support IOStatistics API (#2604)
d20b2de is described below
commit d20b2deac33796ca3e294726bab806069e2fabc0
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Tue Jan 12 21:18:09 2021 +0530
HADOOP-17272. ABFS Streams to support IOStatistics API (#2604)
Contributed by Mehakmeet Singh.
Change-Id: I3445dec84b9b9e43bb1e41f709944ea05416bd74
---
.../hadoop/fs/statistics/StreamStatisticNames.java | 72 +++++++++
.../fs/azurebfs/services/AbfsInputStream.java | 23 ++-
.../services/AbfsInputStreamStatistics.java | 15 +-
.../services/AbfsInputStreamStatisticsImpl.java | 162 +++++++++++++--------
.../fs/azurebfs/services/AbfsOutputStream.java | 125 ++++++++++------
.../services/AbfsOutputStreamStatistics.java | 17 ++-
.../services/AbfsOutputStreamStatisticsImpl.java | 130 ++++++++++-------
.../azurebfs/ITestAbfsInputStreamStatistics.java | 43 +++++-
.../azurebfs/ITestAbfsOutputStreamStatistics.java | 31 ++++
.../azurebfs/TestAbfsOutputStreamStatistics.java | 27 +---
10 files changed, 444 insertions(+), 201 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 02072d4..bbb8517 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -286,6 +286,78 @@ public final class StreamStatisticNames {
public static final String STREAM_WRITE_TOTAL_DATA
= "stream_write_total_data";
+ /**
+ * Number of bytes to upload from an OutputStream.
+ */
+ public static final String BYTES_TO_UPLOAD
+ = "bytes_upload";
+
+ /**
+ * Number of bytes uploaded successfully to the object store.
+ */
+ public static final String BYTES_UPLOAD_SUCCESSFUL
+ = "bytes_upload_successfully";
+
+ /**
+ * Number of bytes that failed to upload to the object store.
+ */
+ public static final String BYTES_UPLOAD_FAILED
+ = "bytes_upload_failed";
+
+ /**
+ * Total time spent on waiting for a task to complete.
+ */
+ public static final String TIME_SPENT_ON_TASK_WAIT
+ = "time_spent_task_wait";
+
+ /**
+ * Number of times the task queue was shrunk.
+ */
+ public static final String QUEUE_SHRUNK_OPS
+ = "queue_shrunk_ops";
+
+ /**
+ * Number of times current buffer is written to the service.
+ */
+ public static final String WRITE_CURRENT_BUFFER_OPERATIONS
+ = "write_current_buffer_ops";
+
+ /**
+ * Total time spent on completing a PUT request.
+ */
+ public static final String TIME_SPENT_ON_PUT_REQUEST
+ = "time_spent_on_put_request";
+
+ /**
+ * Number of seeks in buffer.
+ */
+ public static final String SEEK_IN_BUFFER
+ = "seek_in_buffer";
+
+ /**
+ * Number of bytes read from the buffer.
+ */
+ public static final String BYTES_READ_BUFFER
+ = "bytes_read_buffer";
+
+ /**
+ * Total number of remote read operations performed.
+ */
+ public static final String REMOTE_READ_OP
+ = "remote_read_op";
+
+ /**
+ * Total number of bytes read from readAhead.
+ */
+ public static final String READ_AHEAD_BYTES_READ
+ = "read_ahead_bytes_read";
+
+ /**
+ * Total number of bytes read from remote operations.
+ */
+ public static final String REMOTE_BYTES_READ
+ = "remote_bytes_read";
+
private StreamStatisticNames() {
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 1d109f4..c1de031 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -37,6 +37,11 @@ import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import static java.lang.Math.max;
import static java.lang.Math.min;
@@ -48,7 +53,7 @@ import static org.apache.hadoop.util.StringUtils.toLowerCase;
* The AbfsInputStream for AbfsClient.
*/
public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
- StreamCapabilities {
+ StreamCapabilities, IOStatisticsSource {
private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
// Footer size is set to qualify for both ORC and parquet files
public static final int FOOTER_SIZE = 16 * ONE_KB;
@@ -92,6 +97,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private long bytesFromRemoteRead; // bytes read remotely; for testing
private final AbfsInputStreamContext context;
+ private IOStatistics ioStatistics;
public AbfsInputStream(
final AbfsClient client,
@@ -120,6 +126,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
// Propagate the config values to ReadBufferManager so that the first instance
// to initialize can set the readAheadBlockSize
ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
+ if (streamStatistics != null) {
+ ioStatistics = streamStatistics.getIOStatistics();
+ }
}
public String getPath() {
@@ -152,7 +161,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
int lastReadBytes;
int totalReadBytes = 0;
if (streamStatistics != null) {
- streamStatistics.readOperationStarted(off, len);
+ streamStatistics.readOperationStarted();
}
incrementReadOps();
do {
@@ -431,7 +440,10 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
- op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
+ op = IOStatisticsBinding.trackDuration((IOStatisticsStore) ioStatistics,
+ StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+ () -> client.read(path, position, b, offset, length,
+ tolerateOobAppends ? "*" : eTag, cachedSasToken.get()));
cachedSasToken.update(op.getSasToken());
if (streamStatistics != null) {
streamStatistics.remoteReadOperation();
@@ -694,6 +706,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
return alwaysReadBufferSize;
}
+ @Override
+ public IOStatistics getIOStatistics() {
+ return ioStatistics;
+ }
+
/**
* Get the statistics of the stream.
* @return a string value.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
index c910a1f..0066346 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
@@ -19,12 +19,14 @@
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
/**
* Interface for statistics for the AbfsInputStream.
*/
@InterfaceStability.Unstable
-public interface AbfsInputStreamStatistics {
+public interface AbfsInputStreamStatistics extends IOStatisticsSource {
/**
* Seek backwards, incrementing the seek and backward seek counters.
*
@@ -73,11 +75,8 @@ public interface AbfsInputStreamStatistics {
/**
* A {@code read(byte[] buf, int off, int len)} operation has started.
- *
- * @param pos starting position of the read.
- * @param len length of bytes to read.
*/
- void readOperationStarted(long pos, long len);
+ void readOperationStarted();
/**
* Records a successful remote read operation.
@@ -97,6 +96,12 @@ public interface AbfsInputStreamStatistics {
void remoteBytesRead(long bytes);
/**
+ * Get the IOStatisticsStore instance from AbfsInputStreamStatistics.
+ * @return instance of IOStatisticsStore which extends IOStatistics.
+ */
+ IOStatistics getIOStatistics();
+
+ /**
* Makes the string of all the AbfsInputStream statistics.
* @return the string with all the statistics.
*/
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
index 12cc407..bd09762 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
@@ -18,23 +18,50 @@
package org.apache.hadoop.fs.azurebfs.services;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+
/**
* Stats for the AbfsInputStream.
*/
public class AbfsInputStreamStatisticsImpl
implements AbfsInputStreamStatistics {
- private long seekOperations;
- private long forwardSeekOperations;
- private long backwardSeekOperations;
- private long bytesRead;
- private long bytesSkippedOnSeek;
- private long bytesBackwardsOnSeek;
- private long seekInBuffer;
- private long readOperations;
- private long bytesReadFromBuffer;
- private long remoteReadOperations;
- private long readAheadBytesRead;
- private long remoteBytesRead;
+
+ private final IOStatisticsStore ioStatisticsStore = iostatisticsStore()
+ .withCounters(
+ StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS,
+ StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS,
+ StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS,
+ StreamStatisticNames.STREAM_READ_BYTES,
+ StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED,
+ StreamStatisticNames.STREAM_READ_OPERATIONS,
+ StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS,
+ StreamStatisticNames.SEEK_IN_BUFFER,
+ StreamStatisticNames.BYTES_READ_BUFFER,
+ StreamStatisticNames.REMOTE_READ_OP,
+ StreamStatisticNames.READ_AHEAD_BYTES_READ,
+ StreamStatisticNames.REMOTE_BYTES_READ
+ )
+ .withDurationTracking(ACTION_HTTP_GET_REQUEST)
+ .build();
+
+ /* Reference to the atomic counter for frequently updated counters to avoid
+ * cost of the map lookup on every increment.
+ */
+ private final AtomicLong bytesRead =
+ ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_BYTES);
+ private final AtomicLong readOps =
+ ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_OPERATIONS);
+ private final AtomicLong seekOps =
+ ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS);
/**
* Seek backwards, incrementing the seek and backward seek counters.
@@ -44,9 +71,9 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void seekBackwards(long negativeOffset) {
- seekOperations++;
- backwardSeekOperations++;
- bytesBackwardsOnSeek -= negativeOffset;
+ seekOps.incrementAndGet();
+ ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS);
+ ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS, negativeOffset);
}
/**
@@ -58,11 +85,9 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void seekForwards(long skipped) {
- seekOperations++;
- forwardSeekOperations++;
- if (skipped > 0) {
- bytesSkippedOnSeek += skipped;
- }
+ seekOps.incrementAndGet();
+ ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS);
+ ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED, skipped);
}
/**
@@ -90,9 +115,7 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void bytesRead(long bytes) {
- if (bytes > 0) {
- bytesRead += bytes;
- }
+ bytesRead.addAndGet(bytes);
}
/**
@@ -104,9 +127,7 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void bytesReadFromBuffer(long bytes) {
- if (bytes > 0) {
- bytesReadFromBuffer += bytes;
- }
+ ioStatisticsStore.incrementCounter(StreamStatisticNames.BYTES_READ_BUFFER, bytes);
}
/**
@@ -116,18 +137,15 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void seekInBuffer() {
- seekInBuffer++;
+ ioStatisticsStore.incrementCounter(StreamStatisticNames.SEEK_IN_BUFFER);
}
/**
* A {@code read(byte[] buf, int off, int len)} operation has started.
- *
- * @param pos starting position of the read.
- * @param len length of bytes to read.
*/
@Override
- public void readOperationStarted(long pos, long len) {
- readOperations++;
+ public void readOperationStarted() {
+ readOps.incrementAndGet();
}
/**
@@ -137,9 +155,7 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void readAheadBytesRead(long bytes) {
- if (bytes > 0) {
- readAheadBytesRead += bytes;
- }
+ ioStatisticsStore.incrementCounter(StreamStatisticNames.READ_AHEAD_BYTES_READ, bytes);
}
/**
@@ -149,9 +165,7 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void remoteBytesRead(long bytes) {
- if (bytes > 0) {
- remoteBytesRead += bytes;
- }
+ ioStatisticsStore.incrementCounter(StreamStatisticNames.REMOTE_BYTES_READ, bytes);
}
/**
@@ -161,55 +175,88 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void remoteReadOperation() {
- remoteReadOperations++;
+ ioStatisticsStore.incrementCounter(StreamStatisticNames.REMOTE_READ_OP);
+ }
+
+ /**
+ * Getter for IOStatistics instance used.
+ * @return IOStatisticsStore instance which extends IOStatistics.
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return ioStatisticsStore;
}
+ @VisibleForTesting
public long getSeekOperations() {
- return seekOperations;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS);
}
+ @VisibleForTesting
public long getForwardSeekOperations() {
- return forwardSeekOperations;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS);
}
+ @VisibleForTesting
public long getBackwardSeekOperations() {
- return backwardSeekOperations;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS);
}
+ @VisibleForTesting
public long getBytesRead() {
- return bytesRead;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_BYTES);
}
+ @VisibleForTesting
public long getBytesSkippedOnSeek() {
- return bytesSkippedOnSeek;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED);
}
+ @VisibleForTesting
public long getBytesBackwardsOnSeek() {
- return bytesBackwardsOnSeek;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS);
}
+ @VisibleForTesting
public long getSeekInBuffer() {
- return seekInBuffer;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.SEEK_IN_BUFFER);
+
}
+ @VisibleForTesting
public long getReadOperations() {
- return readOperations;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_OPERATIONS);
}
+ @VisibleForTesting
public long getBytesReadFromBuffer() {
- return bytesReadFromBuffer;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_READ_BUFFER);
}
+ @VisibleForTesting
public long getRemoteReadOperations() {
- return remoteReadOperations;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.REMOTE_READ_OP);
}
+ @VisibleForTesting
public long getReadAheadBytesRead() {
- return readAheadBytesRead;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.READ_AHEAD_BYTES_READ);
}
+ @VisibleForTesting
public long getRemoteBytesRead() {
- return remoteBytesRead;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.REMOTE_BYTES_READ);
+ }
+
+ /**
+ * Getter for the mean value of the time taken to complete an HTTP GET
+ * request by AbfsInputStream.
+ * @return mean value.
+ */
+ @VisibleForTesting
+ public double getActionHttpGetRequest() {
+ return ioStatisticsStore.meanStatistics().
+ get(ACTION_HTTP_GET_REQUEST + SUFFIX_MEAN).mean();
}
/**
@@ -223,18 +270,7 @@ public class AbfsInputStreamStatisticsImpl
public String toString() {
final StringBuilder sb = new StringBuilder(
"StreamStatistics{");
- sb.append(", SeekOperations=").append(seekOperations);
- sb.append(", ForwardSeekOperations=").append(forwardSeekOperations);
- sb.append(", BackwardSeekOperations=").append(backwardSeekOperations);
- sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek);
- sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek);
- sb.append(", seekInBuffer=").append(seekInBuffer);
- sb.append(", BytesRead=").append(bytesRead);
- sb.append(", ReadOperations=").append(readOperations);
- sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
- sb.append(", remoteReadOperations=").append(remoteReadOperations);
- sb.append(", readAheadBytesRead=").append(readAheadBytesRead);
- sb.append(", remoteBytesRead=").append(remoteBytesRead);
+ sb.append(ioStatisticsStore.toString());
sb.append('}');
return sb.toString();
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 402fdda..53bdfe9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -43,6 +42,12 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationExcep
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FSExceptionMessages;
@@ -57,7 +62,8 @@ import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestPara
/**
* The BlobFsOutputStream for Rest AbfsClient.
*/
-public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities {
+public class AbfsOutputStream extends OutputStream implements Syncable,
+ StreamCapabilities, IOStatisticsSource {
private final AbfsClient client;
private final String path;
@@ -97,6 +103,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private final Statistics statistics;
private final AbfsOutputStreamStatistics outputStreamStatistics;
+ private IOStatistics ioStatistics;
private static final Logger LOG =
LoggerFactory.getLogger(AbfsOutputStream.class);
@@ -144,6 +151,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.cachedSasToken = new CachedSASToken(
abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
+ if (outputStreamStatistics != null) {
+ this.ioStatistics = outputStreamStatistics.getIOStatistics();
+ }
}
/**
@@ -354,11 +364,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
if (bufferIndex == 0) {
return;
}
- outputStreamStatistics.writeCurrentBuffer();
-
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
- outputStreamStatistics.bytesToUpload(bytesLength);
+ if (outputStreamStatistics != null) {
+ outputStreamStatistics.writeCurrentBuffer();
+ outputStreamStatistics.bytesToUpload(bytesLength);
+ }
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
bufferIndex = 0;
final long offset = position;
@@ -370,7 +381,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
bytesLength, APPEND_MODE, true);
AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
- outputStreamStatistics.uploadSuccessful(bytesLength);
+ if (outputStreamStatistics != null) {
+ outputStreamStatistics.uploadSuccessful(bytesLength);
+ }
perfInfo.registerResult(op.getResult());
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
perfInfo.registerSuccess(true);
@@ -402,55 +415,63 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
if (bufferIndex == 0) {
return;
}
- outputStreamStatistics.writeCurrentBuffer();
numOfAppendsToServerSinceLastFlush++;
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
- outputStreamStatistics.bytesToUpload(bytesLength);
+ if (outputStreamStatistics != null) {
+ outputStreamStatistics.writeCurrentBuffer();
+ outputStreamStatistics.bytesToUpload(bytesLength);
+ }
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
bufferIndex = 0;
final long offset = position;
position += bytesLength;
if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
- long start = System.currentTimeMillis();
- waitForTaskToComplete();
- outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
- }
-
- final Future<Void> job = completionService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- AbfsPerfTracker tracker = client.getAbfsPerfTracker();
- try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
- "writeCurrentBufferToService", "append")) {
- AppendRequestParameters.Mode
- mode = APPEND_MODE;
- if (isFlush & isClose) {
- mode = FLUSH_CLOSE_MODE;
- } else if (isFlush) {
- mode = FLUSH_MODE;
- }
-
- AppendRequestParameters reqParams = new AppendRequestParameters(
- offset, 0, bytesLength, mode, false);
- AbfsRestOperation op = client.append(path, bytes, reqParams,
- cachedSasToken.get());
-
- cachedSasToken.update(op.getSasToken());
- perfInfo.registerResult(op.getResult());
- byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
- perfInfo.registerSuccess(true);
- return null;
+ //Tracking time spent on waiting for task to complete.
+ if (outputStreamStatistics != null) {
+ try (DurationTracker ignored = outputStreamStatistics.timeSpentTaskWait()) {
+ waitForTaskToComplete();
}
+ } else {
+ waitForTaskToComplete();
+ }
+ }
+ final Future<Void> job =
+ completionService.submit(IOStatisticsBinding
+ .trackDurationOfCallable((IOStatisticsStore) ioStatistics,
+ StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST,
+ () -> {
+ AbfsPerfTracker tracker = client.getAbfsPerfTracker();
+ try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
+ "writeCurrentBufferToService", "append")) {
+ AppendRequestParameters.Mode
+ mode = APPEND_MODE;
+ if (isFlush & isClose) {
+ mode = FLUSH_CLOSE_MODE;
+ } else if (isFlush) {
+ mode = FLUSH_MODE;
+ }
+ AppendRequestParameters reqParams = new AppendRequestParameters(
+ offset, 0, bytesLength, mode, false);
+ AbfsRestOperation op = client.append(path, bytes, reqParams,
+ cachedSasToken.get());
+ cachedSasToken.update(op.getSasToken());
+ perfInfo.registerResult(op.getResult());
+ byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
+ perfInfo.registerSuccess(true);
+ return null;
+ }
+ })
+ );
+
+ if (outputStreamStatistics != null) {
+ if (job.isCancelled()) {
+ outputStreamStatistics.uploadFailed(bytesLength);
+ } else {
+ outputStreamStatistics.uploadSuccessful(bytesLength);
}
- });
-
- if (job.isCancelled()) {
- outputStreamStatistics.uploadFailed(bytesLength);
- } else {
- outputStreamStatistics.uploadSuccessful(bytesLength);
}
writeOperations.add(new WriteOperation(job, offset, bytesLength));
@@ -527,7 +548,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
lastTotalAppendOffset += writeOperations.peek().length;
writeOperations.remove();
// Incrementing statistics to indicate queue has been shrunk.
- outputStreamStatistics.queueShrunk();
+ if (outputStreamStatistics != null) {
+ outputStreamStatistics.queueShrunk();
+ }
}
} catch (Exception e) {
if (e.getCause() instanceof AzureBlobFileSystemException) {
@@ -615,6 +638,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
return isAppendBlob;
}
+ @Override
+ public IOStatistics getIOStatistics() {
+ return ioStatistics;
+ }
+
/**
* Appending AbfsOutputStream statistics to base toString().
*
@@ -623,9 +651,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
- sb.append("AbfsOuputStream@").append(this.hashCode()).append("){");
- sb.append(outputStreamStatistics.toString());
- sb.append("}");
+ if (outputStreamStatistics != null) {
+ sb.append("AbfsOutputStream@").append(this.hashCode());
+ sb.append("){");
+ sb.append(outputStreamStatistics.toString());
+ sb.append("}");
+ }
return sb.toString();
}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java
index c9fe0dd..c57d5d9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java
@@ -19,12 +19,15 @@
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
/**
* Interface for {@link AbfsOutputStream} statistics.
*/
@InterfaceStability.Unstable
-public interface AbfsOutputStreamStatistics {
+public interface AbfsOutputStreamStatistics extends IOStatisticsSource {
/**
* Number of bytes to be uploaded.
@@ -49,11 +52,9 @@ public interface AbfsOutputStreamStatistics {
/**
* Time spent in waiting for tasks to be completed in the blocking queue.
- *
- * @param start millisecond at which the wait for task to be complete begins.
- * @param end millisecond at which the wait is completed for the task.
+ * @return instance of the DurationTracker that tracks the time for waiting.
*/
- void timeSpentTaskWait(long start, long end);
+ DurationTracker timeSpentTaskWait();
/**
* Number of times task queue is shrunk.
@@ -66,6 +67,12 @@ public interface AbfsOutputStreamStatistics {
void writeCurrentBuffer();
/**
+ * Get the IOStatisticsStore instance from AbfsOutputStreamStatistics.
+ * @return instance of IOStatisticsStore which extends IOStatistics.
+ */
+ IOStatistics getIOStatistics();
+
+ /**
* Method to form a string of all AbfsOutputStream statistics and their
* values.
*
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java
index cd5a29e..b07cf28 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java
@@ -18,32 +18,47 @@
package org.apache.hadoop.fs.azurebfs.services;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+
/**
* OutputStream statistics implementation for Abfs.
*/
public class AbfsOutputStreamStatisticsImpl
implements AbfsOutputStreamStatistics {
- private long bytesToUpload;
- private long bytesUploadSuccessful;
- private long bytesUploadFailed;
- /**
- * Counter to get the total time spent while waiting for tasks to complete
- * in the blocking queue inside the thread executor.
- */
- private long timeSpentOnTaskWait;
- /**
- * Counter to get the total number of queue shrink operations done {@code
- * AbfsOutputStream#shrinkWriteOperationQueue()} by AbfsOutputStream to
- * remove the write operations which were successfully done by
- * AbfsOutputStream from the task queue.
- */
- private long queueShrunkOps;
- /**
- * Counter to get the total number of times the current buffer is written
- * to the service {@code AbfsOutputStream#writeCurrentBufferToService()} via
- * AbfsClient and appended to the data store by AbfsRestOperation.
+
+ private final IOStatisticsStore ioStatisticsStore = iostatisticsStore()
+ .withCounters(
+ StreamStatisticNames.BYTES_TO_UPLOAD,
+ StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL,
+ StreamStatisticNames.BYTES_UPLOAD_FAILED,
+ StreamStatisticNames.QUEUE_SHRUNK_OPS,
+ StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS
+ )
+ .withDurationTracking(
+ StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST,
+ StreamStatisticNames.TIME_SPENT_ON_TASK_WAIT
+ )
+ .build();
+
+ /* Reference to the atomic counter for frequently updated counters to avoid
+ * cost of the map lookup on every increment.
*/
- private long writeCurrentBufferOperations;
+ private final AtomicLong bytesUpload =
+ ioStatisticsStore.getCounterReference(StreamStatisticNames.BYTES_TO_UPLOAD);
+ private final AtomicLong bytesUploadedSuccessfully =
+ ioStatisticsStore.getCounterReference(StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL);
+ private final AtomicLong writeCurrentBufferOps =
+ ioStatisticsStore.getCounterReference(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS);
/**
* Records the need to upload bytes and increments the total bytes that
@@ -53,9 +68,7 @@ public class AbfsOutputStreamStatisticsImpl
*/
@Override
public void bytesToUpload(long bytes) {
- if (bytes > 0) {
- bytesToUpload += bytes;
- }
+ bytesUpload.addAndGet(bytes);
}
/**
@@ -66,9 +79,7 @@ public class AbfsOutputStreamStatisticsImpl
*/
@Override
public void uploadSuccessful(long bytes) {
- if (bytes > 0) {
- bytesUploadSuccessful += bytes;
- }
+ bytesUploadedSuccessfully.addAndGet(bytes);
}
/**
@@ -78,9 +89,7 @@ public class AbfsOutputStreamStatisticsImpl
*/
@Override
public void uploadFailed(long bytes) {
- if (bytes > 0) {
- bytesUploadFailed += bytes;
- }
+ ioStatisticsStore.incrementCounter(StreamStatisticNames.BYTES_UPLOAD_FAILED, bytes);
}
/**
@@ -96,14 +105,10 @@ public class AbfsOutputStreamStatisticsImpl
* This time spent while waiting for the task to be completed is being
* recorded in this counter.
*
- * @param startTime time(in milliseconds) before the wait for task to be
- * completed is begin.
- * @param endTime time(in milliseconds) after the wait for the task to be
- * completed is done.
*/
@Override
- public void timeSpentTaskWait(long startTime, long endTime) {
- timeSpentOnTaskWait += endTime - startTime;
+ public DurationTracker timeSpentTaskWait() {
+ return ioStatisticsStore.trackDuration(StreamStatisticNames.TIME_SPENT_ON_TASK_WAIT);
}
/**
@@ -114,7 +119,7 @@ public class AbfsOutputStreamStatisticsImpl
*/
@Override
public void queueShrunk() {
- queueShrunkOps++;
+ ioStatisticsStore.incrementCounter(StreamStatisticNames.QUEUE_SHRUNK_OPS);
}
/**
@@ -125,31 +130,59 @@ public class AbfsOutputStreamStatisticsImpl
*/
@Override
public void writeCurrentBuffer() {
- writeCurrentBufferOperations++;
+ writeCurrentBufferOps.incrementAndGet();
}
+ /**
+ * {@inheritDoc}
+ *
+ * Getter for the IOStatisticsStore instance, which extends IOStatistics.
+ *
+ * @return IOStatisticsStore instance.
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return ioStatisticsStore;
+ }
+
+ @VisibleForTesting
public long getBytesToUpload() {
- return bytesToUpload;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_TO_UPLOAD);
}
+ @VisibleForTesting
public long getBytesUploadSuccessful() {
- return bytesUploadSuccessful;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL);
}
+ @VisibleForTesting
public long getBytesUploadFailed() {
- return bytesUploadFailed;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_UPLOAD_FAILED);
}
+ @VisibleForTesting
public long getTimeSpentOnTaskWait() {
- return timeSpentOnTaskWait;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.TIME_SPENT_ON_TASK_WAIT);
}
+ @VisibleForTesting
public long getQueueShrunkOps() {
- return queueShrunkOps;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.QUEUE_SHRUNK_OPS);
}
+ @VisibleForTesting
public long getWriteCurrentBufferOperations() {
- return writeCurrentBufferOperations;
+ return ioStatisticsStore.counters().get(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS);
+ }
+
+ /**
+ * Getter for mean value of time taken to complete a PUT request by
+ * AbfsOutputStream.
+ * @return mean value.
+ */
+ @VisibleForTesting
+ public double getTimeSpentOnPutRequest() {
+ return ioStatisticsStore.meanStatistics().get(StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST + StoreStatisticNames.SUFFIX_MEAN).mean();
}
/**
@@ -160,16 +193,7 @@ public class AbfsOutputStreamStatisticsImpl
@Override public String toString() {
final StringBuilder outputStreamStats = new StringBuilder(
"OutputStream Statistics{");
- outputStreamStats.append(", bytes_upload=").append(bytesToUpload);
- outputStreamStats.append(", bytes_upload_successfully=")
- .append(bytesUploadSuccessful);
- outputStreamStats.append(", bytes_upload_failed=")
- .append(bytesUploadFailed);
- outputStreamStats.append(", time_spent_task_wait=")
- .append(timeSpentOnTaskWait);
- outputStreamStats.append(", queue_shrunk_ops=").append(queueShrunkOps);
- outputStreamStats.append(", write_current_buffer_ops=")
- .append(writeCurrentBufferOperations);
+ outputStreamStats.append(ioStatisticsStore.toString());
outputStreamStats.append("}");
return outputStreamStats.toString();
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
index 52dfdf2..a33a76e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -139,10 +139,9 @@ public class ITestAbfsInputStreamStatistics
* forwardSeekOps - Since we are doing a forward seek inside a loop
* for OPERATION times, total forward seeks would be OPERATIONS.
*
- * bytesBackwardsOnSeek - Since we are doing backward seeks from end of
- * file in a ONE_MB file each time, this would mean the bytes from
- * backward seek would be OPERATIONS * ONE_MB. Since this is backward
- * seek this value is expected be to be negative.
+ * negativeBytesBackwardsOnSeek - Since we are doing backward seeks from
+ * end of file in a ONE_MB file each time, this would mean the bytes from
+ * backward seek would be OPERATIONS * ONE_MB.
*
* bytesSkippedOnSeek - Since, we move from start to end in seek, but
* our fCursor(position of cursor) always remain at end of file, this
@@ -160,7 +159,7 @@ public class ITestAbfsInputStreamStatistics
assertEquals("Mismatch in forwardSeekOps value", OPERATIONS,
stats.getForwardSeekOperations());
assertEquals("Mismatch in bytesBackwardsOnSeek value",
- -1 * OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
+ OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
assertEquals("Mismatch in bytesSkippedOnSeek value",
0, stats.getBytesSkippedOnSeek());
assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,
@@ -367,6 +366,40 @@ public class ITestAbfsInputStreamStatistics
}
/**
+ * Testing time taken by AbfsInputStream to complete a GET request.
+ */
+ @Test
+ public void testActionHttpGetRequest() throws IOException {
+ describe("Test to check the correct value of Time taken by http get "
+ + "request in AbfsInputStream");
+ AzureBlobFileSystem fs = getFileSystem();
+ AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+ Path actionHttpGetRequestPath = path(getMethodName());
+ AbfsInputStream abfsInputStream = null;
+ AbfsOutputStream abfsOutputStream = null;
+ try {
+ abfsOutputStream = createAbfsOutputStreamWithFlushEnabled(fs,
+ actionHttpGetRequestPath);
+ abfsOutputStream.write('a');
+ abfsOutputStream.hflush();
+
+ abfsInputStream =
+ abfss.openFileForRead(actionHttpGetRequestPath, fs.getFsStatistics());
+ abfsInputStream.read();
+ AbfsInputStreamStatisticsImpl abfsInputStreamStatistics =
+ (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics();
+
+ LOG.info("AbfsInputStreamStats info: {}", abfsInputStreamStatistics.toString());
+ Assertions.assertThat(
+ abfsInputStreamStatistics.getActionHttpGetRequest())
+ .describedAs("Mismatch in time taken by a GET request")
+ .isGreaterThan(0.0);
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, abfsInputStream, abfsOutputStream);
+ }
+ }
+
+ /**
* Method to assert the initial values of the statistics.
*
* @param actualValue the actual value of the statistics.
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java
index c8640dd..392e80a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java
@@ -20,7 +20,10 @@ package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
@@ -31,7 +34,10 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
*/
public class ITestAbfsOutputStreamStatistics
extends AbstractAbfsIntegrationTest {
+
private static final int OPERATIONS = 10;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestAbfsOutputStreamStatistics.class);
public ITestAbfsOutputStreamStatistics() throws Exception {
}
@@ -220,6 +226,31 @@ public class ITestAbfsOutputStreamStatistics
}
/**
+ * Test to check correct value of time spent on a PUT request in
+ * AbfsOutputStream.
+ */
+ @Test
+ public void testAbfsOutputStreamDurationTrackerPutRequest() throws IOException {
+ describe("Testing to check if DurationTracker for PUT request is working "
+ + "correctly.");
+ AzureBlobFileSystem fs = getFileSystem();
+ Path pathForPutRequest = path(getMethodName());
+
+ try(AbfsOutputStream outputStream =
+ createAbfsOutputStreamWithFlushEnabled(fs, pathForPutRequest)) {
+ outputStream.write('a');
+ outputStream.hflush();
+
+ AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics =
+ getAbfsOutputStreamStatistics(outputStream);
+ LOG.info("AbfsOutputStreamStats info: {}", abfsOutputStreamStatistics.toString());
+ Assertions.assertThat(abfsOutputStreamStatistics.getTimeSpentOnPutRequest())
+ .describedAs("Mismatch in timeSpentOnPutRequest DurationTracker")
+ .isGreaterThan(0.0);
+ }
+ }
+
+ /**
* Method to get the AbfsOutputStream statistics.
*
* @param out AbfsOutputStream whose statistics is needed.
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java
index 58f0023..5f94043 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java
@@ -94,17 +94,11 @@ public class TestAbfsOutputStreamStatistics
assertEquals("Mismatch in time spent on waiting for tasks to complete", 0,
abfsOutputStreamStatistics.getTimeSpentOnTaskWait());
- int smallRandomStartTime =
- new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE);
- int smallRandomEndTime =
- new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE)
- + smallRandomStartTime;
- int smallDiff = smallRandomEndTime - smallRandomStartTime;
abfsOutputStreamStatistics
- .timeSpentTaskWait(smallRandomStartTime, smallRandomEndTime);
- //Test for small random value of timeSpentWaitTask.
+ .timeSpentTaskWait();
+ //Test that a single timeSpentTaskWait() call is recorded as a count of one.
assertEquals("Mismatch in time spent on waiting for tasks to complete",
- smallDiff, abfsOutputStreamStatistics.getTimeSpentOnTaskWait());
+ 1, abfsOutputStreamStatistics.getTimeSpentOnTaskWait());
//Reset statistics for the next test.
abfsOutputStreamStatistics = new AbfsOutputStreamStatisticsImpl();
@@ -113,23 +107,16 @@ public class TestAbfsOutputStreamStatistics
* Entering multiple values for timeSpentTaskWait() to check the
* summation is happening correctly. Also calculating the expected result.
*/
- int expectedRandomDiff = 0;
for (int i = 0; i < OPERATIONS; i++) {
- int largeRandomStartTime =
- new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE);
- int largeRandomEndTime = new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE)
- + largeRandomStartTime;
- abfsOutputStreamStatistics
- .timeSpentTaskWait(largeRandomStartTime, largeRandomEndTime);
- expectedRandomDiff += largeRandomEndTime - largeRandomStartTime;
+ abfsOutputStreamStatistics.timeSpentTaskWait();
}
/*
- * Test to check correct value of timeSpentTaskWait after multiple
- * random values are passed in it.
+ * Test to check correct value of timeSpentTaskWait after OPERATIONS
+ * number of op calls.
*/
assertEquals("Mismatch in time spent on waiting for tasks to complete",
- expectedRandomDiff,
+ OPERATIONS,
abfsOutputStreamStatistics.getTimeSpentOnTaskWait());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org