You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/06/19 13:03:59 UTC

[hadoop] branch trunk updated: HADOOP-17065. Add Network Counters to ABFS (#2056)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3472c3e  HADOOP-17065. Add Network Counters to ABFS (#2056)
3472c3e is described below

commit 3472c3efc0014237d0cc4d9a989393b8513d2ab6
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Fri Jun 19 18:33:49 2020 +0530

    HADOOP-17065. Add Network Counters to ABFS (#2056)
    
    
    Contributed by Mehakmeet Singh.
---
 ...sInstrumentation.java => AbfsCountersImpl.java} |  13 +-
 .../apache/hadoop/fs/azurebfs/AbfsStatistic.java   |  20 +-
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  15 +-
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  15 +-
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |  25 +-
 .../services/AbfsClientThrottlingAnalyzer.java     |   7 +-
 .../services/AbfsClientThrottlingIntercept.java    |  14 +-
 .../fs/azurebfs/services/AbfsRestOperation.java    |  24 +-
 .../fs/azurebfs/AbstractAbfsIntegrationTest.java   |   3 +-
 .../fs/azurebfs/ITestAbfsNetworkStatistics.java    | 253 +++++++++++++++++++++
 .../hadoop/fs/azurebfs/ITestAbfsStatistics.java    |   2 +-
 .../fs/azurebfs/TestAbfsNetworkStatistics.java     |  67 ++++++
 .../hadoop/fs/azurebfs/TestAbfsStatistics.java     |   2 +-
 .../fs/azurebfs/services/TestAbfsClient.java       |   4 +-
 14 files changed, 430 insertions(+), 34 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
similarity index 96%
rename from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java
rename to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
index 9094c40..57cc3ea 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
@@ -41,7 +41,7 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
 /**
  * Instrumentation of Abfs counters.
  */
-public class AbfsInstrumentation implements AbfsCounters {
+public class AbfsCountersImpl implements AbfsCounters {
 
   /**
    * Single context for all the Abfs counters to separate them from other
@@ -78,10 +78,17 @@ public class AbfsInstrumentation implements AbfsCounters {
       DIRECTORIES_DELETED,
       FILES_CREATED,
       FILES_DELETED,
-      ERROR_IGNORED
+      ERROR_IGNORED,
+      CONNECTIONS_MADE,
+      SEND_REQUESTS,
+      GET_RESPONSES,
+      BYTES_SENT,
+      BYTES_RECEIVED,
+      READ_THROTTLES,
+      WRITE_THROTTLES
   };
 
-  public AbfsInstrumentation(URI uri) {
+  public AbfsCountersImpl(URI uri) {
     UUID fileSystemInstanceId = UUID.randomUUID();
     registry.tag(REGISTRY_ID,
         "A unique identifier for the instance",
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
index a9867aa..2935cd7 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;
 
 /**
  * Statistic which are collected in Abfs.
- * Available as metrics in {@link AbfsInstrumentation}.
+ * Available as metrics in {@link AbfsCountersImpl}.
  */
 public enum AbfsStatistic {
 
@@ -57,7 +57,23 @@ public enum AbfsStatistic {
   FILES_DELETED("files_deleted",
       "Total number of files deleted from the object store."),
   ERROR_IGNORED("error_ignored",
-      "Errors caught and ignored.");
+      "Errors caught and ignored."),
+
+  //Network statistics.
+  CONNECTIONS_MADE("connections_made",
+      "Total number of times a connection was made with the data store."),
+  SEND_REQUESTS("send_requests",
+      "Total number of times http requests were sent to the data store."),
+  GET_RESPONSES("get_responses",
+      "Total number of times a response was received."),
+  BYTES_SENT("bytes_sent",
+      "Total bytes uploaded."),
+  BYTES_RECEIVED("bytes_received",
+      "Total bytes received."),
+  READ_THROTTLES("read_throttles",
+      "Total number of times a read operation is throttled."),
+  WRITE_THROTTLES("write_throttles",
+      "Total number of times a write operation is throttled.");
 
   private String statName;
   private String statDescription;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 6694c13..daa1905 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -97,7 +97,7 @@ public class AzureBlobFileSystem extends FileSystem {
 
   private boolean delegationTokenEnabled = false;
   private AbfsDelegationTokenManager delegationTokenManager;
-  private AbfsCounters instrumentation;
+  private AbfsCounters abfsCounters;
 
   @Override
   public void initialize(URI uri, Configuration configuration)
@@ -109,11 +109,12 @@ public class AzureBlobFileSystem extends FileSystem {
     LOG.debug("Initializing AzureBlobFileSystem for {}", uri);
 
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
-    this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration);
+    abfsCounters = new AbfsCountersImpl(uri);
+    this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(),
+        configuration, abfsCounters);
     LOG.trace("AzureBlobFileSystemStore init complete");
 
     final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
-    instrumentation = new AbfsInstrumentation(uri);
     this.setWorkingDirectory(this.getHomeDirectory());
 
     if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
@@ -150,8 +151,8 @@ public class AzureBlobFileSystem extends FileSystem {
     sb.append("uri=").append(uri);
     sb.append(", user='").append(abfsStore.getUser()).append('\'');
     sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
-    if (instrumentation != null) {
-      sb.append(", Statistics: {").append(instrumentation.formString("{", "=",
+    if (abfsCounters != null) {
+      sb.append(", Statistics: {").append(abfsCounters.formString("{", "=",
           "}", true));
       sb.append("}");
     }
@@ -392,7 +393,7 @@ public class AzureBlobFileSystem extends FileSystem {
    * @param statistic the Statistic to be incremented.
    */
   private void incrementStatistic(AbfsStatistic statistic) {
-    instrumentation.incrementCounter(statistic, 1);
+    abfsCounters.incrementCounter(statistic, 1);
   }
 
   /**
@@ -1241,7 +1242,7 @@ public class AzureBlobFileSystem extends FileSystem {
 
   @VisibleForTesting
   Map<String, Long> getInstrumentationMap() {
-    return instrumentation.toMap();
+    return abfsCounters.toMap();
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index a01d31a..397afc8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
 import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface;
 import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
@@ -143,8 +144,9 @@ public class AzureBlobFileSystemStore implements Closeable {
   private final IdentityTransformerInterface identityTransformer;
   private final AbfsPerfTracker abfsPerfTracker;
 
-  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration)
-          throws IOException {
+  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
+                                  Configuration configuration,
+                                  AbfsCounters abfsCounters) throws IOException {
     this.uri = uri;
     String[] authorityParts = authorityParts(uri);
     final String fileSystemName = authorityParts[0];
@@ -182,7 +184,7 @@ public class AzureBlobFileSystemStore implements Closeable {
     boolean usingOauth = (authType == AuthType.OAuth);
     boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
     this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
-    initializeClient(uri, fileSystemName, accountName, useHttps);
+    initializeClient(uri, fileSystemName, accountName, useHttps, abfsCounters);
     final Class<? extends IdentityTransformerInterface> identityTransformerClass =
         configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
             IdentityTransformerInterface.class);
@@ -1170,7 +1172,8 @@ public class AzureBlobFileSystemStore implements Closeable {
     return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
   }
 
-  private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure)
+  private void initializeClient(URI uri, String fileSystemName,
+      String accountName, boolean isSecure, AbfsCounters abfsCounters)
       throws IOException {
     if (this.client != null) {
       return;
@@ -1214,11 +1217,11 @@ public class AzureBlobFileSystemStore implements Closeable {
     if (tokenProvider != null) {
       this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
           new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
-          tokenProvider, abfsPerfTracker);
+          tokenProvider, abfsPerfTracker, abfsCounters);
     } else {
       this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
           new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
-          sasTokenProvider, abfsPerfTracker);
+          sasTokenProvider, abfsPerfTracker, abfsCounters);
     }
     LOG.trace("AbfsClient init complete");
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index f104e7b..f614bbd 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -73,11 +73,13 @@ public class AbfsClient implements Closeable {
   private final AuthType authType;
   private AccessTokenProvider tokenProvider;
   private SASTokenProvider sasTokenProvider;
+  private final AbfsCounters abfsCounters;
 
   private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
                     final AbfsConfiguration abfsConfiguration,
                     final ExponentialRetryPolicy exponentialRetryPolicy,
-                    final AbfsPerfTracker abfsPerfTracker) {
+                    final AbfsPerfTracker abfsPerfTracker,
+                    final AbfsCounters abfsCounters) {
     this.baseUrl = baseUrl;
     this.sharedKeyCredentials = sharedKeyCredentials;
     String baseUrlString = baseUrl.toString();
@@ -104,14 +106,17 @@ public class AbfsClient implements Closeable {
 
     this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
     this.abfsPerfTracker = abfsPerfTracker;
+    this.abfsCounters = abfsCounters;
   }
 
   public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
                     final AbfsConfiguration abfsConfiguration,
                     final ExponentialRetryPolicy exponentialRetryPolicy,
                     final AccessTokenProvider tokenProvider,
-                    final AbfsPerfTracker abfsPerfTracker) {
-    this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker);
+                    final AbfsPerfTracker abfsPerfTracker,
+                    final AbfsCounters abfsCounters) {
+    this(baseUrl, sharedKeyCredentials, abfsConfiguration,
+        exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
     this.tokenProvider = tokenProvider;
   }
 
@@ -119,8 +124,10 @@ public class AbfsClient implements Closeable {
                     final AbfsConfiguration abfsConfiguration,
                     final ExponentialRetryPolicy exponentialRetryPolicy,
                     final SASTokenProvider sasTokenProvider,
-                    final AbfsPerfTracker abfsPerfTracker) {
-    this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker);
+                    final AbfsPerfTracker abfsPerfTracker,
+                    final AbfsCounters abfsCounters) {
+    this(baseUrl, sharedKeyCredentials, abfsConfiguration,
+        exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
     this.sasTokenProvider = sasTokenProvider;
   }
 
@@ -892,4 +899,12 @@ public class AbfsClient implements Closeable {
   public SASTokenProvider getSasTokenProvider() {
     return this.sasTokenProvider;
   }
+
+  /**
+   * Getter for abfsCounters from AbfsClient.
+   * @return AbfsCounters instance.
+   */
+  protected AbfsCounters getAbfsCounters() {
+    return abfsCounters;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
index f1e5aaa..e1a799b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
@@ -114,16 +114,19 @@ class AbfsClientThrottlingAnalyzer {
 
   /**
    * Suspends the current storage operation, as necessary, to reduce throughput.
+   * @return true if the thread slept (throttling occurred), else false.
    */
-  public void suspendIfNecessary() {
+  public boolean suspendIfNecessary() {
     int duration = sleepDuration;
     if (duration > 0) {
       try {
         Thread.sleep(duration);
+        return true;
       } catch (InterruptedException ie) {
         Thread.currentThread().interrupt();
       }
     }
+    return false;
   }
 
   @VisibleForTesting
@@ -269,4 +272,4 @@ class AbfsClientThrottlingAnalyzer {
       this.operationsSuccessful = new AtomicLong();
     }
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
index 1c6ce17..7303e83 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
@@ -23,6 +23,7 @@ import java.net.HttpURLConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 
 /**
@@ -103,17 +104,24 @@ public final class AbfsClientThrottlingIntercept {
    * uses this to suspend the request, if necessary, to minimize errors and
    * maximize throughput.
    */
-  static void sendingRequest(AbfsRestOperationType operationType) {
+  static void sendingRequest(AbfsRestOperationType operationType,
+      AbfsCounters abfsCounters) {
     if (!isAutoThrottlingEnabled) {
       return;
     }
 
     switch (operationType) {
       case ReadFile:
-        singleton.readThrottler.suspendIfNecessary();
+        if (singleton.readThrottler.suspendIfNecessary()
+            && abfsCounters != null) {
+          abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
+        }
         break;
       case Append:
-        singleton.writeThrottler.suspendIfNecessary();
+        if (singleton.writeThrottler.suspendIfNecessary()
+            && abfsCounters != null) {
+          abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
+        }
         break;
       default:
         break;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 521da96..f3986d4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -27,6 +27,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
@@ -66,6 +67,7 @@ public class AbfsRestOperation {
   private int retryCount = 0;
 
   private AbfsHttpOperation result;
+  private AbfsCounters abfsCounters;
 
   public AbfsHttpOperation getResult() {
     return result;
@@ -131,6 +133,7 @@ public class AbfsRestOperation {
     this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
             || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
     this.sasToken = sasToken;
+    this.abfsCounters = client.getAbfsCounters();
   }
 
   /**
@@ -160,6 +163,7 @@ public class AbfsRestOperation {
     this.buffer = buffer;
     this.bufferOffset = bufferOffset;
     this.bufferLength = bufferLength;
+    this.abfsCounters = client.getAbfsCounters();
   }
 
   /**
@@ -205,6 +209,7 @@ public class AbfsRestOperation {
     try {
       // initialize the HTTP request and open the connection
       httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
+      incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1);
 
       switch(client.getAuthType()) {
         case Custom:
@@ -229,14 +234,19 @@ public class AbfsRestOperation {
       // dump the headers
       AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
           httpOperation.getConnection().getRequestProperties());
-      AbfsClientThrottlingIntercept.sendingRequest(operationType);
+      AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);
 
       if (hasRequestBody) {
         // HttpUrlConnection requires
         httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
+        incrementCounter(AbfsStatistic.SEND_REQUESTS, 1);
+        incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength);
       }
 
       httpOperation.processResponse(buffer, bufferOffset, bufferLength);
+      incrementCounter(AbfsStatistic.GET_RESPONSES, 1);
+      incrementCounter(AbfsStatistic.BYTES_RECEIVED,
+          httpOperation.getBytesReceived());
     } catch (IOException ex) {
       if (ex instanceof UnknownHostException) {
         LOG.warn(String.format("Unknown host name: %s. Retrying to resolve the host name...", httpOperation.getUrl().getHost()));
@@ -276,4 +286,16 @@ public class AbfsRestOperation {
 
     return true;
   }
+
+  /**
+   * Increments an Abfs counter by a long value.
+   *
+   * @param statistic the Abfs statistic that needs to be incremented.
+   * @param value     the value to be incremented by.
+   */
+  private void incrementCounter(AbfsStatistic statistic, long value) {
+    if (abfsCounters != null) {
+      abfsCounters.incrementCounter(statistic, value);
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index f41cbd6..a80bee6 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -436,9 +436,10 @@ public abstract class AbstractAbfsIntegrationTest extends
    * @param metricMap map of (String, Long) with statistics name as key and
    *                  statistics value as map value.
    */
-  protected void assertAbfsStatistics(AbfsStatistic statistic,
+  protected long assertAbfsStatistics(AbfsStatistic statistic,
       long expectedValue, Map<String, Long> metricMap) {
     assertEquals("Mismatch in " + statistic.getStatName(), expectedValue,
         (long) metricMap.get(statistic.getStatName()));
+    return expectedValue;
   }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
new file mode 100644
index 0000000..904fdf3
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+
+public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class);
+  private static final int LARGE_OPERATIONS = 10;
+
+  public ITestAbfsNetworkStatistics() throws Exception {
+  }
+
+  /**
+   * Testing connections_made, send_requests and bytes_sent statistics in
+   * {@link AbfsRestOperation}.
+   */
+  @Test
+  public void testAbfsHttpSendStatistics() throws IOException {
+    describe("Test to check correct values of statistics after Abfs http send "
+        + "request is done.");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    Map<String, Long> metricMap;
+    Path sendRequestPath = path(getMethodName());
+    String testNetworkStatsString = "http_send";
+    long connectionsMade, requestsSent, bytesSent;
+
+    /*
+     * Creating AbfsOutputStream will result in 1 connection made and 1 send
+     * request.
+     */
+    try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+        sendRequestPath)) {
+      out.write(testNetworkStatsString.getBytes());
+
+      /*
+       * Flushes all outstanding data (i.e. the current unfinished packet)
+       * from the client into the data store.
+       */
+      out.hflush();
+
+      metricMap = fs.getInstrumentationMap();
+
+      /*
+       * Testing the network stats with 1 write operation.
+       *
+       * connections_made : 3(getFileSystem()) + 1(AbfsOutputStream) + 2(flush).
+       *
+       * send_requests : 1(getFileSystem()) + 1(AbfsOutputStream) + 2(flush).
+       *
+       * bytes_sent : bytes written in AbfsOutputStream.
+       */
+      connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
+          6, metricMap);
+      requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4,
+          metricMap);
+      bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
+          testNetworkStatsString.getBytes().length, metricMap);
+
+    }
+
+    // To close the AbfsOutputStream 1 connection is made and 1 request is sent.
+    connectionsMade++;
+    requestsSent++;
+
+    try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+        sendRequestPath)) {
+
+      for (int i = 0; i < LARGE_OPERATIONS; i++) {
+        out.write(testNetworkStatsString.getBytes());
+
+        /*
+         * 1 flush call would create 2 connections and 2 send requests.
+         * when hflush() is called it will essentially trigger append() and
+         * flush() inside AbfsRestOperation. Both of which calls
+         * executeHttpOperation() method which creates a connection and sends
+         * requests.
+         */
+        out.hflush();
+      }
+
+      metricMap = fs.getInstrumentationMap();
+
+      /*
+       * Testing the network stats with Large amount of bytes sent.
+       *
+       * connections made : connections_made(Last assertion) + 1
+       * (AbfsOutputStream) + LARGE_OPERATIONS * 2(flush).
+       *
+       * send requests : requests_sent(Last assertion) + 1(AbfsOutputStream) +
+       * LARGE_OPERATIONS * 2(flush).
+       *
+       * bytes sent : bytes_sent(Last assertion) + LARGE_OPERATIONS * (bytes
+       * written each time).
+       *
+       */
+      assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
+          connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
+      assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
+          requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
+      assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
+          bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
+          metricMap);
+
+    }
+
+  }
+
+  /**
+   * Testing get_responses and bytes_received in {@link AbfsRestOperation}.
+   */
+  @Test
+  public void testAbfsHttpResponseStatistics() throws IOException {
+    describe("Test to check correct values of statistics after Http "
+        + "Response is processed.");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    Path getResponsePath = path(getMethodName());
+    Map<String, Long> metricMap;
+    String testResponseString = "some response";
+    long getResponses, bytesReceived;
+
+    FSDataOutputStream out = null;
+    FSDataInputStream in = null;
+    try {
+
+      /*
+       * Creating a File and writing some bytes in it.
+       *
+       * get_responses : 3(getFileSystem) + 1(OutputStream creation) + 2
+       * (Writing data in Data store).
+       *
+       */
+      out = fs.create(getResponsePath);
+      out.write(testResponseString.getBytes());
+      out.hflush();
+
+      // open would require 1 get response.
+      in = fs.open(getResponsePath);
+      // read would require 1 get response and also get the bytes received.
+      int result = in.read();
+
+      // Confirming read isn't -1.
+      LOG.info("Result of read operation : {}", result);
+
+      metricMap = fs.getInstrumentationMap();
+
+      /*
+       * Testing values of statistics after writing and reading a buffer.
+       *
+       * get_responses - 6(above operations) + 1(open()) + 1 (read()).
+       *
+       * bytes_received - This should be equal to bytes sent earlier.
+       */
+      getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8,
+          metricMap);
+      // Testing that bytes received is equal to bytes sent.
+      long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
+      bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
+          bytesSend,
+          metricMap);
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+
+    // To close the streams 1 response is received.
+    getResponses++;
+
+    try {
+
+      /*
+       * Creating a file and writing buffer into it. Also recording the
+       * buffer for future read() call.
+       * Creating the outputStream and writing requires 2 *
+       * (LARGE_OPERATIONS) get requests.
+       */
+      StringBuilder largeBuffer = new StringBuilder();
+      out = fs.create(getResponsePath);
+      for (int i = 0; i < LARGE_OPERATIONS; i++) {
+        out.write(testResponseString.getBytes());
+        out.hflush();
+        largeBuffer.append(testResponseString);
+      }
+
+      // Open requires 1 get_response.
+      in = fs.open(getResponsePath);
+
+      /*
+       * Reading the file which was written above. This read() call would
+       * read bytes equal to the bytes that were written above.
+       * Get response would be 1 only.
+       */
+      in.read(0, largeBuffer.toString().getBytes(), 0,
+          largeBuffer.toString().getBytes().length);
+
+      metricMap = fs.getInstrumentationMap();
+
+      /*
+       * Testing the statistics values after writing and reading a large buffer.
+       *
+       * get_responses : get_responses(Last assertion) + 1
+       * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
+       * LARGE_OPERATIONS times) + 1(open()) + 1(read()).
+       *
+       * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
+       * bytes written each time (bytes_received is equal to bytes written in
+       * the File).
+       *
+       */
+      assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
+          bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length),
+          metricMap);
+      assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
+          getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap);
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java
index c88dc84..4220580 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java
@@ -45,7 +45,7 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
     describe("Testing the initial values of Abfs counters");
 
     AbfsCounters abfsCounters =
-        new AbfsInstrumentation(getFileSystem().getUri());
+        new AbfsCountersImpl(getFileSystem().getUri());
     Map<String, Long> metricMap = abfsCounters.toMap();
 
     for (Map.Entry<String, Long> entry : metricMap.entrySet()) {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java
new file mode 100644
index 0000000..0639cf2
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
+
+public class TestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
+
+  private static final int LARGE_OPERATIONS = 1000;
+
+  public TestAbfsNetworkStatistics() throws Exception {
+  }
+
+  /**
+   * Test that the read and write throttle counters of
+   * {@code AbfsCountersImpl} sum up the increments applied to them.
+   */
+  @Test
+  public void testAbfsThrottlingStatistics() throws IOException {
+    describe("Test to check correct values of read throttle and write "
+        + "throttle statistics in Abfs");
+
+    AbfsCounters statistics =
+        new AbfsCountersImpl(getFileSystem().getUri());
+
+    /*
+     * Incrementing both throttle counters by 1, LARGE_OPERATIONS times, to
+     * check that the counters sum the increments correctly.
+     */
+    for (int i = 0; i < LARGE_OPERATIONS; i++) {
+      statistics.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
+      statistics.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
+    }
+
+    Map<String, Long> metricMap = statistics.toMap();
+
+    /*
+     * Both throttle counters should now be equal to LARGE_OPERATIONS, the
+     * number of times each of them was incremented above.
+     */
+    assertAbfsStatistics(AbfsStatistic.READ_THROTTLES, LARGE_OPERATIONS,
+        metricMap);
+    assertAbfsStatistics(AbfsStatistic.WRITE_THROTTLES, LARGE_OPERATIONS,
+        metricMap);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java
index 20d96fa..f831d2d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java
@@ -43,7 +43,7 @@ public class TestAbfsStatistics extends AbstractAbfsIntegrationTest {
     describe("Testing the counter values after Abfs is initialised");
 
     AbfsCounters instrumentation =
-        new AbfsInstrumentation(getFileSystem().getUri());
+        new AbfsCountersImpl(getFileSystem().getUri());
 
     //Testing summation of the counter values.
     for (int i = 0; i < LARGE_OPS; i++) {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 0fd65fb..8197e7e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -100,7 +100,7 @@ public final class TestAbfsClient {
   private String getUserAgentString(AbfsConfiguration config,
       boolean includeSSLProvider) throws MalformedURLException {
     AbfsClient client = new AbfsClient(new URL("https://azure.com"), null,
-        config, null, (AccessTokenProvider) null, null);
+        config, null, (AccessTokenProvider) null, null, null);
     String sslProviderName = null;
     if (includeSSLProvider) {
       sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory()
@@ -267,7 +267,7 @@ public final class TestAbfsClient {
         (currentAuthType == AuthType.OAuth
             ? abfsConfig.getTokenProvider()
             : null),
-        tracker);
+        tracker, null);
 
     return testClient;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org