Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/11/02 13:20:34 UTC

[GitHub] [hadoop] pranavsaxena-microsoft commented on a diff in pull request #5034: HADOOP-18457. ABFS: Support for account level throttling

pranavsaxena-microsoft commented on code in PR #5034:
URL: https://github.com/apache/hadoop/pull/5034#discussion_r1011670326


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -95,6 +95,18 @@ private AbfsClientThrottlingAnalyzer() {
         analysisPeriodMs);
   }
 
+  /**
+   * Resumes the timer if it was stopped.
+   */
+  public void resumeTimer() {

Review Comment:
   Let's make it private.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -216,6 +218,10 @@ SharedKeyCredentials getSharedKeyCredentials() {
     return sharedKeyCredentials;
   }
 
+  public AbfsThrottlingIntercept getIntercept() {

Review Comment:
   Let's make it 'default' (package-private) access. Reason: as a public method it can be reached by an end developer, who might call updateMetrics in a way that keeps our client from throttling suitably.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java:
##########
@@ -104,19 +113,24 @@ private AbfsClientThrottlingAnalyzer() {
   public void addBytesTransferred(long count, boolean isFailedOperation) {
     AbfsOperationMetrics metrics = blobMetrics.get();
     if (isFailedOperation) {
-      metrics.bytesFailed.addAndGet(count);
-      metrics.operationsFailed.incrementAndGet();
+      metrics.getBytesFailed().addAndGet(count);
+      metrics.getOperationsFailed().incrementAndGet();
     } else {
-      metrics.bytesSuccessful.addAndGet(count);
-      metrics.operationsSuccessful.incrementAndGet();
+      metrics.getBytesSuccessful().addAndGet(count);
+      metrics.getOperationsSuccessful().incrementAndGet();
     }
+    blobMetrics.set(metrics);
   }
 
   /**
    * Suspends the current storage operation, as necessary, to reduce throughput.
    * @return true if Thread sleeps(Throttling occurs) else false.
    */
   public boolean suspendIfNecessary() {
+    if (isOperationOnAccountIdle.get()) {
+      resumeTimer();
+    }

Review Comment:
   A still better approach, I feel (I just thought of it now): let's have a synchronized block that handles isOperationOnAccountIdle for both suspend and resume, because race conditions can still happen. For example:
   1. https://github.com/anmolanmol1234/hadoop/blob/HADOOP-18457_temp/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java#L266
   2. https://github.com/anmolanmol1234/hadoop/blob/HADOOP-18457_temp/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java#L133-L136
   3. https://github.com/anmolanmol1234/hadoop/blob/HADOOP-18457_temp/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java#L267-L269
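
   A minimal sketch of the idea, not the PR's code: both the idle check and the timer start/stop happen under one lock, so a suspend cannot interleave with a resume. The class name IdleTimerGate, its fields, and the startCount() helper are hypothetical and exist only for illustration.

```java
import java.util.Timer;
import java.util.TimerTask;

/** Hypothetical stand-in for the analyzer's idle/timer handling. */
final class IdleTimerGate {
  private final Object lock = new Object();
  private final long periodMs;
  private Timer timer;          // guarded by lock
  private boolean idle = true;  // guarded by lock; starts idle with no timer running
  private int starts;           // guarded by lock; counts timer starts (demo only)

  IdleTimerGate(long periodMs) {
    this.periodMs = periodMs;
  }

  /** Called on every operation: starts the timer only if the account was idle. */
  void resumeIfIdle() {
    synchronized (lock) {
      if (!idle) {
        return;
      }
      timer = new Timer("analysis-timer", true);
      timer.schedule(new TimerTask() {
        @Override
        public void run() {
          // periodic analysis would run here
        }
      }, periodMs, periodMs);
      idle = false;
      starts++;
    }
  }

  /** Called when no traffic was seen: stops the timer and marks the account idle. */
  void suspendWhenIdle() {
    synchronized (lock) {
      if (idle) {
        return;
      }
      timer.cancel();
      timer = null;
      idle = true;
    }
  }

  boolean isIdle() {
    synchronized (lock) {
      return idle;
    }
  }

  int startCount() {
    synchronized (lock) {
      return starts;
    }
  }
}
```

   With this shape, two threads calling resumeIfIdle() at the same time start only one timer, and a concurrent suspendWhenIdle() sees either the fully started or the fully stopped state.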



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -38,35 +40,89 @@
  * and sleeps just enough to minimize errors, allowing optimal ingress and/or
  * egress throughput.
  */
-public final class AbfsClientThrottlingIntercept {
+public final class AbfsClientThrottlingIntercept implements AbfsThrottlingIntercept {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbfsClientThrottlingIntercept.class);
   private static final String RANGE_PREFIX = "bytes=";
-  private static AbfsClientThrottlingIntercept singleton = null;
-  private AbfsClientThrottlingAnalyzer readThrottler = null;
-  private AbfsClientThrottlingAnalyzer writeThrottler = null;
-  private static boolean isAutoThrottlingEnabled = false;
+  private static AbfsClientThrottlingIntercept singleton; // singleton, initialized in static initialization block
+  private static final ReentrantLock LOCK = new ReentrantLock();
+  private final AbfsClientThrottlingAnalyzer readThrottler;
+  private final AbfsClientThrottlingAnalyzer writeThrottler;
+  private final String accountName;
 
   // Hide default constructor
-  private AbfsClientThrottlingIntercept() {
-    readThrottler = new AbfsClientThrottlingAnalyzer("read");
-    writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+  public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+    this.accountName = accountName;
+    this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
   }
 
-  public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
-    if (!enableAutoThrottling) {
-      return;
-    }
+  // Hide default constructor
+  private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+    //Account name is kept as empty as same instance is shared across all accounts
+    this.accountName = "";
+    this.readThrottler = setAnalyzer("read", abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write", abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system using singleton intercept");
+  }
+
+  /**
+   * Sets the analyzer for the intercept.
+   * @param name Name of the analyzer.
+   * @param abfsConfiguration The configuration.
+   * @return AbfsClientThrottlingAnalyzer instance.
+   */
+  private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+    return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+  }
+
+  /**
+   * Returns the analyzer for read operations.
+   * @return AbfsClientThrottlingAnalyzer for read.
+   */
+  AbfsClientThrottlingAnalyzer getReadThrottler() {
+    return readThrottler;
+  }
+
+  /**
+   * Returns the analyzer for write operations.
+   * @return AbfsClientThrottlingAnalyzer for write.
+   */
+  AbfsClientThrottlingAnalyzer getWriteThrottler() {
+    return writeThrottler;
+  }
+
+  /**
+   * Creates a singleton object of the AbfsClientThrottlingIntercept.
+   * which is shared across all filesystem instances.
+   * @param abfsConfiguration configuration set.
+   * @return singleton object of intercept.
+   */
+  static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
     if (singleton == null) {
-      singleton = new AbfsClientThrottlingIntercept();
-      isAutoThrottlingEnabled = true;
-      LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+      LOCK.lock();
+      try {
+        if (singleton == null) {
+          singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
+          LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+        }
+      } finally {
+        LOCK.unlock();
+      }
     }
+    return singleton;
   }
 
-  static void updateMetrics(AbfsRestOperationType operationType,
-                            AbfsHttpOperation abfsHttpOperation) {
-    if (!isAutoThrottlingEnabled || abfsHttpOperation == null) {
+  /**
+   * Updates the metrics for successful and failed read and write operations.
+   * @param operationType Only applicable for read and write operations.
+   * @param abfsHttpOperation Used for status code and data transfeered.

Review Comment:
   nit: typo, should be "transferred".



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -38,35 +40,89 @@
  * and sleeps just enough to minimize errors, allowing optimal ingress and/or
  * egress throughput.
  */
-public final class AbfsClientThrottlingIntercept {
+public final class AbfsClientThrottlingIntercept implements AbfsThrottlingIntercept {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbfsClientThrottlingIntercept.class);
   private static final String RANGE_PREFIX = "bytes=";
-  private static AbfsClientThrottlingIntercept singleton = null;
-  private AbfsClientThrottlingAnalyzer readThrottler = null;
-  private AbfsClientThrottlingAnalyzer writeThrottler = null;
-  private static boolean isAutoThrottlingEnabled = false;
+  private static AbfsClientThrottlingIntercept singleton; // singleton, initialized in static initialization block
+  private static final ReentrantLock LOCK = new ReentrantLock();
+  private final AbfsClientThrottlingAnalyzer readThrottler;
+  private final AbfsClientThrottlingAnalyzer writeThrottler;
+  private final String accountName;
 
   // Hide default constructor
-  private AbfsClientThrottlingIntercept() {
-    readThrottler = new AbfsClientThrottlingAnalyzer("read");
-    writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+  public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+    this.accountName = accountName;
+    this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
   }
 
-  public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
-    if (!enableAutoThrottling) {
-      return;
-    }
+  // Hide default constructor
+  private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+    //Account name is kept as empty as same instance is shared across all accounts
+    this.accountName = "";
+    this.readThrottler = setAnalyzer("read", abfsConfiguration);
+    this.writeThrottler = setAnalyzer("write", abfsConfiguration);
+    LOG.debug("Client-side throttling is enabled for the ABFS file system using singleton intercept");
+  }
+
+  /**
+   * Sets the analyzer for the intercept.
+   * @param name Name of the analyzer.
+   * @param abfsConfiguration The configuration.
+   * @return AbfsClientThrottlingAnalyzer instance.
+   */
+  private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+    return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+  }
+
+  /**
+   * Returns the analyzer for read operations.
+   * @return AbfsClientThrottlingAnalyzer for read.
+   */
+  AbfsClientThrottlingAnalyzer getReadThrottler() {
+    return readThrottler;
+  }
+
+  /**
+   * Returns the analyzer for write operations.
+   * @return AbfsClientThrottlingAnalyzer for write.
+   */
+  AbfsClientThrottlingAnalyzer getWriteThrottler() {
+    return writeThrottler;
+  }
+
+  /**
+   * Creates a singleton object of the AbfsClientThrottlingIntercept.
+   * which is shared across all filesystem instances.
+   * @param abfsConfiguration configuration set.
+   * @return singleton object of intercept.
+   */
+  static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
     if (singleton == null) {
-      singleton = new AbfsClientThrottlingIntercept();
-      isAutoThrottlingEnabled = true;
-      LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+      LOCK.lock();
+      try {
+        if (singleton == null) {
+          singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
+          LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+        }
+      } finally {

Review Comment:
   We can remove the try/finally block.
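
   One possible shape without try/finally, as a sketch under assumptions rather than a prescription: a synchronized block releases its monitor even if the constructor throws, which an explicit ReentrantLock would not do without finally. The class InterceptSketch and its String config parameter are hypothetical stand-ins for the intercept and AbfsConfiguration. The volatile field is my addition; double-checked locking in Java needs it for safe publication.

```java
/** Hypothetical stand-in for the intercept; names are illustrative only. */
final class InterceptSketch {
  // volatile so a fully constructed instance is visible to the unlocked first check
  private static volatile InterceptSketch singleton;
  private final String config;

  private InterceptSketch(String config) {
    this.config = config;
  }

  static InterceptSketch initializeSingleton(String config) {
    InterceptSketch local = singleton;
    if (local == null) {
      // the monitor is released automatically on normal exit or on exception
      synchronized (InterceptSketch.class) {
        local = singleton;
        if (local == null) {
          local = new InterceptSketch(config);
          singleton = local;
        }
      }
    }
    return local;
  }

  String config() {
    return config;
  }
}
```

   The first caller's configuration wins; later calls return the same instance and ignore their argument.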



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

