You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/06/19 13:03:59 UTC
[hadoop] branch trunk updated: HADOOP-17065. Add Network Counters
to ABFS (#2056)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3472c3e HADOOP-17065. Add Network Counters to ABFS (#2056)
3472c3e is described below
commit 3472c3efc0014237d0cc4d9a989393b8513d2ab6
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Fri Jun 19 18:33:49 2020 +0530
HADOOP-17065. Add Network Counters to ABFS (#2056)
Contributed by Mehakmeet Singh.
---
...sInstrumentation.java => AbfsCountersImpl.java} | 13 +-
.../apache/hadoop/fs/azurebfs/AbfsStatistic.java | 20 +-
.../hadoop/fs/azurebfs/AzureBlobFileSystem.java | 15 +-
.../fs/azurebfs/AzureBlobFileSystemStore.java | 15 +-
.../hadoop/fs/azurebfs/services/AbfsClient.java | 25 +-
.../services/AbfsClientThrottlingAnalyzer.java | 7 +-
.../services/AbfsClientThrottlingIntercept.java | 14 +-
.../fs/azurebfs/services/AbfsRestOperation.java | 24 +-
.../fs/azurebfs/AbstractAbfsIntegrationTest.java | 3 +-
.../fs/azurebfs/ITestAbfsNetworkStatistics.java | 253 +++++++++++++++++++++
.../hadoop/fs/azurebfs/ITestAbfsStatistics.java | 2 +-
.../fs/azurebfs/TestAbfsNetworkStatistics.java | 67 ++++++
.../hadoop/fs/azurebfs/TestAbfsStatistics.java | 2 +-
.../fs/azurebfs/services/TestAbfsClient.java | 4 +-
14 files changed, 430 insertions(+), 34 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
similarity index 96%
rename from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java
rename to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
index 9094c40..57cc3ea 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
@@ -41,7 +41,7 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
/**
* Instrumentation of Abfs counters.
*/
-public class AbfsInstrumentation implements AbfsCounters {
+public class AbfsCountersImpl implements AbfsCounters {
/**
* Single context for all the Abfs counters to separate them from other
@@ -78,10 +78,17 @@ public class AbfsInstrumentation implements AbfsCounters {
DIRECTORIES_DELETED,
FILES_CREATED,
FILES_DELETED,
- ERROR_IGNORED
+ ERROR_IGNORED,
+ CONNECTIONS_MADE,
+ SEND_REQUESTS,
+ GET_RESPONSES,
+ BYTES_SENT,
+ BYTES_RECEIVED,
+ READ_THROTTLES,
+ WRITE_THROTTLES
};
- public AbfsInstrumentation(URI uri) {
+ public AbfsCountersImpl(URI uri) {
UUID fileSystemInstanceId = UUID.randomUUID();
registry.tag(REGISTRY_ID,
"A unique identifier for the instance",
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
index a9867aa..2935cd7 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;
/**
* Statistic which are collected in Abfs.
- * Available as metrics in {@link AbfsInstrumentation}.
+ * Available as metrics in {@link AbfsCountersImpl}.
*/
public enum AbfsStatistic {
@@ -57,7 +57,23 @@ public enum AbfsStatistic {
FILES_DELETED("files_deleted",
"Total number of files deleted from the object store."),
ERROR_IGNORED("error_ignored",
- "Errors caught and ignored.");
+ "Errors caught and ignored."),
+
+ //Network statistics.
+ CONNECTIONS_MADE("connections_made",
+ "Total number of times a connection was made with the data store."),
+ SEND_REQUESTS("send_requests",
+ "Total number of times http requests were sent to the data store."),
+ GET_RESPONSES("get_responses",
+ "Total number of times a response was received."),
+ BYTES_SENT("bytes_sent",
+ "Total bytes uploaded."),
+ BYTES_RECEIVED("bytes_received",
+ "Total bytes received."),
+ READ_THROTTLES("read_throttles",
+ "Total number of times a read operation is throttled."),
+ WRITE_THROTTLES("write_throttles",
+ "Total number of times a write operation is throttled.");
private String statName;
private String statDescription;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 6694c13..daa1905 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -97,7 +97,7 @@ public class AzureBlobFileSystem extends FileSystem {
private boolean delegationTokenEnabled = false;
private AbfsDelegationTokenManager delegationTokenManager;
- private AbfsCounters instrumentation;
+ private AbfsCounters abfsCounters;
@Override
public void initialize(URI uri, Configuration configuration)
@@ -109,11 +109,12 @@ public class AzureBlobFileSystem extends FileSystem {
LOG.debug("Initializing AzureBlobFileSystem for {}", uri);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
- this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration);
+ abfsCounters = new AbfsCountersImpl(uri);
+ this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(),
+ configuration, abfsCounters);
LOG.trace("AzureBlobFileSystemStore init complete");
final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
- instrumentation = new AbfsInstrumentation(uri);
this.setWorkingDirectory(this.getHomeDirectory());
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
@@ -150,8 +151,8 @@ public class AzureBlobFileSystem extends FileSystem {
sb.append("uri=").append(uri);
sb.append(", user='").append(abfsStore.getUser()).append('\'');
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
- if (instrumentation != null) {
- sb.append(", Statistics: {").append(instrumentation.formString("{", "=",
+ if (abfsCounters != null) {
+ sb.append(", Statistics: {").append(abfsCounters.formString("{", "=",
"}", true));
sb.append("}");
}
@@ -392,7 +393,7 @@ public class AzureBlobFileSystem extends FileSystem {
* @param statistic the Statistic to be incremented.
*/
private void incrementStatistic(AbfsStatistic statistic) {
- instrumentation.incrementCounter(statistic, 1);
+ abfsCounters.incrementCounter(statistic, 1);
}
/**
@@ -1241,7 +1242,7 @@ public class AzureBlobFileSystem extends FileSystem {
@VisibleForTesting
Map<String, Long> getInstrumentationMap() {
- return instrumentation.toMap();
+ return abfsCounters.toMap();
}
@Override
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index a01d31a..397afc8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface;
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
@@ -143,8 +144,9 @@ public class AzureBlobFileSystemStore implements Closeable {
private final IdentityTransformerInterface identityTransformer;
private final AbfsPerfTracker abfsPerfTracker;
- public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration)
- throws IOException {
+ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
+ Configuration configuration,
+ AbfsCounters abfsCounters) throws IOException {
this.uri = uri;
String[] authorityParts = authorityParts(uri);
final String fileSystemName = authorityParts[0];
@@ -182,7 +184,7 @@ public class AzureBlobFileSystemStore implements Closeable {
boolean usingOauth = (authType == AuthType.OAuth);
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
- initializeClient(uri, fileSystemName, accountName, useHttps);
+ initializeClient(uri, fileSystemName, accountName, useHttps, abfsCounters);
final Class<? extends IdentityTransformerInterface> identityTransformerClass =
configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
IdentityTransformerInterface.class);
@@ -1170,7 +1172,8 @@ public class AzureBlobFileSystemStore implements Closeable {
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
}
- private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure)
+ private void initializeClient(URI uri, String fileSystemName,
+ String accountName, boolean isSecure, AbfsCounters abfsCounters)
throws IOException {
if (this.client != null) {
return;
@@ -1214,11 +1217,11 @@ public class AzureBlobFileSystemStore implements Closeable {
if (tokenProvider != null) {
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
- tokenProvider, abfsPerfTracker);
+ tokenProvider, abfsPerfTracker, abfsCounters);
} else {
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
- sasTokenProvider, abfsPerfTracker);
+ sasTokenProvider, abfsPerfTracker, abfsCounters);
}
LOG.trace("AbfsClient init complete");
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index f104e7b..f614bbd 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -73,11 +73,13 @@ public class AbfsClient implements Closeable {
private final AuthType authType;
private AccessTokenProvider tokenProvider;
private SASTokenProvider sasTokenProvider;
+ private final AbfsCounters abfsCounters;
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final ExponentialRetryPolicy exponentialRetryPolicy,
- final AbfsPerfTracker abfsPerfTracker) {
+ final AbfsPerfTracker abfsPerfTracker,
+ final AbfsCounters abfsCounters) {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString();
@@ -104,14 +106,17 @@ public class AbfsClient implements Closeable {
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
this.abfsPerfTracker = abfsPerfTracker;
+ this.abfsCounters = abfsCounters;
}
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final ExponentialRetryPolicy exponentialRetryPolicy,
final AccessTokenProvider tokenProvider,
- final AbfsPerfTracker abfsPerfTracker) {
- this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker);
+ final AbfsPerfTracker abfsPerfTracker,
+ final AbfsCounters abfsCounters) {
+ this(baseUrl, sharedKeyCredentials, abfsConfiguration,
+ exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
this.tokenProvider = tokenProvider;
}
@@ -119,8 +124,10 @@ public class AbfsClient implements Closeable {
final AbfsConfiguration abfsConfiguration,
final ExponentialRetryPolicy exponentialRetryPolicy,
final SASTokenProvider sasTokenProvider,
- final AbfsPerfTracker abfsPerfTracker) {
- this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker);
+ final AbfsPerfTracker abfsPerfTracker,
+ final AbfsCounters abfsCounters) {
+ this(baseUrl, sharedKeyCredentials, abfsConfiguration,
+ exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
this.sasTokenProvider = sasTokenProvider;
}
@@ -892,4 +899,12 @@ public class AbfsClient implements Closeable {
public SASTokenProvider getSasTokenProvider() {
return this.sasTokenProvider;
}
+
+ /**
+ * Getter for abfsCounters from AbfsClient.
+ * @return AbfsCounters instance.
+ */
+ protected AbfsCounters getAbfsCounters() {
+ return abfsCounters;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
index f1e5aaa..e1a799b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
@@ -114,16 +114,19 @@ class AbfsClientThrottlingAnalyzer {
/**
* Suspends the current storage operation, as necessary, to reduce throughput.
+ * @return true if the thread slept uninterrupted (throttling occurred), else false.
*/
- public void suspendIfNecessary() {
+ public boolean suspendIfNecessary() {
int duration = sleepDuration;
if (duration > 0) {
try {
Thread.sleep(duration);
+ return true;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
+ return false;
}
@VisibleForTesting
@@ -269,4 +272,4 @@ class AbfsClientThrottlingAnalyzer {
this.operationsSuccessful = new AtomicLong();
}
}
-}
\ No newline at end of file
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
index 1c6ce17..7303e83 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
@@ -23,6 +23,7 @@ import java.net.HttpURLConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
/**
@@ -103,17 +104,24 @@ public final class AbfsClientThrottlingIntercept {
* uses this to suspend the request, if necessary, to minimize errors and
* maximize throughput.
*/
- static void sendingRequest(AbfsRestOperationType operationType) {
+ static void sendingRequest(AbfsRestOperationType operationType,
+ AbfsCounters abfsCounters) {
if (!isAutoThrottlingEnabled) {
return;
}
switch (operationType) {
case ReadFile:
- singleton.readThrottler.suspendIfNecessary();
+ if (singleton.readThrottler.suspendIfNecessary()
+ && abfsCounters != null) {
+ abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
+ }
break;
case Append:
- singleton.writeThrottler.suspendIfNecessary();
+ if (singleton.writeThrottler.suspendIfNecessary()
+ && abfsCounters != null) {
+ abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
+ }
break;
default:
break;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 521da96..f3986d4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -27,6 +27,7 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
@@ -66,6 +67,7 @@ public class AbfsRestOperation {
private int retryCount = 0;
private AbfsHttpOperation result;
+ private AbfsCounters abfsCounters;
public AbfsHttpOperation getResult() {
return result;
@@ -131,6 +133,7 @@ public class AbfsRestOperation {
this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
|| AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
this.sasToken = sasToken;
+ this.abfsCounters = client.getAbfsCounters();
}
/**
@@ -160,6 +163,7 @@ public class AbfsRestOperation {
this.buffer = buffer;
this.bufferOffset = bufferOffset;
this.bufferLength = bufferLength;
+ this.abfsCounters = client.getAbfsCounters();
}
/**
@@ -205,6 +209,7 @@ public class AbfsRestOperation {
try {
// initialize the HTTP request and open the connection
httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
+ incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1);
switch(client.getAuthType()) {
case Custom:
@@ -229,14 +234,19 @@ public class AbfsRestOperation {
// dump the headers
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
httpOperation.getConnection().getRequestProperties());
- AbfsClientThrottlingIntercept.sendingRequest(operationType);
+ AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);
if (hasRequestBody) {
// HttpUrlConnection requires
httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
+ incrementCounter(AbfsStatistic.SEND_REQUESTS, 1);
+ incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength);
}
httpOperation.processResponse(buffer, bufferOffset, bufferLength);
+ incrementCounter(AbfsStatistic.GET_RESPONSES, 1);
+ incrementCounter(AbfsStatistic.BYTES_RECEIVED,
+ httpOperation.getBytesReceived());
} catch (IOException ex) {
if (ex instanceof UnknownHostException) {
LOG.warn(String.format("Unknown host name: %s. Retrying to resolve the host name...", httpOperation.getUrl().getHost()));
@@ -276,4 +286,16 @@ public class AbfsRestOperation {
return true;
}
+
+ /**
+ * Incrementing Abfs counters with a long value.
+ *
+ * @param statistic the Abfs statistic that needs to be incremented.
+ * @param value the value to be incremented by.
+ */
+ private void incrementCounter(AbfsStatistic statistic, long value) {
+ if (abfsCounters != null) {
+ abfsCounters.incrementCounter(statistic, value);
+ }
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index f41cbd6..a80bee6 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -436,9 +436,10 @@ public abstract class AbstractAbfsIntegrationTest extends
* @param metricMap map of (String, Long) with statistics name as key and
* statistics value as map value.
*/
- protected void assertAbfsStatistics(AbfsStatistic statistic,
+ protected long assertAbfsStatistics(AbfsStatistic statistic,
long expectedValue, Map<String, Long> metricMap) {
assertEquals("Mismatch in " + statistic.getStatName(), expectedValue,
(long) metricMap.get(statistic.getStatName()));
+ return expectedValue;
}
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
new file mode 100644
index 0000000..904fdf3
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+
+public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class);
+ private static final int LARGE_OPERATIONS = 10;
+
+ public ITestAbfsNetworkStatistics() throws Exception {
+ }
+
+ /**
+ * Testing connections_made, send_requests and bytes_sent statistics in
+ * {@link AbfsRestOperation}.
+ */
+ @Test
+ public void testAbfsHttpSendStatistics() throws IOException {
+ describe("Test to check correct values of statistics after Abfs http send "
+ + "request is done.");
+
+ AzureBlobFileSystem fs = getFileSystem();
+ Map<String, Long> metricMap;
+ Path sendRequestPath = path(getMethodName());
+ String testNetworkStatsString = "http_send";
+ long connectionsMade, requestsSent, bytesSent;
+
+ /*
+ * Creating AbfsOutputStream will result in 1 connection made and 1 send
+ * request.
+ */
+ try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+ sendRequestPath)) {
+ out.write(testNetworkStatsString.getBytes());
+
+ /*
+ * Flushes all outstanding data (i.e. the current unfinished packet)
+ * from the client to the data store.
+ */
+ out.hflush();
+
+ metricMap = fs.getInstrumentationMap();
+
+ /*
+ * Testing the network stats with 1 write operation.
+ *
+ * connections_made : 3(getFileSystem()) + 1(AbfsOutputStream) + 2(flush).
+ *
+ * send_requests : 1(getFileSystem()) + 1(AbfsOutputStream) + 2(flush).
+ *
+ * bytes_sent : bytes written to AbfsOutputStream.
+ */
+ connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
+ 6, metricMap);
+ requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4,
+ metricMap);
+ bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
+ testNetworkStatsString.getBytes().length, metricMap);
+
+ }
+
+ // To close the AbfsOutputStream 1 connection is made and 1 request is sent.
+ connectionsMade++;
+ requestsSent++;
+
+ try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+ sendRequestPath)) {
+
+ for (int i = 0; i < LARGE_OPERATIONS; i++) {
+ out.write(testNetworkStatsString.getBytes());
+
+ /*
+ * 1 flush call would create 2 connections and 2 send requests.
+ * When hflush() is called it will essentially trigger append() and
+ * flush() inside AbfsRestOperation, both of which call the
+ * executeHttpOperation() method, which creates a connection and sends
+ * a request.
+ */
+ out.hflush();
+ }
+
+ metricMap = fs.getInstrumentationMap();
+
+ /*
+ * Testing the network stats with Large amount of bytes sent.
+ *
+ * connections made : connections_made(Last assertion) + 1
+ * (AbfsOutputStream) + LARGE_OPERATIONS * 2(flush).
+ *
+ * send requests : requests_sent(Last assertion) + 1(AbfsOutputStream) +
+ * LARGE_OPERATIONS * 2(flush).
+ *
+ * bytes sent : bytes_sent(Last assertion) + LARGE_OPERATIONS * (bytes
+ * written each time).
+ *
+ */
+ assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
+ connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
+ assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
+ requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
+ assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
+ bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
+ metricMap);
+
+ }
+
+ }
+
+ /**
+ * Testing get_responses and bytes_received in {@link AbfsRestOperation}.
+ */
+ @Test
+ public void testAbfsHttpResponseStatistics() throws IOException {
+ describe("Test to check correct values of statistics after Http "
+ + "Response is processed.");
+
+ AzureBlobFileSystem fs = getFileSystem();
+ Path getResponsePath = path(getMethodName());
+ Map<String, Long> metricMap;
+ String testResponseString = "some response";
+ long getResponses, bytesReceived;
+
+ FSDataOutputStream out = null;
+ FSDataInputStream in = null;
+ try {
+
+ /*
+ * Creating a File and writing some bytes in it.
+ *
+ * get_responses : 3(getFileSystem) + 1(OutputStream creation) + 2
+ * (Writing data in Data store).
+ *
+ */
+ out = fs.create(getResponsePath);
+ out.write(testResponseString.getBytes());
+ out.hflush();
+
+ // open would require 1 get response.
+ in = fs.open(getResponsePath);
+ // read would require 1 get response and also get the bytes received.
+ int result = in.read();
+
+ // Confirming read isn't -1.
+ LOG.info("Result of read operation : {}", result);
+
+ metricMap = fs.getInstrumentationMap();
+
+ /*
+ * Testing values of statistics after writing and reading a buffer.
+ *
+ * get_responses - 6(above operations) + 1(open()) + 1 (read()).
+ *
+ * bytes_received - This should be equal to bytes sent earlier.
+ */
+ getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8,
+ metricMap);
+ // Testing that bytes received is equal to bytes sent.
+ long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
+ bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
+ bytesSend,
+ metricMap);
+
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, out, in);
+ }
+
+ // To close the streams 1 response is received.
+ getResponses++;
+
+ try {
+
+ /*
+ * Creating a file and writing a buffer into it. Also recording the
+ * buffer for the future read() call.
+ * Creating the OutputStream requires 1 get response and the writes
+ * require 2 * (LARGE_OPERATIONS) get responses.
+ */
+ StringBuilder largeBuffer = new StringBuilder();
+ out = fs.create(getResponsePath);
+ for (int i = 0; i < LARGE_OPERATIONS; i++) {
+ out.write(testResponseString.getBytes());
+ out.hflush();
+ largeBuffer.append(testResponseString);
+ }
+
+ // Open requires 1 get_response.
+ in = fs.open(getResponsePath);
+
+ /*
+ * Reading the file which was written above. This read() call would
+ * read as many bytes as were written above.
+ * Only 1 get response is expected.
+ */
+ in.read(0, largeBuffer.toString().getBytes(), 0,
+ largeBuffer.toString().getBytes().length);
+
+ metricMap = fs.getInstrumentationMap();
+
+ /*
+ * Testing the statistics values after writing and reading a large buffer.
+ *
+ * get_responses : get_responses(Last assertion) + 1
+ * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
+ * LARGE_OPERATIONS times) + 1(open()) + 1(read()).
+ *
+ * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
+ * bytes written each time (bytes_received is equal to bytes written in the
+ * File).
+ *
+ */
+ assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
+ bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length),
+ metricMap);
+ assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
+ getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap);
+
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, out, in);
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java
index c88dc84..4220580 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java
@@ -45,7 +45,7 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
describe("Testing the initial values of Abfs counters");
AbfsCounters abfsCounters =
- new AbfsInstrumentation(getFileSystem().getUri());
+ new AbfsCountersImpl(getFileSystem().getUri());
Map<String, Long> metricMap = abfsCounters.toMap();
for (Map.Entry<String, Long> entry : metricMap.entrySet()) {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java
new file mode 100644
index 0000000..0639cf2
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
+
+public class TestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
+
+ private static final int LARGE_OPERATIONS = 1000;
+
+ public TestAbfsNetworkStatistics() throws Exception {
+ }
+
+ /**
+ * Test to check correct values of read and write throttling statistics in
+ * {@code AbfsClientThrottlingAnalyzer}.
+ */
+ @Test
+ public void testAbfsThrottlingStatistics() throws IOException {
+ describe("Test to check correct values of read throttle and write "
+ + "throttle statistics in Abfs");
+
+ AbfsCounters statistics =
+ new AbfsCountersImpl(getFileSystem().getUri());
+
+ /*
+ * Calling the throttle methods to check correct summation and values of
+ * the counters.
+ */
+ for (int i = 0; i < LARGE_OPERATIONS; i++) {
+ statistics.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
+ statistics.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
+ }
+
+ Map<String, Long> metricMap = statistics.toMap();
+
+ /*
+ * Test to check read and write throttle statistics gave correct values for
+ * 1000 calls.
+ */
+ assertAbfsStatistics(AbfsStatistic.READ_THROTTLES, LARGE_OPERATIONS,
+ metricMap);
+ assertAbfsStatistics(AbfsStatistic.WRITE_THROTTLES, LARGE_OPERATIONS,
+ metricMap);
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java
index 20d96fa..f831d2d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java
@@ -43,7 +43,7 @@ public class TestAbfsStatistics extends AbstractAbfsIntegrationTest {
describe("Testing the counter values after Abfs is initialised");
AbfsCounters instrumentation =
- new AbfsInstrumentation(getFileSystem().getUri());
+ new AbfsCountersImpl(getFileSystem().getUri());
//Testing summation of the counter values.
for (int i = 0; i < LARGE_OPS; i++) {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 0fd65fb..8197e7e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -100,7 +100,7 @@ public final class TestAbfsClient {
private String getUserAgentString(AbfsConfiguration config,
boolean includeSSLProvider) throws MalformedURLException {
AbfsClient client = new AbfsClient(new URL("https://azure.com"), null,
- config, null, (AccessTokenProvider) null, null);
+ config, null, (AccessTokenProvider) null, null, null);
String sslProviderName = null;
if (includeSSLProvider) {
sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory()
@@ -267,7 +267,7 @@ public final class TestAbfsClient {
(currentAuthType == AuthType.OAuth
? abfsConfig.getTokenProvider()
: null),
- tracker);
+ tracker, null);
return testClient;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org