Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/07/03 10:41:57 UTC

[hadoop] branch trunk updated: HADOOP-16961. ABFS: Adding metrics to AbfsInputStream (#2076)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3b5c9a9  HADOOP-16961. ABFS: Adding metrics to AbfsInputStream (#2076)
3b5c9a9 is described below

commit 3b5c9a90c07e6360007f3f4aa357aa665b47ca3a
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Fri Jul 3 16:11:35 2020 +0530

    HADOOP-16961. ABFS: Adding metrics to AbfsInputStream (#2076)
    
    
    Contributed by Mehakmeet Singh.
---
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |   2 +
 .../fs/azurebfs/services/AbfsInputStream.java      |  68 +++++
 .../azurebfs/services/AbfsInputStreamContext.java  |  12 +
 .../services/AbfsInputStreamStatistics.java        |  93 +++++++
 .../services/AbfsInputStreamStatisticsImpl.java    | 205 ++++++++++++++
 .../azurebfs/ITestAbfsInputStreamStatistics.java   | 297 +++++++++++++++++++++
 .../fs/azurebfs/TestAbfsInputStreamStatistics.java |  55 ++++
 7 files changed, 732 insertions(+)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 397afc8..c310e29 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
@@ -511,6 +512,7 @@ public class AzureBlobFileSystemStore implements Closeable {
             .withReadBufferSize(abfsConfiguration.getReadBufferSize())
             .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
             .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
+            .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
             .build();
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 50380c9..a809bde 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -68,6 +68,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   //                                                      of valid bytes in buffer)
   private boolean closed = false;
 
+  /** Stream statistics. */
+  private final AbfsInputStreamStatistics streamStatistics;
+
   public AbfsInputStream(
           final AbfsClient client,
           final Statistics statistics,
@@ -86,6 +89,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     this.readAheadEnabled = true;
     this.cachedSasToken = new CachedSASToken(
         abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
+    this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
   }
 
   public String getPath() {
@@ -105,10 +109,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
 
   @Override
   public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
+    // check if buffer is null before logging the length
+    if (b != null) {
+      LOG.debug("read requested b.length = {} offset = {} len = {}", b.length,
+          off, len);
+    } else {
+      LOG.debug("read requested b = null offset = {} len = {}", off, len);
+    }
+
     int currentOff = off;
     int currentLen = len;
     int lastReadBytes;
     int totalReadBytes = 0;
+    if (streamStatistics != null) {
+      streamStatistics.readOperationStarted(off, len);
+    }
     incrementReadOps();
     do {
       lastReadBytes = readOneBlock(b, currentOff, currentLen);
@@ -130,6 +145,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     }
 
     Preconditions.checkNotNull(b);
+    LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
+        off, len);
 
     if (len == 0) {
       return 0;
@@ -155,6 +172,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       bCursor = 0;
       limit = 0;
       if (buffer == null) {
+        LOG.debug("created new buffer size {}", bufferSize);
         buffer = new byte[bufferSize];
       }
 
@@ -183,6 +201,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     if (statistics != null) {
       statistics.incrementBytesRead(bytesToRead);
     }
+    if (streamStatistics != null) {
+      // Bytes read from the local buffer.
+      streamStatistics.bytesReadFromBuffer(bytesToRead);
+      streamStatistics.bytesRead(bytesToRead);
+    }
     return bytesToRead;
   }
 
@@ -200,8 +223,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       int numReadAheads = this.readAheadQueueDepth;
       long nextSize;
       long nextOffset = position;
+      LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
       while (numReadAheads > 0 && nextOffset < contentLength) {
         nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
+        LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
+            nextOffset, nextSize);
         ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
         nextOffset = nextOffset + nextSize;
         numReadAheads--;
@@ -211,6 +237,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
       if (receivedBytes > 0) {
         incrementReadOps();
+        LOG.debug("Received data from read ahead, not doing remote read");
         return receivedBytes;
       }
 
@@ -218,6 +245,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       receivedBytes = readRemote(position, b, offset, length);
       return receivedBytes;
     } else {
+      LOG.debug("read ahead disabled, reading remote");
       return readRemote(position, b, offset, length);
     }
   }
@@ -247,6 +275,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
       op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
       cachedSasToken.update(op.getSasToken());
+      if (streamStatistics != null) {
+        streamStatistics.remoteReadOperation();
+      }
+      LOG.debug("issuing HTTP GET request params position = {} b.length = {} "
+          + "offset = {} length = {}", position, b.length, offset, length);
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
       incrementReadOps();
     } catch (AzureBlobFileSystemException ex) {
@@ -262,6 +295,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     if (bytesRead > Integer.MAX_VALUE) {
       throw new IOException("Unexpected Content-Length");
     }
+    LOG.debug("HTTP request read bytes = {}", bytesRead);
     return (int) bytesRead;
   }
 
@@ -282,6 +316,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
    */
   @Override
   public synchronized void seek(long n) throws IOException {
+    LOG.debug("requested seek to position {}", n);
     if (closed) {
       throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
     }
@@ -292,13 +327,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
     }
 
+    if (streamStatistics != null) {
+      streamStatistics.seek(n, fCursor);
+    }
+
     if (n>=fCursor-limit && n<=fCursor) { // within buffer
       bCursor = (int) (n-(fCursor-limit));
+      if (streamStatistics != null) {
+        streamStatistics.seekInBuffer();
+      }
       return;
     }
 
     // next read will read from here
     fCursor = n;
+    LOG.debug("set fCursor to {}", fCursor);
 
     //invalidate buffer
     limit = 0;
@@ -390,6 +433,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   public synchronized void close() throws IOException {
     closed = true;
     buffer = null; // de-reference the buffer so it can be GC'ed sooner
+    LOG.debug("Closing {}", this);
   }
 
   /**
@@ -443,4 +487,28 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     this.cachedSasToken = cachedSasToken;
   }
 
+  /**
+   * Getter for AbfsInputStreamStatistics.
+   *
+   * @return an instance of AbfsInputStreamStatistics.
+   */
+  @VisibleForTesting
+  public AbfsInputStreamStatistics getStreamStatistics() {
+    return streamStatistics;
+  }
+
+  /**
+   * Get the statistics of the stream.
+   * @return a string value.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(super.toString());
+    if (streamStatistics != null) {
+      sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
+      sb.append(streamStatistics.toString());
+      sb.append("}");
+    }
+    return sb.toString();
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index a847b56..f8d3b2a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -29,6 +29,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
 
   private boolean tolerateOobAppends;
 
+  private AbfsInputStreamStatistics streamStatistics;
+
   public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -52,6 +54,12 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
     return this;
   }
 
+  public AbfsInputStreamContext withStreamStatistics(
+      final AbfsInputStreamStatistics streamStatistics) {
+    this.streamStatistics = streamStatistics;
+    return this;
+  }
+
   public AbfsInputStreamContext build() {
     // Validation of parameters to be done here.
     return this;
@@ -68,4 +76,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
   public boolean isTolerateOobAppends() {
     return tolerateOobAppends;
   }
+
+  public AbfsInputStreamStatistics getStreamStatistics() {
+    return streamStatistics;
+  }
 }
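
For anyone tracing how the new statistics reach the stream: the store attaches an
AbfsInputStreamStatisticsImpl through the builder method added above, and
AbfsInputStream reads it back out of the context in its constructor. Below is a
minimal standalone sketch of that wiring, assuming only the patched hadoop-azure
classes on the classpath; the SAS token renew period is a placeholder value (real
callers take it from AbfsConfiguration, as in the AzureBlobFileSystemStore hunk
earlier in this patch).

import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatistics;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;

public class InputStreamStatsWiringSketch {
  public static void main(String[] args) {
    final long sasRenewPeriodSeconds = 120L; // placeholder assumption

    // Attach a statistics sink the same way the store change above does.
    AbfsInputStreamContext context =
        new AbfsInputStreamContext(sasRenewPeriodSeconds)
            .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
            .build();

    // AbfsInputStream's constructor pulls the sink back out like this.
    AbfsInputStreamStatistics stats = context.getStreamStatistics();
    System.out.println("statistics attached: " + (stats != null));
  }
}
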
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
new file mode 100644
index 0000000..2603394
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Interface for statistics for the AbfsInputStream.
+ */
+@InterfaceStability.Unstable
+public interface AbfsInputStreamStatistics {
+  /**
+   * Seek backwards, incrementing the seek and backward seek counters.
+   *
+   * @param negativeOffset how far was the seek?
+   *                       This is expected to be negative.
+   */
+  void seekBackwards(long negativeOffset);
+
+  /**
+   * Record a forward seek, adding a seek operation, a forward
+   * seek operation, and any bytes skipped.
+   *
+   * @param skipped number of bytes skipped by reading from the stream.
+   *                If the seek was implemented by a close + reopen, set this to zero.
+   */
+  void seekForwards(long skipped);
+
+  /**
+   * Record a forward or backward seek, adding a seek operation, a forward or
+   * a backward seek operation, and number of bytes skipped.
+   *
+   * @param seekTo     seek to the position.
+   * @param currentPos current position.
+   */
+  void seek(long seekTo, long currentPos);
+
+  /**
+   * Increment the bytes read counter by the number of bytes;
+   * no-op if the argument is negative.
+   *
+   * @param bytes number of bytes read.
+   */
+  void bytesRead(long bytes);
+
+  /**
+   * Record the total bytes read from buffer.
+   *
+   * @param bytes number of bytes that are read from buffer.
+   */
+  void bytesReadFromBuffer(long bytes);
+
+  /**
+   * Records the total number of seeks done in the buffer.
+   */
+  void seekInBuffer();
+
+  /**
+   * A {@code read(byte[] buf, int off, int len)} operation has started.
+   *
+   * @param pos starting position of the read.
+   * @param len length of bytes to read.
+   */
+  void readOperationStarted(long pos, long len);
+
+  /**
+   * Records a successful remote read operation.
+   */
+  void remoteReadOperation();
+
+  /**
+   * Builds a string summarizing all the AbfsInputStream statistics.
+   * @return the string with all the statistics.
+   */
+  @Override
+  String toString();
+}
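
The interface above, rather than the concrete impl, is what
AbfsInputStreamContext#withStreamStatistics accepts, so other sinks can be
plugged in. A hypothetical implementation (not part of this patch) that only
counts how many statistic events fired, purely as an illustration of the
contract:

import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatistics;

public class CountingAbfsInputStreamStatistics
    implements AbfsInputStreamStatistics {

  // Single counter bumped on every callback, regardless of its type.
  private long events;

  @Override public void seekBackwards(long negativeOffset) { events++; }
  @Override public void seekForwards(long skipped) { events++; }
  @Override public void seek(long seekTo, long currentPos) { events++; }
  @Override public void bytesRead(long bytes) { events++; }
  @Override public void bytesReadFromBuffer(long bytes) { events++; }
  @Override public void seekInBuffer() { events++; }
  @Override public void readOperationStarted(long pos, long len) { events++; }
  @Override public void remoteReadOperation() { events++; }

  @Override
  public String toString() {
    return "CountingAbfsInputStreamStatistics{events=" + events + "}";
  }
}
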
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
new file mode 100644
index 0000000..fd18910
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Stats for the AbfsInputStream.
+ */
+public class AbfsInputStreamStatisticsImpl
+    implements AbfsInputStreamStatistics {
+  private long seekOperations;
+  private long forwardSeekOperations;
+  private long backwardSeekOperations;
+  private long bytesRead;
+  private long bytesSkippedOnSeek;
+  private long bytesBackwardsOnSeek;
+  private long seekInBuffer;
+  private long readOperations;
+  private long bytesReadFromBuffer;
+  private long remoteReadOperations;
+
+  /**
+   * Seek backwards, incrementing the seek and backward seek counters.
+   *
+   * @param negativeOffset how far was the seek?
+   *                       This is expected to be negative.
+   */
+  @Override
+  public void seekBackwards(long negativeOffset) {
+    seekOperations++;
+    backwardSeekOperations++;
+    bytesBackwardsOnSeek -= negativeOffset;
+  }
+
+  /**
+   * Record a forward seek, adding a seek operation, a forward
+   * seek operation, and any bytes skipped.
+   *
+   * @param skipped number of bytes skipped by reading from the stream.
+   *                If the seek was implemented by a close + reopen, set this to zero.
+   */
+  @Override
+  public void seekForwards(long skipped) {
+    seekOperations++;
+    forwardSeekOperations++;
+    if (skipped > 0) {
+      bytesSkippedOnSeek += skipped;
+    }
+  }
+
+  /**
+   * Record a forward or backward seek, adding a seek operation, a forward or
+   * a backward seek operation, and number of bytes skipped.
+   * The seek direction will be calculated based on the parameters.
+   *
+   * @param seekTo     seek to the position.
+   * @param currentPos current position.
+   */
+  @Override
+  public void seek(long seekTo, long currentPos) {
+    if (seekTo >= currentPos) {
+      this.seekForwards(seekTo - currentPos);
+    } else {
+      this.seekBackwards(currentPos - seekTo);
+    }
+  }
+
+  /**
+   * Increment the bytes read counter by the number of bytes;
+   * no-op if the argument is negative.
+   *
+   * @param bytes number of bytes read.
+   */
+  @Override
+  public void bytesRead(long bytes) {
+    if (bytes > 0) {
+      bytesRead += bytes;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Total bytes read from the buffer.
+   *
+   * @param bytes number of bytes that are read from buffer.
+   */
+  @Override
+  public void bytesReadFromBuffer(long bytes) {
+    if (bytes > 0) {
+      bytesReadFromBuffer += bytes;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Increment the number of seeks in the buffer.
+   */
+  @Override
+  public void seekInBuffer() {
+    seekInBuffer++;
+  }
+
+  /**
+   * A {@code read(byte[] buf, int off, int len)} operation has started.
+   *
+   * @param pos starting position of the read.
+   * @param len length of bytes to read.
+   */
+  @Override
+  public void readOperationStarted(long pos, long len) {
+    readOperations++;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Increment the counter when a remote read operation occurs.
+   */
+  @Override
+  public void remoteReadOperation() {
+    remoteReadOperations++;
+  }
+
+  public long getSeekOperations() {
+    return seekOperations;
+  }
+
+  public long getForwardSeekOperations() {
+    return forwardSeekOperations;
+  }
+
+  public long getBackwardSeekOperations() {
+    return backwardSeekOperations;
+  }
+
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  public long getBytesSkippedOnSeek() {
+    return bytesSkippedOnSeek;
+  }
+
+  public long getBytesBackwardsOnSeek() {
+    return bytesBackwardsOnSeek;
+  }
+
+  public long getSeekInBuffer() {
+    return seekInBuffer;
+  }
+
+  public long getReadOperations() {
+    return readOperations;
+  }
+
+  public long getBytesReadFromBuffer() {
+    return bytesReadFromBuffer;
+  }
+
+  public long getRemoteReadOperations() {
+    return remoteReadOperations;
+  }
+
+  /**
+   * String representation of all the current statistics.
+   * <b>Important: there are no guarantees as to the stability
+   * of this value.</b>
+   *
+   * @return the current values of the stream statistics.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "StreamStatistics{");
+    sb.append(", SeekOperations=").append(seekOperations);
+    sb.append(", ForwardSeekOperations=").append(forwardSeekOperations);
+    sb.append(", BackwardSeekOperations=").append(backwardSeekOperations);
+    sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek);
+    sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek);
+    sb.append(", seekInBuffer=").append(seekInBuffer);
+    sb.append(", BytesRead=").append(bytesRead);
+    sb.append(", ReadOperations=").append(readOperations);
+    sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
+    sb.append(", remoteReadOperations=").append(remoteReadOperations);
+    sb.append('}');
+    return sb.toString();
+  }
+}
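
Because the seek-direction bookkeeping lives entirely in this class, it can be
exercised without a storage account. A small standalone sketch (again assuming
only the patched hadoop-azure jar on the classpath) showing how seek() splits
into the forward/backward counters and why bytesBackwardsOnSeek ends up
negative:

import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;

public class SeekAccountingSketch {
  public static void main(String[] args) {
    AbfsInputStreamStatisticsImpl stats = new AbfsInputStreamStatisticsImpl();

    // Forward seek from position 0 to 1024: one seek, one forward seek,
    // 1024 bytes skipped.
    stats.seek(1024, 0);

    // Backward seek from 1024 back to 0: one seek, one backward seek, and
    // bytesBackwardsOnSeek goes to -1024 (matching the assertion in
    // ITestAbfsInputStreamStatistics).
    stats.seek(0, 1024);

    stats.bytesRead(512);

    System.out.println("seekOps=" + stats.getSeekOperations()
        + " forward=" + stats.getForwardSeekOperations()
        + " backward=" + stats.getBackwardSeekOperations()
        + " skipped=" + stats.getBytesSkippedOnSeek()
        + " backwards=" + stats.getBytesBackwardsOnSeek()
        + " bytesRead=" + stats.getBytesRead());
    System.out.println(stats);
  }
}
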
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
new file mode 100644
index 0000000..7a62eca
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.io.IOUtils;
+
+public class ITestAbfsInputStreamStatistics
+    extends AbstractAbfsIntegrationTest {
+  private static final int OPERATIONS = 10;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class);
+  private static final int ONE_MB = 1024 * 1024;
+  private static final int ONE_KB = 1024;
+  private byte[] defBuffer = new byte[ONE_MB];
+
+  public ITestAbfsInputStreamStatistics() throws Exception {
+  }
+
+  /**
+   * Test to check the initial values of the AbfsInputStream statistics.
+   */
+  @Test
+  public void testInitValues() throws IOException {
+    describe("Testing the initial values of AbfsInputStream Statistics");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path initValuesPath = path(getMethodName());
+    AbfsOutputStream outputStream = null;
+    AbfsInputStream inputStream = null;
+
+    try {
+
+      outputStream = createAbfsOutputStreamWithFlushEnabled(fs, initValuesPath);
+      inputStream = abfss.openFileForRead(initValuesPath, fs.getFsStatistics());
+
+      AbfsInputStreamStatisticsImpl stats =
+          (AbfsInputStreamStatisticsImpl) inputStream.getStreamStatistics();
+
+      checkInitValue(stats.getSeekOperations(), "seekOps");
+      checkInitValue(stats.getForwardSeekOperations(), "forwardSeekOps");
+      checkInitValue(stats.getBackwardSeekOperations(), "backwardSeekOps");
+      checkInitValue(stats.getBytesRead(), "bytesRead");
+      checkInitValue(stats.getBytesSkippedOnSeek(), "bytesSkippedOnSeek");
+      checkInitValue(stats.getBytesBackwardsOnSeek(), "bytesBackwardsOnSeek");
+      checkInitValue(stats.getSeekInBuffer(), "seekInBuffer");
+      checkInitValue(stats.getReadOperations(), "readOps");
+      checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer");
+      checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps");
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, outputStream, inputStream);
+    }
+  }
+
+  /**
+   * Test to check statistics from seek operation in AbfsInputStream.
+   */
+  @Test
+  public void testSeekStatistics() throws IOException {
+    describe("Testing the values of statistics from seek operations in "
+        + "AbfsInputStream");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path seekStatPath = path(getMethodName());
+
+    AbfsOutputStream out = null;
+    AbfsInputStream in = null;
+
+    try {
+      out = createAbfsOutputStreamWithFlushEnabled(fs, seekStatPath);
+
+      // Writing the default 1MB buffer to the file.
+      out.write(defBuffer);
+      out.hflush();
+      in = abfss.openFileForRead(seekStatPath, fs.getFsStatistics());
+
+      /*
+       * Reading the 1MB buffer back from the file; this moves the fCursor
+       * (current position of the cursor) to the end of the file.
+       */
+      int result = in.read(defBuffer, 0, ONE_MB);
+      LOG.info("Result of read : {}", result);
+
+      /*
+       * Seeking to the start of the file and then back to the end results
+       * in one backward and one forward seek per iteration, OPERATIONS times.
+       */
+      for (int i = 0; i < OPERATIONS; i++) {
+        in.seek(0);
+        in.seek(ONE_MB);
+      }
+
+      AbfsInputStreamStatisticsImpl stats =
+          (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
+
+      LOG.info("STATISTICS: {}", stats.toString());
+
+      /*
+       * seekOps - Since we are doing backward and forward seek OPERATIONS
+       * times, total seeks would be 2 * OPERATIONS.
+       *
+       * backwardSeekOps - Since we are doing a backward seek inside a loop
+       * OPERATIONS times, total backward seeks would be OPERATIONS.
+       *
+       * forwardSeekOps - Since we are doing a forward seek inside a loop
+       * OPERATIONS times, total forward seeks would be OPERATIONS.
+       *
+       * bytesBackwardsOnSeek - Since each backward seek goes from the end
+       * of the ONE_MB file back to its start, the bytes seeked backwards
+       * would be OPERATIONS * ONE_MB. Since these are backward seeks, this
+       * value is expected to be negative.
+       *
+       * bytesSkippedOnSeek - We move from start to end in seek, but our
+       * fCursor (position of the cursor) always remains at the end of the
+       * file, so no bytes are skipped on seek: all forward seeks are
+       * within the buffer.
+       *
+       * seekInBuffer - Since all seeks were in buffer, the seekInBuffer
+       * would be equal to 2 * OPERATIONS.
+       *
+       */
+      assertEquals("Mismatch in seekOps value", 2 * OPERATIONS,
+          stats.getSeekOperations());
+      assertEquals("Mismatch in backwardSeekOps value", OPERATIONS,
+          stats.getBackwardSeekOperations());
+      assertEquals("Mismatch in forwardSeekOps value", OPERATIONS,
+          stats.getForwardSeekOperations());
+      assertEquals("Mismatch in bytesBackwardsOnSeek value",
+          -1 * OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
+      assertEquals("Mismatch in bytesSkippedOnSeek value",
+          0, stats.getBytesSkippedOnSeek());
+      assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,
+          stats.getSeekInBuffer());
+
+      in.close();
+      // Verifying whether stats are readable after stream is closed.
+      LOG.info("STATISTICS after closing: {}", stats.toString());
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+  }
+
+  /**
+   * Test to check statistics value from read operation in AbfsInputStream.
+   */
+  @Test
+  public void testReadStatistics() throws IOException {
+    describe("Testing the values of statistics from read operation in "
+        + "AbfsInputStream");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path readStatPath = path(getMethodName());
+
+    AbfsOutputStream out = null;
+    AbfsInputStream in = null;
+
+    try {
+      out = createAbfsOutputStreamWithFlushEnabled(fs, readStatPath);
+
+      /*
+       * Writing 1MB buffer to the file.
+       */
+      out.write(defBuffer);
+      out.hflush();
+      in = abfss.openFileForRead(readStatPath, fs.getFsStatistics());
+
+      /*
+       * Reading a single byte from the file, OPERATIONS times.
+       */
+      for (int i = 0; i < OPERATIONS; i++) {
+        in.read();
+      }
+
+      AbfsInputStreamStatisticsImpl stats =
+          (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
+
+      LOG.info("STATISTICS: {}", stats.toString());
+
+      /*
+       * bytesRead - Since a single byte is read each time, total bytes
+       * read would be equal to OPERATIONS.
+       *
+       * readOps - Since the read operation is performed OPERATIONS times,
+       * the total number of read operations would be equal to OPERATIONS.
+       *
+       * remoteReadOps - Only a single remote read operation is done. Hence,
+       * total remote read ops is 1.
+       *
+       */
+      assertEquals("Mismatch in bytesRead value", OPERATIONS,
+          stats.getBytesRead());
+      assertEquals("Mismatch in readOps value", OPERATIONS,
+          stats.getReadOperations());
+      assertEquals("Mismatch in remoteReadOps value", 1,
+          stats.getRemoteReadOperations());
+
+      in.close();
+      // Verifying if stats are still readable after stream is closed.
+      LOG.info("STATISTICS after closing: {}", stats.toString());
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+  }
+
+  /**
+   * Testing AbfsInputStream works with null Statistics.
+   */
+  @Test
+  public void testWithNullStreamStatistics() throws IOException {
+    describe("Testing AbfsInputStream operations with statistics as null");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    Path nullStatFilePath = path(getMethodName());
+    byte[] oneKbBuff = new byte[ONE_KB];
+
+    // Creating an AbfsInputStreamContext instance with null StreamStatistics.
+    AbfsInputStreamContext abfsInputStreamContext =
+        new AbfsInputStreamContext(
+            getConfiguration().getSasTokenRenewPeriodForStreamsInSeconds())
+            .withReadBufferSize(getConfiguration().getReadBufferSize())
+            .withReadAheadQueueDepth(getConfiguration().getReadAheadQueueDepth())
+            .withStreamStatistics(null)
+            .build();
+
+    AbfsOutputStream out = null;
+    AbfsInputStream in = null;
+
+    try {
+      out = createAbfsOutputStreamWithFlushEnabled(fs, nullStatFilePath);
+
+      // Writing a 1KB buffer in the file.
+      out.write(oneKbBuff);
+      out.hflush();
+
+      // AbfsRestOperation Instance required for eTag.
+      AbfsRestOperation abfsRestOperation =
+          fs.getAbfsClient().getPathStatus(nullStatFilePath.toUri().getPath(), false);
+
+      // AbfsInputStream with no StreamStatistics.
+      in = new AbfsInputStream(fs.getAbfsClient(), null,
+          nullStatFilePath.toUri().getPath(), ONE_KB,
+          abfsInputStreamContext,
+          abfsRestOperation.getResult().getResponseHeader("ETag"));
+
+      // Verifying that AbfsInputStream Operations works with null statistics.
+      assertNotEquals("AbfsInputStream read() with null statistics should "
+          + "work", -1, in.read());
+      in.seek(ONE_KB);
+
+      // Verifying toString() with no StreamStatistics.
+      LOG.info("AbfsInputStream: {}", in.toString());
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+  }
+
+  /**
+   * Method to assert the initial values of the statistics.
+   *
+   * @param actualValue the actual value of the statistics.
+   * @param statistic   the name of operation or statistic being asserted.
+   */
+  private void checkInitValue(long actualValue, String statistic) {
+    assertEquals("Mismatch in " + statistic + " value", 0, actualValue);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsInputStreamStatistics.java
new file mode 100644
index 0000000..22c247f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsInputStreamStatistics.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
+
+public class TestAbfsInputStreamStatistics extends AbstractAbfsIntegrationTest {
+
+  private static final int OPERATIONS = 100;
+
+  public TestAbfsInputStreamStatistics() throws Exception {
+  }
+
+  /**
+   * Test to check the bytesReadFromBuffer statistic value from AbfsInputStream.
+   */
+  @Test
+  public void testBytesReadFromBufferStatistic() {
+    describe("Testing bytesReadFromBuffer statistics value in AbfsInputStream");
+
+    AbfsInputStreamStatisticsImpl abfsInputStreamStatistics =
+        new AbfsInputStreamStatisticsImpl();
+
+    //Increment the bytesReadFromBuffer value.
+    for (int i = 0; i < OPERATIONS; i++) {
+      abfsInputStreamStatistics.bytesReadFromBuffer(1);
+    }
+
+    /*
+     * Since we incremented the bytesReadFromBuffer OPERATIONS times, this
+     * should be the expected value.
+     */
+    assertEquals("Mismatch in bytesReadFromBuffer value", OPERATIONS,
+        abfsInputStreamStatistics.getBytesReadFromBuffer());
+
+  }
+}

