You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/07/03 10:41:57 UTC
[hadoop] branch trunk updated: HADOOP-16961. ABFS: Adding metrics
to AbfsInputStream (#2076)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3b5c9a9 HADOOP-16961. ABFS: Adding metrics to AbfsInputStream (#2076)
3b5c9a9 is described below
commit 3b5c9a90c07e6360007f3f4aa357aa665b47ca3a
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Fri Jul 3 16:11:35 2020 +0530
HADOOP-16961. ABFS: Adding metrics to AbfsInputStream (#2076)
Contributed by Mehakmeet Singh.
---
.../fs/azurebfs/AzureBlobFileSystemStore.java | 2 +
.../fs/azurebfs/services/AbfsInputStream.java | 68 +++++
.../azurebfs/services/AbfsInputStreamContext.java | 12 +
.../services/AbfsInputStreamStatistics.java | 93 +++++++
.../services/AbfsInputStreamStatisticsImpl.java | 205 ++++++++++++++
.../azurebfs/ITestAbfsInputStreamStatistics.java | 297 +++++++++++++++++++++
.../fs/azurebfs/TestAbfsInputStreamStatistics.java | 55 ++++
7 files changed, 732 insertions(+)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 397afc8..c310e29 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
@@ -511,6 +512,7 @@ public class AzureBlobFileSystemStore implements Closeable {
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
+ .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.build();
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 50380c9..a809bde 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -68,6 +68,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
// of valid bytes in buffer)
private boolean closed = false;
+ /** Stream statistics. */
+ private final AbfsInputStreamStatistics streamStatistics;
+
public AbfsInputStream(
final AbfsClient client,
final Statistics statistics,
@@ -86,6 +89,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
this.readAheadEnabled = true;
this.cachedSasToken = new CachedSASToken(
abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
+ this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
}
public String getPath() {
@@ -105,10 +109,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
@Override
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
+ // check if buffer is null before logging the length
+ if (b != null) {
+ LOG.debug("read requested b.length = {} offset = {} len = {}", b.length,
+ off, len);
+ } else {
+ LOG.debug("read requested b = null offset = {} len = {}", off, len);
+ }
+
int currentOff = off;
int currentLen = len;
int lastReadBytes;
int totalReadBytes = 0;
+ if (streamStatistics != null) {
+ streamStatistics.readOperationStarted(off, len);
+ }
incrementReadOps();
do {
lastReadBytes = readOneBlock(b, currentOff, currentLen);
@@ -130,6 +145,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
}
Preconditions.checkNotNull(b);
+ LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
+ off, len);
if (len == 0) {
return 0;
@@ -155,6 +172,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
bCursor = 0;
limit = 0;
if (buffer == null) {
+ LOG.debug("created new buffer size {}", bufferSize);
buffer = new byte[bufferSize];
}
@@ -183,6 +201,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
if (statistics != null) {
statistics.incrementBytesRead(bytesToRead);
}
+ if (streamStatistics != null) {
+ // Bytes read from the local buffer.
+ streamStatistics.bytesReadFromBuffer(bytesToRead);
+ streamStatistics.bytesRead(bytesToRead);
+ }
return bytesToRead;
}
@@ -200,8 +223,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
int numReadAheads = this.readAheadQueueDepth;
long nextSize;
long nextOffset = position;
+ LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
while (numReadAheads > 0 && nextOffset < contentLength) {
nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
+ LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
+ nextOffset, nextSize);
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
nextOffset = nextOffset + nextSize;
numReadAheads--;
@@ -211,6 +237,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
if (receivedBytes > 0) {
incrementReadOps();
+ LOG.debug("Received data from read ahead, not doing remote read");
return receivedBytes;
}
@@ -218,6 +245,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
receivedBytes = readRemote(position, b, offset, length);
return receivedBytes;
} else {
+ LOG.debug("read ahead disabled, reading remote");
return readRemote(position, b, offset, length);
}
}
@@ -247,6 +275,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
+ if (streamStatistics != null) {
+ streamStatistics.remoteReadOperation();
+ }
+ LOG.debug("issuing HTTP GET request params position = {} b.length = {} "
+ + "offset = {} length = {}", position, b.length, offset, length);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
incrementReadOps();
} catch (AzureBlobFileSystemException ex) {
@@ -262,6 +295,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
+ LOG.debug("HTTP request read bytes = {}", bytesRead);
return (int) bytesRead;
}
@@ -282,6 +316,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
*/
@Override
public synchronized void seek(long n) throws IOException {
+ LOG.debug("requested seek to position {}", n);
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
@@ -292,13 +327,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
+ if (streamStatistics != null) {
+ streamStatistics.seek(n, fCursor);
+ }
+
if (n>=fCursor-limit && n<=fCursor) { // within buffer
bCursor = (int) (n-(fCursor-limit));
+ if (streamStatistics != null) {
+ streamStatistics.seekInBuffer();
+ }
return;
}
// next read will read from here
fCursor = n;
+ LOG.debug("set fCursor to {}", fCursor);
//invalidate buffer
limit = 0;
@@ -390,6 +433,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
public synchronized void close() throws IOException {
closed = true;
buffer = null; // de-reference the buffer so it can be GC'ed sooner
+ LOG.debug("Closing {}", this);
}
/**
@@ -443,4 +487,28 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
this.cachedSasToken = cachedSasToken;
}
+ /**
+ * Getter for AbfsInputStreamStatistics.
+ *
+ * @return an instance of AbfsInputStreamStatistics.
+ */
+ @VisibleForTesting
+ public AbfsInputStreamStatistics getStreamStatistics() {
+ return streamStatistics;
+ }
+
+ /**
+ * Get the statistics of the stream.
+ * @return a string value.
+ */
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(super.toString());
+ if (streamStatistics != null) {
+ sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
+ sb.append(streamStatistics.toString());
+ sb.append("}");
+ }
+ return sb.toString();
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index a847b56..f8d3b2a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -29,6 +29,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
private boolean tolerateOobAppends;
+ private AbfsInputStreamStatistics streamStatistics;
+
public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
@@ -52,6 +54,12 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
return this;
}
+ public AbfsInputStreamContext withStreamStatistics(
+ final AbfsInputStreamStatistics streamStatistics) {
+ this.streamStatistics = streamStatistics;
+ return this;
+ }
+
public AbfsInputStreamContext build() {
// Validation of parameters to be done here.
return this;
@@ -68,4 +76,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
public boolean isTolerateOobAppends() {
return tolerateOobAppends;
}
+
+ public AbfsInputStreamStatistics getStreamStatistics() {
+ return streamStatistics;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
new file mode 100644
index 0000000..2603394
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
/**
 * Interface for statistics for the AbfsInputStream.
 * Implementations count seeks, reads, bytes moved, and remote operations
 * on behalf of a single stream instance.
 *
 * NOTE(review): nothing here specifies thread-safety; AbfsInputStream's
 * read/seek methods are synchronized, so updates arrive serialized — confirm
 * before using an implementation from unsynchronized code.
 */
@InterfaceStability.Unstable
public interface AbfsInputStreamStatistics {
  /**
   * Seek backwards, incrementing the seek and backward seek counters.
   *
   * @param negativeOffset how far was the seek?
   * This is expected to be negative.
   * NOTE(review): the in-module caller ({@code seek(long, long)}) passes the
   * positive distance {@code currentPos - seekTo}; confirm the intended sign.
   */
  void seekBackwards(long negativeOffset);

  /**
   * Record a forward seek, adding a seek operation, a forward
   * seek operation, and any bytes skipped.
   *
   * @param skipped number of bytes skipped by reading from the stream.
   * If the seek was implemented by a close + reopen, set this to zero.
   */
  void seekForwards(long skipped);

  /**
   * Record a forward or backward seek, adding a seek operation, a forward or
   * a backward seek operation, and number of bytes skipped.
   * The direction is derived by comparing {@code seekTo} with
   * {@code currentPos}.
   *
   * @param seekTo seek to the position.
   * @param currentPos current position.
   */
  void seek(long seekTo, long currentPos);

  /**
   * Increment the bytes read counter by the number of bytes;
   * no-op if the argument is negative.
   *
   * @param bytes number of bytes read.
   */
  void bytesRead(long bytes);

  /**
   * Record the total bytes read from buffer.
   *
   * @param bytes number of bytes that are read from buffer.
   */
  void bytesReadFromBuffer(long bytes);

  /**
   * Records the total number of seeks done in the buffer.
   */
  void seekInBuffer();

  /**
   * A {@code read(byte[] buf, int off, int len)} operation has started.
   *
   * @param pos starting position of the read.
   * @param len length of bytes to read.
   */
  void readOperationStarted(long pos, long len);

  /**
   * Records a successful remote read operation.
   */
  void remoteReadOperation();

  /**
   * Makes the string of all the AbfsInputStream statistics.
   * @return the string with all the statistics.
   */
  @Override
  String toString();
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
new file mode 100644
index 0000000..fd18910
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Stats for the AbfsInputStream.
+ */
+public class AbfsInputStreamStatisticsImpl
+ implements AbfsInputStreamStatistics {
+ private long seekOperations;
+ private long forwardSeekOperations;
+ private long backwardSeekOperations;
+ private long bytesRead;
+ private long bytesSkippedOnSeek;
+ private long bytesBackwardsOnSeek;
+ private long seekInBuffer;
+ private long readOperations;
+ private long bytesReadFromBuffer;
+ private long remoteReadOperations;
+
+ /**
+ * Seek backwards, incrementing the seek and backward seek counters.
+ *
+ * @param negativeOffset how far was the seek?
+ * This is expected to be negative.
+ */
+ @Override
+ public void seekBackwards(long negativeOffset) {
+ seekOperations++;
+ backwardSeekOperations++;
+ bytesBackwardsOnSeek -= negativeOffset;
+ }
+
+ /**
+ * Record a forward seek, adding a seek operation, a forward
+ * seek operation, and any bytes skipped.
+ *
+ * @param skipped number of bytes skipped by reading from the stream.
+ * If the seek was implemented by a close + reopen, set this to zero.
+ */
+ @Override
+ public void seekForwards(long skipped) {
+ seekOperations++;
+ forwardSeekOperations++;
+ if (skipped > 0) {
+ bytesSkippedOnSeek += skipped;
+ }
+ }
+
+ /**
+ * Record a forward or backward seek, adding a seek operation, a forward or
+ * a backward seek operation, and number of bytes skipped.
+ * The seek direction will be calculated based on the parameters.
+ *
+ * @param seekTo seek to the position.
+ * @param currentPos current position.
+ */
+ @Override
+ public void seek(long seekTo, long currentPos) {
+ if (seekTo >= currentPos) {
+ this.seekForwards(seekTo - currentPos);
+ } else {
+ this.seekBackwards(currentPos - seekTo);
+ }
+ }
+
+ /**
+ * Increment the bytes read counter by the number of bytes;
+ * no-op if the argument is negative.
+ *
+ * @param bytes number of bytes read.
+ */
+ @Override
+ public void bytesRead(long bytes) {
+ if (bytes > 0) {
+ bytesRead += bytes;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Total bytes read from the buffer.
+ *
+ * @param bytes number of bytes that are read from buffer.
+ */
+ @Override
+ public void bytesReadFromBuffer(long bytes) {
+ if (bytes > 0) {
+ bytesReadFromBuffer += bytes;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Increment the number of seeks in the buffer.
+ */
+ @Override
+ public void seekInBuffer() {
+ seekInBuffer++;
+ }
+
+ /**
+ * A {@code read(byte[] buf, int off, int len)} operation has started.
+ *
+ * @param pos starting position of the read.
+ * @param len length of bytes to read.
+ */
+ @Override
+ public void readOperationStarted(long pos, long len) {
+ readOperations++;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Increment the counter when a remote read operation occurs.
+ */
+ @Override
+ public void remoteReadOperation() {
+ remoteReadOperations++;
+ }
+
+ public long getSeekOperations() {
+ return seekOperations;
+ }
+
+ public long getForwardSeekOperations() {
+ return forwardSeekOperations;
+ }
+
+ public long getBackwardSeekOperations() {
+ return backwardSeekOperations;
+ }
+
+ public long getBytesRead() {
+ return bytesRead;
+ }
+
+ public long getBytesSkippedOnSeek() {
+ return bytesSkippedOnSeek;
+ }
+
+ public long getBytesBackwardsOnSeek() {
+ return bytesBackwardsOnSeek;
+ }
+
+ public long getSeekInBuffer() {
+ return seekInBuffer;
+ }
+
+ public long getReadOperations() {
+ return readOperations;
+ }
+
+ public long getBytesReadFromBuffer() {
+ return bytesReadFromBuffer;
+ }
+
+ public long getRemoteReadOperations() {
+ return remoteReadOperations;
+ }
+
+ /**
+ * String operator describes all the current statistics.
+ * <b>Important: there are no guarantees as to the stability
+ * of this value.</b>
+ *
+ * @return the current values of the stream statistics.
+ */
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "StreamStatistics{");
+ sb.append(", SeekOperations=").append(seekOperations);
+ sb.append(", ForwardSeekOperations=").append(forwardSeekOperations);
+ sb.append(", BackwardSeekOperations=").append(backwardSeekOperations);
+ sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek);
+ sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek);
+ sb.append(", seekInBuffer=").append(seekInBuffer);
+ sb.append(", BytesRead=").append(bytesRead);
+ sb.append(", ReadOperations=").append(readOperations);
+ sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
+ sb.append(", remoteReadOperations=").append(remoteReadOperations);
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
new file mode 100644
index 0000000..7a62eca
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.io.IOUtils;
+
/**
 * Integration tests for the statistics collected by AbfsInputStream.
 * Each test writes data through an AbfsOutputStream, then reads/seeks
 * through an AbfsInputStream and asserts on the resulting counter values.
 * Requires a live ABFS store (via AbstractAbfsIntegrationTest).
 */
public class ITestAbfsInputStreamStatistics
    extends AbstractAbfsIntegrationTest {
  private static final int OPERATIONS = 10;
  private static final Logger LOG =
      LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class);
  private static final int ONE_MB = 1024 * 1024;
  private static final int ONE_KB = 1024;
  // Shared 1 MB scratch buffer: written as file content and reused as a
  // read destination.
  private byte[] defBuffer = new byte[ONE_MB];

  public ITestAbfsInputStreamStatistics() throws Exception {
  }

  /**
   * Test to check the initial values of the AbfsInputStream statistics.
   * All counters are expected to be zero before any read/seek happens.
   */
  @Test
  public void testInitValues() throws IOException {
    describe("Testing the initial values of AbfsInputStream Statistics");

    AzureBlobFileSystem fs = getFileSystem();
    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
    Path initValuesPath = path(getMethodName());
    AbfsOutputStream outputStream = null;
    AbfsInputStream inputStream = null;

    try {

      outputStream = createAbfsOutputStreamWithFlushEnabled(fs, initValuesPath);
      inputStream = abfss.openFileForRead(initValuesPath, fs.getFsStatistics());

      AbfsInputStreamStatisticsImpl stats =
          (AbfsInputStreamStatisticsImpl) inputStream.getStreamStatistics();

      checkInitValue(stats.getSeekOperations(), "seekOps");
      checkInitValue(stats.getForwardSeekOperations(), "forwardSeekOps");
      checkInitValue(stats.getBackwardSeekOperations(), "backwardSeekOps");
      checkInitValue(stats.getBytesRead(), "bytesRead");
      checkInitValue(stats.getBytesSkippedOnSeek(), "bytesSkippedOnSeek");
      checkInitValue(stats.getBytesBackwardsOnSeek(), "bytesBackwardsOnSeek");
      checkInitValue(stats.getSeekInBuffer(), "seekInBuffer");
      checkInitValue(stats.getReadOperations(), "readOps");
      checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer");
      checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps");

    } finally {
      IOUtils.cleanupWithLogger(LOG, outputStream, inputStream);
    }
  }

  /**
   * Test to check statistics from seek operation in AbfsInputStream.
   */
  @Test
  public void testSeekStatistics() throws IOException {
    describe("Testing the values of statistics from seek operations in "
        + "AbfsInputStream");

    AzureBlobFileSystem fs = getFileSystem();
    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
    Path seekStatPath = path(getMethodName());

    AbfsOutputStream out = null;
    AbfsInputStream in = null;

    try {
      out = createAbfsOutputStreamWithFlushEnabled(fs, seekStatPath);

      //Writing a default buffer in a file.
      out.write(defBuffer);
      out.hflush();
      in = abfss.openFileForRead(seekStatPath, fs.getFsStatistics());

      /*
       * Writing 1MB buffer to the file, this would make the fCursor(Current
       * position of cursor) to the end of file.
       */
      int result = in.read(defBuffer, 0, ONE_MB);
      LOG.info("Result of read : {}", result);

      /*
       * Seeking to start of file and then back to end would result in a
       * backward and a forward seek respectively 10 times.
       */
      for (int i = 0; i < OPERATIONS; i++) {
        in.seek(0);
        in.seek(ONE_MB);
      }

      AbfsInputStreamStatisticsImpl stats =
          (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();

      LOG.info("STATISTICS: {}", stats.toString());

      /*
       * seekOps - Since we are doing backward and forward seek OPERATIONS
       * times, total seeks would be 2 * OPERATIONS.
       *
       * backwardSeekOps - Since we are doing a backward seek inside a loop
       * for OPERATION times, total backward seeks would be OPERATIONS.
       *
       * forwardSeekOps - Since we are doing a forward seek inside a loop
       * for OPERATION times, total forward seeks would be OPERATIONS.
       *
       * bytesBackwardsOnSeek - Since we are doing backward seeks from end of
       * file in a ONE_MB file each time, this would mean the bytes from
       * backward seek would be OPERATIONS * ONE_MB. Since this is backward
       * seek this value is expected be to be negative.
       *
       * bytesSkippedOnSeek - Since, we move from start to end in seek, but
       * our fCursor(position of cursor) always remain at end of file, this
       * would mean no bytes were skipped on seek. Since, all forward seeks
       * are in buffer.
       *
       * seekInBuffer - Since all seeks were in buffer, the seekInBuffer
       * would be equal to 2 * OPERATIONS.
       *
       */
      assertEquals("Mismatch in seekOps value", 2 * OPERATIONS,
          stats.getSeekOperations());
      assertEquals("Mismatch in backwardSeekOps value", OPERATIONS,
          stats.getBackwardSeekOperations());
      assertEquals("Mismatch in forwardSeekOps value", OPERATIONS,
          stats.getForwardSeekOperations());
      assertEquals("Mismatch in bytesBackwardsOnSeek value",
          -1 * OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
      assertEquals("Mismatch in bytesSkippedOnSeek value",
          0, stats.getBytesSkippedOnSeek());
      assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,
          stats.getSeekInBuffer());

      in.close();
      // Verifying whether stats are readable after stream is closed.
      LOG.info("STATISTICS after closing: {}", stats.toString());
    } finally {
      IOUtils.cleanupWithLogger(LOG, out, in);
    }
  }

  /**
   * Test to check statistics value from read operation in AbfsInputStream.
   */
  @Test
  public void testReadStatistics() throws IOException {
    describe("Testing the values of statistics from read operation in "
        + "AbfsInputStream");

    AzureBlobFileSystem fs = getFileSystem();
    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
    Path readStatPath = path(getMethodName());

    AbfsOutputStream out = null;
    AbfsInputStream in = null;

    try {
      out = createAbfsOutputStreamWithFlushEnabled(fs, readStatPath);

      /*
       * Writing 1MB buffer to the file.
       */
      out.write(defBuffer);
      out.hflush();
      in = abfss.openFileForRead(readStatPath, fs.getFsStatistics());

      /*
       * Doing file read 10 times.
       */
      for (int i = 0; i < OPERATIONS; i++) {
        in.read();
      }

      AbfsInputStreamStatisticsImpl stats =
          (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();

      LOG.info("STATISTICS: {}", stats.toString());

      /*
       * bytesRead - Since each time a single byte is read, total
       * bytes read would be equal to OPERATIONS.
       *
       * readOps - Since each time read operation is performed OPERATIONS
       * times, total number of read operations would be equal to OPERATIONS.
       *
       * remoteReadOps - Only a single remote read operation is done. Hence,
       * total remote read ops is 1.
       *
       */
      assertEquals("Mismatch in bytesRead value", OPERATIONS,
          stats.getBytesRead());
      assertEquals("Mismatch in readOps value", OPERATIONS,
          stats.getReadOperations());
      assertEquals("Mismatch in remoteReadOps value", 1,
          stats.getRemoteReadOperations());

      in.close();
      // Verifying if stats are still readable after stream is closed.
      LOG.info("STATISTICS after closing: {}", stats.toString());
    } finally {
      IOUtils.cleanupWithLogger(LOG, out, in);
    }
  }

  /**
   * Testing AbfsInputStream works with null Statistics.
   * Ensures no NPE is thrown from read/seek/toString when the stream is
   * built with a null statistics reference.
   */
  @Test
  public void testWithNullStreamStatistics() throws IOException {
    describe("Testing AbfsInputStream operations with statistics as null");

    AzureBlobFileSystem fs = getFileSystem();
    Path nullStatFilePath = path(getMethodName());
    byte[] oneKbBuff = new byte[ONE_KB];

    // Creating an AbfsInputStreamContext instance with null StreamStatistics.
    AbfsInputStreamContext abfsInputStreamContext =
        new AbfsInputStreamContext(
            getConfiguration().getSasTokenRenewPeriodForStreamsInSeconds())
            .withReadBufferSize(getConfiguration().getReadBufferSize())
            .withReadAheadQueueDepth(getConfiguration().getReadAheadQueueDepth())
            .withStreamStatistics(null)
            .build();

    AbfsOutputStream out = null;
    AbfsInputStream in = null;

    try {
      out = createAbfsOutputStreamWithFlushEnabled(fs, nullStatFilePath);

      // Writing a 1KB buffer in the file.
      out.write(oneKbBuff);
      out.hflush();

      // AbfsRestOperation Instance required for eTag.
      AbfsRestOperation abfsRestOperation =
          fs.getAbfsClient().getPathStatus(nullStatFilePath.toUri().getPath(), false);

      // AbfsInputStream with no StreamStatistics.
      in = new AbfsInputStream(fs.getAbfsClient(), null,
          nullStatFilePath.toUri().getPath(), ONE_KB,
          abfsInputStreamContext,
          abfsRestOperation.getResult().getResponseHeader("ETag"));

      // Verifying that AbfsInputStream Operations works with null statistics.
      assertNotEquals("AbfsInputStream read() with null statistics should "
          + "work", -1, in.read());
      in.seek(ONE_KB);

      // Verifying toString() with no StreamStatistics.
      LOG.info("AbfsInputStream: {}", in.toString());
    } finally {
      IOUtils.cleanupWithLogger(LOG, out, in);
    }
  }

  /**
   * Method to assert the initial values of the statistics.
   *
   * @param actualValue the actual value of the statistics.
   * @param statistic the name of operation or statistic being asserted.
   */
  private void checkInitValue(long actualValue, String statistic) {
    assertEquals("Mismatch in " + statistic + " value", 0, actualValue);
  }
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsInputStreamStatistics.java
new file mode 100644
index 0000000..22c247f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsInputStreamStatistics.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
+
+public class TestAbfsInputStreamStatistics extends AbstractAbfsIntegrationTest {
+
+ private static final int OPERATIONS = 100;
+
+ public TestAbfsInputStreamStatistics() throws Exception {
+ }
+
+ /**
+ * Test to check the bytesReadFromBuffer statistic value from AbfsInputStream.
+ */
+ @Test
+ public void testBytesReadFromBufferStatistic() {
+ describe("Testing bytesReadFromBuffer statistics value in AbfsInputStream");
+
+ AbfsInputStreamStatisticsImpl abfsInputStreamStatistics =
+ new AbfsInputStreamStatisticsImpl();
+
+ //Increment the bytesReadFromBuffer value.
+ for (int i = 0; i < OPERATIONS; i++) {
+ abfsInputStreamStatistics.bytesReadFromBuffer(1);
+ }
+
+ /*
+ * Since we incremented the bytesReadFromBuffer OPERATIONS times, this
+ * should be the expected value.
+ */
+ assertEquals("Mismatch in bytesReadFromBuffer value", OPERATIONS,
+ abfsInputStreamStatistics.getBytesReadFromBuffer());
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org