Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/04/03 20:18:18 UTC

[GitHub] [incubator-hudi] v3nkatesh opened a new pull request #1484: Hbase qps repartition writestatus

v3nkatesh opened a new pull request #1484: Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484
 
 
   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   This pull request optimizes HBase index write operations.
   
   ## Brief change log
  - *Replaces Thread.sleep() with a RateLimiter to reduce wait time during HBase put operations*
  - *Repartitions `WriteStatus` objects containing new records to improve the parallelism of HBase index operations*
   
   ## Verify this pull request
   
   This change added tests and can be verified as follows:
     - *Added tests to TestHbaseIndex to verify the repartition optimization*
  - *Also verified the change by running a job end to end*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407807929
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##########
 @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
   /**
    * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
    */
-  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
     if (mutations.isEmpty()) {
       return;
     }
+    // report number of operations to account per second with rate limiter.
+    // If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls
+    // for within that second
+    limiter.acquire(mutations.size());
     mutator.mutate(mutations);
     mutator.flush();
     mutations.clear();
-    sleepForTime(SLEEP_TIME_MILLISECONDS);
-  }
-
-  private static void sleepForTime(int sleepTimeMs) {
-    try {
-      Thread.sleep(sleepTimeMs);
-    } catch (InterruptedException e) {
-      LOG.error("Sleep interrupted during throttling", e);
-      throw new RuntimeException(e);
-    }
   }
 
   @Override
   public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
       HoodieTable<T> hoodieTable) {
-    final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
-    setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-    LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
-    JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+    final Option<Float> desiredQPSFraction =  calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator);
+    // Map each fileId that has inserts to a unique partition Id. This will be used while
+    // repartitioning RDD<WriteStatus>
+    int partitionIndex = 0;
+    final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
+                                   .map(w -> w.getFileId()).collect();
+    for (final String fileId : fileIds) {
+      this.fileIdPartitionMap.put(fileId, partitionIndex++);
+    }
+    JavaRDD<WriteStatus> partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD :
+                                          writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
+                                            .partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap,
+                                              this.numWriteStatusWithInserts))
+                                            .map(w -> w._2());
+    acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
+    LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(),
+        true);
     // caching the index updated status RDD
     writeStatusJavaRDD = writeStatusJavaRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
+    // force trigger update location(hbase puts)
+    writeStatusJavaRDD.count();
+    this.hBaseIndexQPSResourceAllocator.releaseQPSResources();
     return writeStatusJavaRDD;
   }
 
-  private void setPutBatchSize(JavaRDD<WriteStatus> writeStatusRDD,
-      HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator, final JavaSparkContext jsc) {
+  private Option<Float> calculateQPSFraction(JavaRDD<WriteStatus> writeStatusRDD,
+                                               HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator) {
 
 Review comment:
   Not directly related to your change, so feel free to ignore this comment. But hBaseIndexQPSResourceAllocator is an instance variable; why is it passed in again as an argument? This seems to be a consistent pattern in this class. Because the local variable uses the exact same name, it masks the instance variable, which can easily become error prone if the two variables evolve to mean different things.
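   To make the shadowing concern concrete, here is a minimal, hypothetical sketch (class and member names are invented for illustration, not taken from the PR):

```java
class Example {
  private Allocator allocator = new Allocator();   // instance field

  // The parameter has the same name, so inside this method the bare name
  // refers to the argument; the field is only reachable via this.allocator.
  void doWork(Allocator allocator) {
    allocator.acquire();        // uses the parameter, not the field
    this.allocator.acquire();   // uses the field; easy to mix up as the code evolves
  }

  static class Allocator {
    void acquire() {
    }
  }
}
```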


[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407868223
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##########
 @@ -498,4 +554,37 @@ public boolean isImplicitWithStorage() {
   public void setHbaseConnection(Connection hbaseConnection) {
     HBaseIndex.hbaseConnection = hbaseConnection;
   }
+
+  /**
+   * Partitions each WriteStatus with inserts into a unique single partition. WriteStatus without inserts will be
+   * assigned to random partitions. This partitioner will be useful to utilize max parallelism with spark operations
+   * that are based on inserts in each WriteStatus.
+   */
+  public class WriteStatusPartitioner extends Partitioner {
 
 Review comment:
   Could you please make this a static class if it's not using any instance variables of the outer class?
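   For reference, a minimal sketch of what the static nested class could look like, based on the constructor arguments visible in the quoted diff (method bodies are illustrative assumptions, not the PR's exact implementation; assumes org.apache.spark.Partitioner and java.util.Map are imported):

```java
public static class WriteStatusPartitioner extends Partitioner {
  private final Map<String, Integer> fileIdPartitionMap;
  private final int numWriteStatusWithInserts;

  public WriteStatusPartitioner(Map<String, Integer> fileIdPartitionMap, int numWriteStatusWithInserts) {
    this.fileIdPartitionMap = fileIdPartitionMap;
    this.numWriteStatusWithInserts = numWriteStatusWithInserts;
  }

  @Override
  public int numPartitions() {
    return numWriteStatusWithInserts;
  }

  @Override
  public int getPartition(Object key) {
    String fileId = (String) key;
    Integer partition = fileIdPartitionMap.get(fileId);
    // A WriteStatus without inserts has no dedicated partition; spread it across the rest.
    return partition != null ? partition : Math.floorMod(fileId.hashCode(), numPartitions());
  }
}
```

   Because all state is passed through the constructor, nothing from the outer HBaseIndex instance needs to be captured, which is exactly what makes the static form possible.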


[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407869565
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##########
 @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
   /**
    * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
    */
-  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
     if (mutations.isEmpty()) {
       return;
     }
+    // report number of operations to account per second with rate limiter.
+    // If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls
+    // for within that second
+    limiter.acquire(mutations.size());
     mutator.mutate(mutations);
     mutator.flush();
     mutations.clear();
-    sleepForTime(SLEEP_TIME_MILLISECONDS);
-  }
-
-  private static void sleepForTime(int sleepTimeMs) {
-    try {
-      Thread.sleep(sleepTimeMs);
-    } catch (InterruptedException e) {
-      LOG.error("Sleep interrupted during throttling", e);
-      throw new RuntimeException(e);
-    }
   }
 
   @Override
   public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
       HoodieTable<T> hoodieTable) {
-    final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
-    setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-    LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
-    JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+    final Option<Float> desiredQPSFraction =  calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator);
+    // Map each fileId that has inserts to a unique partition Id. This will be used while
+    // repartitioning RDD<WriteStatus>
+    int partitionIndex = 0;
+    final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
+                                   .map(w -> w.getFileId()).collect();
+    for (final String fileId : fileIds) {
+      this.fileIdPartitionMap.put(fileId, partitionIndex++);
+    }
+    JavaRDD<WriteStatus> partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD :
+                                          writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
+                                            .partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap,
+                                              this.numWriteStatusWithInserts))
+                                            .map(w -> w._2());
+    acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
+    LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize);
 
 Review comment:
   Nit: it looks like this is logged in the above method call too, so I think this line can be removed.


[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407812837
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##########
 @@ -83,13 +88,17 @@
   private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
   private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
   private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
-  private static final int SLEEP_TIME_MILLISECONDS = 100;
 
   private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
   private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
 
 Review comment:
   Can you share the context on why we created HBaseIndexQPSResourceAllocator?  Do you think calls to RateLimiter#acquire can be made inside HBaseIndexQPSResourceAllocator#acquireQPSResources to simplify? 


[GitHub] [incubator-hudi] n3nash commented on issue #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
n3nash commented on issue #1484: [HUDI-316] : Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#issuecomment-613027382
 
 
   @satishkotha can you please help review this?


[GitHub] [hudi] v3nkatesh commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on pull request #1484:
URL: https://github.com/apache/hudi/pull/1484#issuecomment-654383402


   Hi @vinothchandar, let me know if you will have time for the RateLimiter class. If not, I will explore other options; please let me know.





[GitHub] [hudi] v3nkatesh commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on pull request #1484:
URL: https://github.com/apache/hudi/pull/1484#issuecomment-649156492


   > In any case, we need unit tests for the RateLimiter class..
   > Few alternatives ..
   > You can maintain the index outside Hudi (index classes are pluggable)
   > I can write the rate limiter for you and check it in..
   > 
   > let's hash this out together :)
   
   Thanks @vinothchandar. The restrictions around Guava make sense. It would be great if you can check in a RateLimiter equivalent. Let me know if you don't have time for that in the near future.





[GitHub] [hudi] codecov-io commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #1484:
URL: https://github.com/apache/hudi/pull/1484#issuecomment-719002012


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/1484?src=pr&el=h1) Report
   > Merging [#1484](https://codecov.io/gh/apache/hudi/pull/1484?src=pr&el=desc) into [master](https://codecov.io/gh/apache/hudi/commit/e206ddd431131da26cf1bb00f70fe64ad0450059?el=desc) will **increase** coverage by `0.05%`.
   > The diff coverage is `86.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/1484/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/1484?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1484      +/-   ##
   ============================================
   + Coverage     53.68%   53.74%   +0.05%     
   - Complexity     2849     2858       +9     
   ============================================
     Files           359      360       +1     
     Lines         16565    16595      +30     
     Branches       1782     1782              
   ============================================
   + Hits           8893     8919      +26     
   - Misses         6915     6919       +4     
     Partials        757      757              
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | #hudicli | `38.37% <ø> (ø)` | `193.00 <ø> (ø)` | |
   | #hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | #hudicommon | `54.81% <86.66%> (+0.11%)` | `1803.00 <9.00> (+9.00)` | |
   | #hudihadoopmr | `33.05% <ø> (ø)` | `181.00 <ø> (ø)` | |
   | #hudispark | `65.95% <ø> (ø)` | `304.00 <ø> (ø)` | |
   | #huditimelineservice | `62.29% <ø> (ø)` | `50.00 <ø> (ø)` | |
   | #hudiutilities | `70.09% <ø> (ø)` | `327.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/1484?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../java/org/apache/hudi/common/util/RateLimiter.java](https://codecov.io/gh/apache/hudi/pull/1484/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvUmF0ZUxpbWl0ZXIuamF2YQ==) | `86.66% <86.66%> (ø)` | `9.00 <9.00> (?)` | |
   





[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407815204
 
 

 ##########
 File path: hudi-common/src/main/java/org/apache/hudi/common/util/RateLimiter.java
 ##########
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+/*
+ * Note: Based on RateLimiter implementation in Google/Guava.
+ *         - adopted from com.google.common.util.concurrent
+ *           Copyright (C) 2012 The Guava Authors
+ *           Home page: https://github.com/google/guava
+ *           License: http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+@ThreadSafe
+public abstract class RateLimiter {
+  private final RateLimiter.SleepingTicker ticker;
+  private final long offsetNanos;
+  double storedPermits;
+  double maxPermits;
+  volatile double stableIntervalMicros;
+  private final Object mutex;
+  private long nextFreeTicketMicros;
+
+  public static RateLimiter create(double permitsPerSecond) {
+    return create(RateLimiter.SleepingTicker.SYSTEM_TICKER, permitsPerSecond);
+  }
+
+  static RateLimiter create(RateLimiter.SleepingTicker ticker, double permitsPerSecond) {
+    RateLimiter rateLimiter = new RateLimiter.Bursty(ticker, 1.0D);
+    rateLimiter.setRate(permitsPerSecond);
+    return rateLimiter;
+  }
+
+  public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
+    return create(RateLimiter.SleepingTicker.SYSTEM_TICKER, permitsPerSecond, warmupPeriod, unit);
+  }
+
+  static RateLimiter create(RateLimiter.SleepingTicker ticker, double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
+    RateLimiter rateLimiter = new RateLimiter.WarmingUp(ticker, warmupPeriod, unit);
+    rateLimiter.setRate(permitsPerSecond);
+    return rateLimiter;
+  }
+
+  private RateLimiter(RateLimiter.SleepingTicker ticker) {
+    this.mutex = new Object();
+    this.nextFreeTicketMicros = 0L;
+    this.ticker = ticker;
+    this.offsetNanos = ticker.read();
+  }
+
+  public final void setRate(double permitsPerSecond) {
+    checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), "rate must be positive");
+    Object var3 = this.mutex;
+    synchronized (this.mutex) {
+      this.resync(this.readSafeMicros());
+      double stableIntervalMicros = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
+      this.stableIntervalMicros = stableIntervalMicros;
+      this.doSetRate(permitsPerSecond, stableIntervalMicros);
+    }
+  }
+
+  abstract void doSetRate(double var1, double var3);
+
+  public final double getRate() {
+    return (double)TimeUnit.SECONDS.toMicros(1L) / this.stableIntervalMicros;
+  }
+
+  public void acquire() {
+    this.acquire(1);
+  }
+
+  public void acquire(int permits) {
 
 Review comment:
   Could you return the time spent waiting here? I think adding metrics on the time taken is very important for debugging any potential performance issues. Also, it would be useful to log a warning if the time taken is greater than some threshold (say, 300ms?).
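   A hedged sketch of the suggestion, wrapping the PR's blocking RateLimiter#acquire(int) call (the 300 ms threshold, helper class, and method name are illustrative assumptions; log4j's LogManager is assumed, as used elsewhere in HBaseIndex):

```java
import java.util.concurrent.TimeUnit;

import org.apache.hudi.common.util.RateLimiter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class TimedRateLimit {
  private static final Logger LOG = LogManager.getLogger(TimedRateLimit.class);

  /** Acquires permits, returns how long the caller was blocked, and warns on long waits. */
  public static long acquireAndReport(RateLimiter limiter, int permits) {
    long startNanos = System.nanoTime();
    limiter.acquire(permits); // blocks until the requested permits are available
    long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    if (waitedMs > 300) {
      LOG.warn("RateLimiter blocked for " + waitedMs + " ms while acquiring " + permits + " permits");
    }
    return waitedMs;
  }
}
```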


[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r408328994
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##########
 @@ -252,8 +263,10 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
     };
   }
 
-  private Result[] doGet(HTable hTable, List<Get> keys) throws IOException {
-    sleepForTime(SLEEP_TIME_MILLISECONDS);
+  private Result[] doGet(HTable hTable, List<Get> keys, RateLimiter limiter) throws IOException {
+    if (keys.size() > 0) {
 
 Review comment:
   Nit: can we also move hTable.get(keys) inside this if? Do we need to invoke hTable.get if keys is empty?
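   A sketch of the suggested shape, using only the calls visible in the quoted diff (the author's reply further down notes that the final version explicitly returns an empty array when there are no keys):

```java
private Result[] doGet(HTable hTable, List<Get> keys, RateLimiter limiter) throws IOException {
  if (keys.isEmpty()) {
    return new Result[0];          // no keys: skip both the rate limiter and the HBase round trip
  }
  limiter.acquire(keys.size());    // charge the batch against the QPS budget before the lookup
  return hTable.get(keys);
}
```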


[GitHub] [hudi] n3nash merged pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
n3nash merged pull request #1484:
URL: https://github.com/apache/hudi/pull/1484


   





[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407905954
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##########
 @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
   /**
    * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
    */
-  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
     if (mutations.isEmpty()) {
       return;
     }
+    // report number of operations to account per second with rate limiter.
+    // If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls
+    // for within that second
+    limiter.acquire(mutations.size());
     mutator.mutate(mutations);
     mutator.flush();
     mutations.clear();
-    sleepForTime(SLEEP_TIME_MILLISECONDS);
-  }
-
-  private static void sleepForTime(int sleepTimeMs) {
-    try {
-      Thread.sleep(sleepTimeMs);
-    } catch (InterruptedException e) {
-      LOG.error("Sleep interrupted during throttling", e);
-      throw new RuntimeException(e);
-    }
   }
 
   @Override
   public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
       HoodieTable<T> hoodieTable) {
-    final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
-    setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-    LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
-    JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+    final Option<Float> desiredQPSFraction =  calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator);
+    // Map each fileId that has inserts to a unique partition Id. This will be used while
+    // repartitioning RDD<WriteStatus>
+    int partitionIndex = 0;
+    final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
+                                   .map(w -> w.getFileId()).collect();
+    for (final String fileId : fileIds) {
+      this.fileIdPartitionMap.put(fileId, partitionIndex++);
+    }
+    JavaRDD<WriteStatus> partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD :
+                                          writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
+                                            .partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap,
+                                              this.numWriteStatusWithInserts))
+                                            .map(w -> w._2());
 
 Review comment:
   Consider redoing this logic: if this.numWriteStatusWithInserts == 0, we still go through the process of generating fileIdPartitionMap, which is not ideal.
   
   Also, I'm curious whether you did any performance measurements before and after this change. It is worth highlighting in the release notes if the improvement is significant.
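   One possible restructuring, sketched with the variable names from the quoted diff (illustrative only, not the final code):

```java
JavaRDD<WriteStatus> partitionedRDD = writeStatusRDD;
if (this.numWriteStatusWithInserts > 0) {
  // Only collect fileIds and build the fileId -> partitionId map when there are inserts.
  int partitionIndex = 0;
  final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
      .map(w -> w.getFileId()).collect();
  for (final String fileId : fileIds) {
    this.fileIdPartitionMap.put(fileId, partitionIndex++);
  }
  partitionedRDD = writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
      .partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap, this.numWriteStatusWithInserts))
      .map(w -> w._2());
}
```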


[GitHub] [hudi] vinothchandar commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1484:
URL: https://github.com/apache/hudi/pull/1484#issuecomment-689722005


   Will do, thanks! @n3nash can take a pass as well.





[GitHub] [hudi] n3nash commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #1484:
URL: https://github.com/apache/hudi/pull/1484#issuecomment-689961998


   Yes, I will do it by Friday.





[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407766063
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##########
 @@ -83,13 +88,17 @@
   private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
   private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
   private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
-  private static final int SLEEP_TIME_MILLISECONDS = 100;
 
   private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
   private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
   private float qpsFraction;
   private int maxQpsPerRegionServer;
+  private int maxPutsPerSec;
+  private long totalNumInserts;
+  private int numWriteStatusWithInserts;
+  Map<String, Integer> fileIdPartitionMap = new HashMap<>();
 
 Review comment:
   Do you need this map as an instance variable? It looks like there is one HBaseIndex object per client, and we don't seem to be clearing entries from this map either. So, over time, this map can get pretty large and cause increased memory utilization. Please correct me if I'm misreading.


[GitHub] [hudi] codecov-io edited a comment on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #1484:
URL: https://github.com/apache/hudi/pull/1484#issuecomment-719002012


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/1484?src=pr&el=h1) Report
   > Merging [#1484](https://codecov.io/gh/apache/hudi/pull/1484?src=pr&el=desc) into [master](https://codecov.io/gh/apache/hudi/commit/e206ddd431131da26cf1bb00f70fe64ad0450059?el=desc) will **increase** coverage by `0.05%`.
   > The diff coverage is `86.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/1484/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/1484?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1484      +/-   ##
   ============================================
   + Coverage     53.68%   53.74%   +0.05%     
   - Complexity     2849     2858       +9     
   ============================================
     Files           359      360       +1     
     Lines         16565    16595      +30     
     Branches       1782     1782              
   ============================================
   + Hits           8893     8919      +26     
   - Misses         6915     6919       +4     
     Partials        757      757              
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | #hudicli | `38.37% <ø> (ø)` | `193.00 <ø> (ø)` | |
   | #hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | #hudicommon | `54.81% <86.66%> (+0.11%)` | `1803.00 <9.00> (+9.00)` | |
   | #hudihadoopmr | `33.05% <ø> (ø)` | `181.00 <ø> (ø)` | |
   | #hudispark | `65.95% <ø> (ø)` | `304.00 <ø> (ø)` | |
   | #huditimelineservice | `62.29% <ø> (ø)` | `50.00 <ø> (ø)` | |
   | #hudiutilities | `70.09% <ø> (ø)` | `327.00 <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/1484?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../java/org/apache/hudi/common/util/RateLimiter.java](https://codecov.io/gh/apache/hudi/pull/1484/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvUmF0ZUxpbWl0ZXIuamF2YQ==) | `86.66% <86.66%> (ø)` | `9.00 <9.00> (?)` | |
   





[GitHub] [hudi] vinothchandar commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1484:
URL: https://github.com/apache/hudi/pull/1484#issuecomment-655208316


   @v3nkatesh are you able to write the rate limiter class yourself? :) If not, can @garyli1019 @xushiyan help?





[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r403398611
 
 

 ##########
 File path: hudi-common/src/main/java/org/apache/hudi/common/util/RateLimiter.java
 ##########
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+/*
+ * Note: Based on RateLimiter implementation in Google/Guava.
 
 Review comment:
   Could we please avoid using this? It should not be too hard to roll our own.
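   For context, a minimal hand-rolled token-bucket limiter might look like the sketch below (class name and details are assumptions for illustration; this is not the implementation that was eventually pushed in the PR):

```java
public class SimpleRateLimiter {
  private final double permitsPerSecond;
  private double availablePermits;     // may go negative: callers pay off the debt by waiting later
  private long lastRefillNanos;

  public SimpleRateLimiter(double permitsPerSecond) {
    this.permitsPerSecond = permitsPerSecond;
    this.availablePermits = permitsPerSecond;  // start with a one-second budget
    this.lastRefillNanos = System.nanoTime();
  }

  public synchronized void acquire(int permits) throws InterruptedException {
    refill();
    if (availablePermits < permits) {
      // Wait long enough for the missing permits to be generated at the configured rate.
      double missing = permits - availablePermits;
      long sleepMs = (long) Math.ceil(missing * 1000.0 / permitsPerSecond);
      Thread.sleep(sleepMs);
      refill();
    }
    availablePermits -= permits;
  }

  private void refill() {
    long now = System.nanoTime();
    double elapsedSeconds = (now - lastRefillNanos) / 1_000_000_000.0;
    // Cap the stored budget at one second's worth so idle time cannot build an unbounded burst.
    availablePermits = Math.min(permitsPerSecond, availablePermits + elapsedSeconds * permitsPerSecond);
    lastRefillNanos = now;
  }
}
```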
   


[GitHub] [hudi] vinothchandar commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1484:
URL: https://github.com/apache/hudi/pull/1484#issuecomment-703335287


   @n3nash I tried to rebase this and noticed that the custom RateLimiter is still reusing code from Guava. Are we still pursuing this PR?





[GitHub] [hudi] v3nkatesh commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on pull request #1484:
URL: https://github.com/apache/hudi/pull/1484#issuecomment-686984608


   > @v3nkatesh are you able to write the rate limiter class yourself? :) if not can @garyli1019 @xushiyan help?
   
   @vinothchandar Pushed a custom RateLimiter, please take a look.





[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r408345007
 
 

 ##########
 File path: hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
 ##########
 @@ -329,47 +332,140 @@ public void testPutBatchSizeCalculation() {
     // All asserts cases below are derived out of the first
     // example below, with change in one parameter at a time.
 
-    int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f);
-    // Expected batchSize is 8 because in that case, total request sent in one second is below
-    // 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction)) => 16000
-    // We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 request
-    // 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected.
-    Assert.assertEquals(putBatchSize, 8);
+    int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 0.1f);
+    // Total puts that can be sent  in 1 second = (10 * 16667 * 0.1) = 16,667
+    // Total puts per batch will be (16,667 / parallelism) = 83.335, where 200 is the maxExecutors
+    Assert.assertEquals(putBatchSize, 83);
 
     // Number of Region Servers are halved, total requests sent in a second are also halved, so batchSize is also halved
-    int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f);
-    Assert.assertEquals(putBatchSize2, 4);
+    int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 0.1f);
+    Assert.assertEquals(putBatchSize2, 41);
 
     // If the parallelism is halved, batchSize has to double
-    int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f);
-    Assert.assertEquals(putBatchSize3, 16);
+    int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 0.1f);
+    Assert.assertEquals(putBatchSize3, 166);
 
     // If the parallelism is halved, batchSize has to double.
     // This time parallelism is driven by numTasks rather than numExecutors
-    int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f);
-    Assert.assertEquals(putBatchSize4, 16);
+    int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 0.1f);
+    Assert.assertEquals(putBatchSize4, 166);
 
     // If sleepTimeMs is halved, batchSize has to halve
-    int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f);
-    Assert.assertEquals(putBatchSize5, 4);
+    int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 0.05f);
+    Assert.assertEquals(putBatchSize5, 41);
 
     // If maxQPSPerRegionServer is doubled, batchSize also doubles
-    int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 100, 0.1f);
-    Assert.assertEquals(putBatchSize6, 16);
+    int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 0.1f);
+    Assert.assertEquals(putBatchSize6, 166);
   }
 
   @Test
   public void testsHBasePutAccessParallelism() {
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
     final JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(
-        Arrays.asList(getSampleWriteStatus(1, 2), getSampleWriteStatus(0, 3), getSampleWriteStatus(10, 0)), 10);
+        Arrays.asList(
+          getSampleWriteStatus(0, 2),
+          getSampleWriteStatus(2, 3),
+          getSampleWriteStatus(4, 3),
+          getSampleWriteStatus(6, 3),
+          getSampleWriteStatus(8, 0)),
+        10);
     final Tuple2<Long, Integer> tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
     final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
     final int hbaseNumPuts = Integer.parseInt(tuple._1.toString());
     Assert.assertEquals(10, writeStatusRDD.getNumPartitions());
-    Assert.assertEquals(2, hbasePutAccessParallelism);
-    Assert.assertEquals(11, hbaseNumPuts);
+    Assert.assertEquals(4, hbasePutAccessParallelism);
+    Assert.assertEquals(20, hbaseNumPuts);
+  }
+
+  @Test
+  public void testsWriteStatusPartitioner() {
+    HoodieWriteConfig config = getConfig();
+    HBaseIndex index = new HBaseIndex(config);
+    int parallelism = 4;
+    final JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(
+        Arrays.asList(
+          getSampleWriteStatusWithFileId(0, 2),
+          getSampleWriteStatusWithFileId(2, 3),
+          getSampleWriteStatusWithFileId(4, 3),
+          getSampleWriteStatusWithFileId(0, 3),
+          getSampleWriteStatusWithFileId(11, 0)), parallelism);
+    int partitionIndex = 0;
+    final Map<String, Integer> fileIdPartitionMap = new HashMap<>();
+
+    final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
 
 Review comment:
   A lot of the code in this test looks like repetition of the source code. Consider refactoring this part into a shared helper that can be reused in tests if needed.
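   A sketch of the kind of shared helper being suggested, using the logic from the quoted diff (the method name and where it lives are hypothetical):

```java
public static Map<String, Integer> mapFileIdsToPartitions(JavaRDD<WriteStatus> writeStatusRDD) {
  final Map<String, Integer> fileIdPartitionMap = new HashMap<>();
  int partitionIndex = 0;
  // Each fileId that has inserts gets its own partition index, in collection order.
  final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
      .map(w -> w.getFileId()).collect();
  for (final String fileId : fileIds) {
    fileIdPartitionMap.put(fileId, partitionIndex++);
  }
  return fileIdPartitionMap;
}
```

   Both HBaseIndex and the test could then call the helper, so the mapping logic lives in one place.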


[GitHub] [hudi] n3nash commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #1484:
URL: https://github.com/apache/hudi/pull/1484#issuecomment-718854042


   @v3nkatesh the build is failing; can you please fix it so that I can merge the PR?





[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407805506
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##########
 @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
   /**
    * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
    */
-  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
     if (mutations.isEmpty()) {
       return;
     }
+    // report number of operations to account per second with rate limiter.
+    // If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls
+    // for within that second
+    limiter.acquire(mutations.size());
     mutator.mutate(mutations);
     mutator.flush();
     mutations.clear();
-    sleepForTime(SLEEP_TIME_MILLISECONDS);
-  }
-
-  private static void sleepForTime(int sleepTimeMs) {
-    try {
-      Thread.sleep(sleepTimeMs);
-    } catch (InterruptedException e) {
-      LOG.error("Sleep interrupted during throttling", e);
-      throw new RuntimeException(e);
-    }
   }
 
   @Override
   public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
       HoodieTable<T> hoodieTable) {
-    final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
-    setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-    LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
-    JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+    final Option<Float> desiredQPSFraction =  calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator);
+    // Map each fileId that has inserts to a unique partition Id. This will be used while
+    // repartitioning RDD<WriteStatus>
+    int partitionIndex = 0;
+    final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
+                                   .map(w -> w.getFileId()).collect();
+    for (final String fileId : fileIds) {
+      this.fileIdPartitionMap.put(fileId, partitionIndex++);
+    }
+    JavaRDD<WriteStatus> partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD :
+                                          writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
+                                            .partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap,
+                                              this.numWriteStatusWithInserts))
+                                            .map(w -> w._2());
+    acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
+    LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(),
+        true);
     // caching the index updated status RDD
     writeStatusJavaRDD = writeStatusJavaRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
+    // force trigger update location(hbase puts)
+    writeStatusJavaRDD.count();
+    this.hBaseIndexQPSResourceAllocator.releaseQPSResources();
 
 Review comment:
   Can you help me understand this code? Why do we need to force-trigger here? Is this just to call releaseQPSResources? releaseQPSResources seems to be doing nothing (at least in the default implementation; are there other implementations outside Hoodie?). Is it really important to release here as opposed to doing it in close() (the earlier behavior)?


[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407766436
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##########
 @@ -83,13 +88,17 @@
   private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
   private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
   private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
-  private static final int SLEEP_TIME_MILLISECONDS = 100;
 
   private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
   private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
   private float qpsFraction;
   private int maxQpsPerRegionServer;
+  private int maxPutsPerSec;
 
 Review comment:
   I don't see how this is used. Could you please add a comment for all of these instance variables? It seems like they could be local variables specific to the operation being performed.


[GitHub] [hudi] v3nkatesh commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on pull request #1484:
URL: https://github.com/apache/hudi/pull/1484#issuecomment-713192778


   > @n3nash I tried to rebase this and noticed that the custom ratelimiter is still a reuse from Guava. Are we still pursuing this PR?
   
   Sorry for the delay folks, I just came back from my break. @vinothchandar the [latest RateLimiter class](https://github.com/apache/hudi/pull/1484/files#diff-30c718dda91817a69efbd57e3e5a499cc3776975d6e3bc8e8f4b1cb0528b91a6R1) is not based on Guava. Let me know.





[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418397300



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -252,8 +263,10 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
     };
   }
 
-  private Result[] doGet(HTable hTable, List<Get> keys) throws IOException {
-    sleepForTime(SLEEP_TIME_MILLISECONDS);
+  private Result[] doGet(HTable hTable, List<Get> keys, RateLimiter limiter) throws IOException {
+    if (keys.size() > 0) {

Review comment:
    Invoking hTable.get on empty keys returns an empty Result array. But anyway, I changed it to explicitly return an empty array now.







[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r411749174



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
   /**
    * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
    */
-  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
     if (mutations.isEmpty()) {
       return;
     }
+    // report number of operations to account per second with rate limiter.
+    // If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls
+    // for within that second
+    limiter.acquire(mutations.size());
     mutator.mutate(mutations);
     mutator.flush();
     mutations.clear();

Review comment:
    Another question: what is the typical latency of these mutate operations? If the time taken here, combined with the time taken to collect 'multiPutBatchSize' records, is greater than 1 second, then it seems like the limiter would generate enough tokens for the next run and would not wait at all.
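    As a hedged, illustrative example of this concern (the numbers are invented): with limiter.getRate() = 1000 permits per second and a multiPutBatchSize of 800, a batch whose mutate-and-flush plus collection time exceeds one second lets the limiter rebuild its full one-second burst budget (the quoted implementation is created with a 1.0-second Bursty configuration), so the next acquire(800) returns immediately and the effective throughput ceiling becomes the HBase mutation latency rather than the configured QPS budget.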







[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418391072



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -83,13 +88,17 @@
   private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
   private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
   private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
-  private static final int SLEEP_TIME_MILLISECONDS = 100;
 
   private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
   private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;

Review comment:
    Synced offline to go over the use case outside of Hoodie and why we need to rate limit here.
    Just to summarize: we rate limit here because the actual HBase operations are handled here, while HBaseIndexQPSResourceAllocator#acquireQPSResources is mostly meant to manage metadata, like checking for available resources before an operation and releasing those resources afterwards.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r425496973



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
   /**
    * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
    */
-  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
     if (mutations.isEmpty()) {
       return;
     }
+    // report number of operations to account per second with rate limiter.
+    // If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls
+    // for within that second
+    limiter.acquire(mutations.size());
     mutator.mutate(mutations);
     mutator.flush();
     mutations.clear();

Review comment:
       I think we synced offline on this, but let me try to summarize.
   Tokens will be acquired only after the previous batch is done, so, as intended, it won't flood the system or over-utilize the cluster. The side effect, though, is that the hbase operation can run slower than intended. Yes, metrics on the operation will be useful; will create a follow-up ticket.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418390619



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
   /**
    * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
    */
-  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
     if (mutations.isEmpty()) {
       return;
     }
+    // report number of operations to account per second with rate limiter.
+    // If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls
+    // for within that second
+    limiter.acquire(mutations.size());
     mutator.mutate(mutations);
     mutator.flush();
     mutations.clear();
-    sleepForTime(SLEEP_TIME_MILLISECONDS);
-  }
-
-  private static void sleepForTime(int sleepTimeMs) {
-    try {
-      Thread.sleep(sleepTimeMs);
-    } catch (InterruptedException e) {
-      LOG.error("Sleep interrupted during throttling", e);
-      throw new RuntimeException(e);
-    }
   }
 
   @Override
   public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
       HoodieTable<T> hoodieTable) {
-    final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
-    setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-    LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
-    JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+    final Option<Float> desiredQPSFraction =  calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator);
+    // Map each fileId that has inserts to a unique partition Id. This will be used while
+    // repartitioning RDD<WriteStatus>
+    int partitionIndex = 0;
+    final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
+                                   .map(w -> w.getFileId()).collect();
+    for (final String fileId : fileIds) {
+      this.fileIdPartitionMap.put(fileId, partitionIndex++);
+    }
+    JavaRDD<WriteStatus> partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD :
+                                          writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
+                                            .partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap,
+                                              this.numWriteStatusWithInserts))
+                                            .map(w -> w._2());
+    acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
+    LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(),
+        true);
     // caching the index updated status RDD
     writeStatusJavaRDD = writeStatusJavaRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
+    // force trigger update location(hbase puts)
+    writeStatusJavaRDD.count();
+    this.hBaseIndexQPSResourceAllocator.releaseQPSResources();
     return writeStatusJavaRDD;
   }
 
-  private void setPutBatchSize(JavaRDD<WriteStatus> writeStatusRDD,
-      HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator, final JavaSparkContext jsc) {
+  private Option<Float> calculateQPSFraction(JavaRDD<WriteStatus> writeStatusRDD,
+                                               HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator) {

Review comment:
       Yeah I think it makes sense to refactor it here. Let me update other parts of this class where hBaseIndexQPSResourceAllocator is passed around.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418425591



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
   /**
    * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
    */
-  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
     if (mutations.isEmpty()) {
       return;
     }
+    // report number of operations to account per second with rate limiter.
+    // If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls
+    // for within that second
+    limiter.acquire(mutations.size());
     mutator.mutate(mutations);
     mutator.flush();
     mutations.clear();

Review comment:
       Because of the synchronous nature, we will not acquire more permits until the operation is done, even if it takes more than a second.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] n3nash commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r425489735



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -83,13 +88,14 @@
   private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
   private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
   private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
-  private static final int SLEEP_TIME_MILLISECONDS = 100;
 
   private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
   private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
-  private float qpsFraction;
   private int maxQpsPerRegionServer;
+  private long totalNumInserts;
+  private int numWriteStatusWithInserts;

Review comment:
       @v3nkatesh can you address or respond to this comment?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418692391



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -83,13 +88,14 @@
   private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
   private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
   private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
-  private static final int SLEEP_TIME_MILLISECONDS = 100;
 
   private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
   private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
-  private float qpsFraction;
   private int maxQpsPerRegionServer;
+  private long totalNumInserts;
+  private int numWriteStatusWithInserts;

Review comment:
       These two also seem related to the operation being performed and don't really need to be instance variables. If we can find a way to move them to local variables, that would make it cleaner.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] n3nash commented on issue #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
n3nash commented on issue #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#issuecomment-617371930


   @v3nkatesh any updates on this?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418391562



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
   /**
    * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
    */
-  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
     if (mutations.isEmpty()) {
       return;
     }
+    // report number of operations to account per second with rate limiter.
+    // If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls
+    // for within that second
+    limiter.acquire(mutations.size());
     mutator.mutate(mutations);
     mutator.flush();
     mutations.clear();
-    sleepForTime(SLEEP_TIME_MILLISECONDS);
-  }
-
-  private static void sleepForTime(int sleepTimeMs) {
-    try {
-      Thread.sleep(sleepTimeMs);
-    } catch (InterruptedException e) {
-      LOG.error("Sleep interrupted during throttling", e);
-      throw new RuntimeException(e);
-    }
   }
 
   @Override
   public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
       HoodieTable<T> hoodieTable) {
-    final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
-    setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-    LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
-    JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+    final Option<Float> desiredQPSFraction =  calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator);
+    // Map each fileId that has inserts to a unique partition Id. This will be used while
+    // repartitioning RDD<WriteStatus>
+    int partitionIndex = 0;
+    final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
+                                   .map(w -> w.getFileId()).collect();
+    for (final String fileId : fileIds) {
+      this.fileIdPartitionMap.put(fileId, partitionIndex++);
+    }
+    JavaRDD<WriteStatus> partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD :
+                                          writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
+                                            .partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap,
+                                              this.numWriteStatusWithInserts))
+                                            .map(w -> w._2());
+    acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
+    LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize);

Review comment:
       done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] satishkotha commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#issuecomment-622527791


   @n3nash @v3nkatesh added a few more comments; these are in the nice-to-have bucket. Since this has already been running in production for quite some time, you can merge given there is time pressure. I can also make additional changes in a later PR if needed.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418391855



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
   /**
    * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
    */
-  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
     if (mutations.isEmpty()) {
       return;
     }
+    // report number of operations to account per second with rate limiter.
+    // If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls
+    // for within that second
+    limiter.acquire(mutations.size());
     mutator.mutate(mutations);
     mutator.flush();
     mutations.clear();
-    sleepForTime(SLEEP_TIME_MILLISECONDS);
-  }
-
-  private static void sleepForTime(int sleepTimeMs) {
-    try {
-      Thread.sleep(sleepTimeMs);
-    } catch (InterruptedException e) {
-      LOG.error("Sleep interrupted during throttling", e);
-      throw new RuntimeException(e);
-    }
   }
 
   @Override
   public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
       HoodieTable<T> hoodieTable) {
-    final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
-    setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-    LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
-    JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+    final Option<Float> desiredQPSFraction =  calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator);
+    // Map each fileId that has inserts to a unique partition Id. This will be used while
+    // repartitioning RDD<WriteStatus>
+    int partitionIndex = 0;
+    final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
+                                   .map(w -> w.getFileId()).collect();
+    for (final String fileId : fileIds) {
+      this.fileIdPartitionMap.put(fileId, partitionIndex++);
+    }
+    JavaRDD<WriteStatus> partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD :
+                                          writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
+                                            .partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap,
+                                              this.numWriteStatusWithInserts))
+                                            .map(w -> w._2());

Review comment:
       When this.numWriteStatusWithInserts == 0, fileIds will be empty. So fileIdPartitionMap will also be empty in this case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418391204



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -498,4 +554,37 @@ public boolean isImplicitWithStorage() {
   public void setHbaseConnection(Connection hbaseConnection) {
     HBaseIndex.hbaseConnection = hbaseConnection;
   }
+
+  /**
+   * Partitions each WriteStatus with inserts into a unique single partition. WriteStatus without inserts will be
+   * assigned to random partitions. This partitioner will be useful to utilize max parallelism with spark operations
+   * that are based on inserts in each WriteStatus.
+   */
+  public class WriteStatusPartitioner extends Partitioner {

Review comment:
       done.
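   For readers following the thread, a minimal sketch of what a fileId-keyed partitioner like the one documented above could look like (an illustration, not the PR's exact code; the constructor mirrors the fileIdPartitionMap and numWriteStatusWithInserts arguments seen in updateLocation, and the hash fallback for WriteStatus without inserts is an assumption):

      public static class WriteStatusPartitioner extends org.apache.spark.Partitioner {
        private final Map<String, Integer> fileIdPartitionMap;
        private final int totalPartitions;

        public WriteStatusPartitioner(Map<String, Integer> fileIdPartitionMap, int totalPartitions) {
          this.fileIdPartitionMap = fileIdPartitionMap;
          this.totalPartitions = totalPartitions;
        }

        @Override
        public int numPartitions() {
          return totalPartitions;
        }

        @Override
        public int getPartition(Object key) {
          String fileId = (String) key;
          Integer partition = fileIdPartitionMap.get(fileId);
          // WriteStatus without inserts has no dedicated partition; spread it across the existing ones
          return partition != null ? partition : Math.abs(fileId.hashCode() % totalPartitions);
        }
      }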




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418402390



##########
File path: hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
##########
@@ -329,47 +332,140 @@ public void testPutBatchSizeCalculation() {
     // All asserts cases below are derived out of the first
     // example below, with change in one parameter at a time.
 
-    int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f);
-    // Expected batchSize is 8 because in that case, total request sent in one second is below
-    // 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction)) => 16000
-    // We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 request
-    // 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected.
-    Assert.assertEquals(putBatchSize, 8);
+    int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 0.1f);
+    // Total puts that can be sent  in 1 second = (10 * 16667 * 0.1) = 16,667
+    // Total puts per batch will be (16,667 / parallelism) = 83.335, where 200 is the maxExecutors
+    Assert.assertEquals(putBatchSize, 83);
 
     // Number of Region Servers are halved, total requests sent in a second are also halved, so batchSize is also halved
-    int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f);
-    Assert.assertEquals(putBatchSize2, 4);
+    int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 0.1f);
+    Assert.assertEquals(putBatchSize2, 41);
 
     // If the parallelism is halved, batchSize has to double
-    int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f);
-    Assert.assertEquals(putBatchSize3, 16);
+    int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 0.1f);
+    Assert.assertEquals(putBatchSize3, 166);
 
     // If the parallelism is halved, batchSize has to double.
     // This time parallelism is driven by numTasks rather than numExecutors
-    int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f);
-    Assert.assertEquals(putBatchSize4, 16);
+    int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 0.1f);
+    Assert.assertEquals(putBatchSize4, 166);
 
     // If sleepTimeMs is halved, batchSize has to halve
-    int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f);
-    Assert.assertEquals(putBatchSize5, 4);
+    int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 0.05f);
+    Assert.assertEquals(putBatchSize5, 41);
 
     // If maxQPSPerRegionServer is doubled, batchSize also doubles
-    int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 100, 0.1f);
-    Assert.assertEquals(putBatchSize6, 16);
+    int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 0.1f);
+    Assert.assertEquals(putBatchSize6, 166);
   }
 
   @Test
   public void testsHBasePutAccessParallelism() {
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
     final JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(
-        Arrays.asList(getSampleWriteStatus(1, 2), getSampleWriteStatus(0, 3), getSampleWriteStatus(10, 0)), 10);
+        Arrays.asList(
+          getSampleWriteStatus(0, 2),
+          getSampleWriteStatus(2, 3),
+          getSampleWriteStatus(4, 3),
+          getSampleWriteStatus(6, 3),
+          getSampleWriteStatus(8, 0)),
+        10);
     final Tuple2<Long, Integer> tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
     final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
     final int hbaseNumPuts = Integer.parseInt(tuple._1.toString());
     Assert.assertEquals(10, writeStatusRDD.getNumPartitions());
-    Assert.assertEquals(2, hbasePutAccessParallelism);
-    Assert.assertEquals(11, hbaseNumPuts);
+    Assert.assertEquals(4, hbasePutAccessParallelism);
+    Assert.assertEquals(20, hbaseNumPuts);
+  }
+
+  @Test
+  public void testsWriteStatusPartitioner() {
+    HoodieWriteConfig config = getConfig();
+    HBaseIndex index = new HBaseIndex(config);
+    int parallelism = 4;
+    final JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(
+        Arrays.asList(
+          getSampleWriteStatusWithFileId(0, 2),
+          getSampleWriteStatusWithFileId(2, 3),
+          getSampleWriteStatusWithFileId(4, 3),
+          getSampleWriteStatusWithFileId(0, 3),
+          getSampleWriteStatusWithFileId(11, 0)), parallelism);
+    int partitionIndex = 0;
+    final Map<String, Integer> fileIdPartitionMap = new HashMap<>();
+
+    final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)

Review comment:
       Sure, I have moved this logic into a separate method inside HBaseIndex instead of a different class; let me know. The tests now use this util method.
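   For illustration, such a util method inside HBaseIndex might look roughly like the sketch below (the method name is an assumption; it simply packages the fileId-to-partition mapping from the diff above into a reusable helper):

      Map<String, Integer> mapFileWithInsertsToUniquePartition(JavaRDD<WriteStatus> writeStatusRDD) {
        final Map<String, Integer> fileIdPartitionMap = new HashMap<>();
        int partitionIndex = 0;
        // only fileIds that actually carry inserts get a dedicated partition
        final List<String> fileIds = writeStatusRDD
            .filter(w -> w.getStat().getNumInserts() > 0)
            .map(WriteStatus::getFileId)
            .collect();
        for (final String fileId : fileIds) {
          fileIdPartitionMap.put(fileId, partitionIndex++);
        }
        return fileIdPartitionMap;
      }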




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418397300



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -252,8 +263,10 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
     };
   }
 
-  private Result[] doGet(HTable hTable, List<Get> keys) throws IOException {
-    sleepForTime(SLEEP_TIME_MILLISECONDS);
+  private Result[] doGet(HTable hTable, List<Get> keys, RateLimiter limiter) throws IOException {
+    if (keys.size() > 0) {

Review comment:
       Invoking hTable.get on empty keys returns an empty Result array. But anyway, I changed it to explicitly return an empty array now, though I don't want to change the return type of doGet().
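   A minimal sketch of the guard being described, assuming the doGet signature from the diff (the return type stays Result[]):

      private Result[] doGet(HTable hTable, List<Get> keys, RateLimiter limiter) throws IOException {
        if (keys.isEmpty()) {
          return new Result[0];  // explicitly return an empty result for an empty key batch
        }
        // account for these gets against the per-second budget before hitting hbase
        limiter.acquire(keys.size());
        return hTable.get(keys);
      }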




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418690538



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/RateLimiter.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+/*
+ * Note: Based on RateLimiter implementation in Google/Guava.
+ *         - adopted from com.google.common.util.concurrent
+ *           Copyright (C) 2012 The Guava Authors
+ *           Home page: https://github.com/google/guava
+ *           License: http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+@ThreadSafe
+public abstract class RateLimiter {
+  private final RateLimiter.SleepingTicker ticker;
+  private final long offsetNanos;
+  double storedPermits;
+  double maxPermits;
+  volatile double stableIntervalMicros;
+  private final Object mutex;
+  private long nextFreeTicketMicros;
+
+  public static RateLimiter create(double permitsPerSecond) {
+    return create(RateLimiter.SleepingTicker.SYSTEM_TICKER, permitsPerSecond);
+  }
+
+  static RateLimiter create(RateLimiter.SleepingTicker ticker, double permitsPerSecond) {
+    RateLimiter rateLimiter = new RateLimiter.Bursty(ticker, 1.0D);
+    rateLimiter.setRate(permitsPerSecond);
+    return rateLimiter;
+  }
+
+  public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
+    return create(RateLimiter.SleepingTicker.SYSTEM_TICKER, permitsPerSecond, warmupPeriod, unit);
+  }
+
+  static RateLimiter create(RateLimiter.SleepingTicker ticker, double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
+    RateLimiter rateLimiter = new RateLimiter.WarmingUp(ticker, warmupPeriod, unit);
+    rateLimiter.setRate(permitsPerSecond);
+    return rateLimiter;
+  }
+
+  private RateLimiter(RateLimiter.SleepingTicker ticker) {
+    this.mutex = new Object();
+    this.nextFreeTicketMicros = 0L;
+    this.ticker = ticker;
+    this.offsetNanos = ticker.read();
+  }
+
+  public final void setRate(double permitsPerSecond) {
+    checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), "rate must be positive");
+    Object var3 = this.mutex;
+    synchronized (this.mutex) {
+      this.resync(this.readSafeMicros());
+      double stableIntervalMicros = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
+      this.stableIntervalMicros = stableIntervalMicros;
+      this.doSetRate(permitsPerSecond, stableIntervalMicros);
+    }
+  }
+
+  abstract void doSetRate(double var1, double var3);
+
+  public final double getRate() {
+    return (double)TimeUnit.SECONDS.toMicros(1L) / this.stableIntervalMicros;
+  }
+
+  public void acquire() {
+    this.acquire(1);
+  }
+
+  public void acquire(int permits) {

Review comment:
       Any chance we can add these as metrics/logs?
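   One way this could be surfaced (an assumption, not something in this PR) is to time the acquire call at the call site and log or report the wait, for example:

      long startNs = System.nanoTime();
      limiter.acquire(mutations.size());
      long waitMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
      if (waitMs > 0) {
        // this could also be reported as a metric instead of a plain log line
        LOG.info("Rate limiter throttled a batch of " + mutations.size() + " hbase mutations for " + waitMs + " ms");
      }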




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] n3nash commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#issuecomment-629876523


   @v3nkatesh The rate limiter looks good to me but it's still inspired by guava. I'll let @vinothchandar comment since he felt strongly about implementing our own.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vinothchandar commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#issuecomment-631426244


   @v3nkatesh allow me to give some context around why we are strict around guava.. Hudi, as you know, has to be dropped under many different services (hive/spark/presto,...) and guava, as universal as it is, presents a jar conflict nightmare... 
   
   On the re-using code itself, I know we have done this in a few places in the past. But this often leads to maintenance issues more often than not... we change the re-used code, people fix the original code.. and over time we don't invest in getting upstream bug fixes etc.. So for small stuff like this, I prefer that we write it ourselves.. 
   
   Is this the original [code](https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/RateLimiter.java)? I would say if we are trimming down that file and using some parts verbatim, we are still reusing code.. 
   
   The act of adding new things to LICENSE/NOTICE is not that straightforward, given we don't have an entry for guava yet.. We need to examine guava's NOTICE, its dependencies etc.. I thought, even for you, just writing a small class and being done would be a better use of time? 
   
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418696889



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
   /**
    * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
    */
-  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
     if (mutations.isEmpty()) {
       return;
     }
+    // report number of operations to account per second with rate limiter.
+    // If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls
+    // for within that second
+    limiter.acquire(mutations.size());
     mutator.mutate(mutations);
     mutator.flush();
     mutations.clear();

Review comment:
       An example to help clarify what I meant:
   
   Let's say mutator.mutate() + flush + clear takes 2 seconds at a minimum. limiter.acquire would never wait, because the limiter generates mutations.size() tokens every second. Looks like this is expected and we don't see it as a problem, so I'm fine with it. (If possible, having metrics on per-operation wait time would help us debug any potential issues.)
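   To make the timing concrete, here is a rough sketch of that scenario (the iterator and the nextBatch helper are assumptions for illustration): with the limiter sized at one batch worth of permits per second and each HBase round trip taking around 2 seconds, acquire() always finds enough accumulated permits and returns immediately, so the effective rate is bounded by HBase latency rather than by the limiter.

      RateLimiter limiter = RateLimiter.create(multiPutBatchSize); // multiPutBatchSize permits per second
      while (statusIterator.hasNext()) {
        List<Mutation> batch = nextBatch(statusIterator, multiPutBatchSize); // hypothetical helper
        limiter.acquire(batch.size()); // blocks only if the previous batch finished in under a second
        mutator.mutate(batch);         // ~2 seconds in this example, so the next acquire is effectively free
        mutator.flush();
      }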




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] v3nkatesh commented on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#issuecomment-628953098


   > @v3nkatesh There are a couple of pending comments from @satishkotha. If you can finish up with those we can merge this PR with the condition that we need to add to LICENSE file because of the copied code in ratelimiter here -> https://github.com/apache/incubator-hudi/blob/master/LICENSE
   
   @n3nash I have replied to the remaining comments. For the RateLimiter class, can you check if the new refactoring is good enough to skip the LICENSE part?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] n3nash edited a comment on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
n3nash edited a comment on pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#issuecomment-629876523


   @v3nkatesh The rate limiter looks good to me but it's still inspired by guava. I'll let @vinothchandar comment since he felt strongly about implementing our own.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418387652



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -83,13 +88,17 @@
   private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
   private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
   private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
-  private static final int SLEEP_TIME_MILLISECONDS = 100;
 
   private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
   private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
   private float qpsFraction;
   private int maxQpsPerRegionServer;
+  private int maxPutsPerSec;

Review comment:
       Yeah, it can actually be removed; I am anyway recalculating this inside getBatchSize(). Also cleaned up other variables that are not used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vinothchandar edited a comment on pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
vinothchandar edited a comment on pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#issuecomment-631426244


   @v3nkatesh allow me to give some context around why we are strict around guava.. Hudi, as you know, has to be dropped under many different services (hive/spark/presto,...) and guava, as universal as it is, presents a jar conflict nightmare... 
   
   On the re-using code itself, I know we have done this in a few places in the past. But this often leads to maintenance issues more often than not... we change the re-used code, people fix the original code.. and over time we don't invest in getting upstream bug fixes etc.. So for small stuff like this, I prefer that we write it ourselves.. 
   
   Is this the original [code](https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/RateLimiter.java)? I would say if we are trimming down that file and using some parts verbatim, we are still reusing code.. (which I think is what we are doing)
   
   The act of adding new things to LICENSE/NOTICE is not that straightforward, given we don't have an entry for guava yet.. We need to examine guava's NOTICE, its dependencies etc.. I thought, even for you, just writing a small class and being done would be a better use of time? 
   
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r425498242



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -83,13 +88,14 @@
   private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
   private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
   private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
-  private static final int SLEEP_TIME_MILLISECONDS = 100;
 
   private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
   private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
-  private float qpsFraction;
   private int maxQpsPerRegionServer;
+  private long totalNumInserts;
+  private int numWriteStatusWithInserts;

Review comment:
       These two are actually used in multiple places inside the class, so I did not refactor them. It's possible to move them inside methods, but that would mean re-calculating them.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418387192



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -83,13 +88,17 @@
   private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
   private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
   private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
-  private static final int SLEEP_TIME_MILLISECONDS = 100;
 
   private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
   private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
   private float qpsFraction;
   private int maxQpsPerRegionServer;
+  private int maxPutsPerSec;
+  private long totalNumInserts;
+  private int numWriteStatusWithInserts;
+  Map<String, Integer> fileIdPartitionMap = new HashMap<>();

Review comment:
       Yeah, we don't need a class variable for this. Moved it inside the updateLocation() method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] v3nkatesh commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus

Posted by GitBox <gi...@apache.org>.
v3nkatesh commented on a change in pull request #1484:
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r418389802



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
##########
@@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
   /**
    * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
    */
-  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
     if (mutations.isEmpty()) {
       return;
     }
+    // report number of operations to account per second with rate limiter.
+    // If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls
+    // for within that second
+    limiter.acquire(mutations.size());
     mutator.mutate(mutations);
     mutator.flush();
     mutations.clear();
-    sleepForTime(SLEEP_TIME_MILLISECONDS);
-  }
-
-  private static void sleepForTime(int sleepTimeMs) {
-    try {
-      Thread.sleep(sleepTimeMs);
-    } catch (InterruptedException e) {
-      LOG.error("Sleep interrupted during throttling", e);
-      throw new RuntimeException(e);
-    }
   }
 
   @Override
   public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
       HoodieTable<T> hoodieTable) {
-    final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
-    setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-    LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
-    JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+    final Option<Float> desiredQPSFraction =  calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator);
+    // Map each fileId that has inserts to a unique partition Id. This will be used while
+    // repartitioning RDD<WriteStatus>
+    int partitionIndex = 0;
+    final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
+                                   .map(w -> w.getFileId()).collect();
+    for (final String fileId : fileIds) {
+      this.fileIdPartitionMap.put(fileId, partitionIndex++);
+    }
+    JavaRDD<WriteStatus> partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD :
+                                          writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
+                                            .partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap,
+                                              this.numWriteStatusWithInserts))
+                                            .map(w -> w._2());
+    acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
+    LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(),
+        true);
     // caching the index updated status RDD
     writeStatusJavaRDD = writeStatusJavaRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
+    // force trigger update location(hbase puts)
+    writeStatusJavaRDD.count();
+    this.hBaseIndexQPSResourceAllocator.releaseQPSResources();

Review comment:
       Correct, releaseQPSResources() has an implementation outside of hoodie. This ensures that any resources acquired for the HBase operations are released right after the spark stage is done (i.e. because of the forced spark action).
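   The ordering being described boils down to the pattern below (a simplified restatement of the diff above, not new behavior): reserve QPS, force the puts to actually run, then release the reservation right away.

      acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);   // reserve QPS before the puts
      JavaRDD<WriteStatus> writeStatusJavaRDD = partitionedRDD
          .mapPartitionsWithIndex(updateLocationFunction(), true)
          .persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
      writeStatusJavaRDD.count();                                    // forces the hbase puts to run now
      this.hBaseIndexQPSResourceAllocator.releaseQPSResources();     // release the reservation immediately after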




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org