Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/03/06 00:12:40 UTC

[GitHub] [incubator-gobblin] autumnust opened a new pull request #2912: [GOBBLIN-1072] Add sliding window to protect AutoScaling from fluctuation of number of active Helix partitions

autumnust opened a new pull request #2912: [GOBBLIN-1072] Add sliding window to protect AutoScaling from fluctuation of number of active Helix partitions
URL: https://github.com/apache/incubator-gobblin/pull/2912
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - https://issues.apache.org/jira/browse/GOBBLIN-1072
   
   
   ### Description
   - Adds a data structure, `MaxValueEvictingQueue`, that holds the most recent values of `numTargetContainers` and behaves like a sliding window whose maximum value can be fetched in constant time.
   - The advantage of this data structure is that a single fluctuation in the number of active Helix partitions won't impact the number of containers we request in `YarnService`.
   - The sliding window's size is the number of consecutive Helix queries for the partition count that the window spans.
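A minimal sketch of such a window, assuming the ArrayDeque-plus-max-heap layout that appears in the diff quoted in this thread; the class name `SlidingMaxWindow` is hypothetical and this is not the PR's actual code. `getMax()` is constant time because it only peeks at the heap head, while evicting the oldest value goes through `PriorityQueue.remove(Object)`, which is linear in the window size.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.PriorityQueue;

/** Hypothetical fixed-size FIFO window that reports the maximum of the values it holds. */
class SlidingMaxWindow {
  private final int maxSize;
  // Insertion order, used to decide which value to evict next.
  private final ArrayDeque<Integer> fifo;
  // Max-heap over the same values, so the current maximum sits at the head.
  private final PriorityQueue<Integer> maxHeap;

  SlidingMaxWindow(int maxSize) {
    if (maxSize <= 0) {
      throw new IllegalArgumentException("maxSize has to be a value larger than 0");
    }
    this.maxSize = maxSize;
    this.fifo = new ArrayDeque<>(maxSize);
    this.maxHeap = new PriorityQueue<Integer>(maxSize, Collections.reverseOrder());
  }

  /** Adds a value, evicting the oldest one once the window is full. */
  void add(int value) {
    if (fifo.size() == maxSize) {
      Integer oldest = fifo.poll();
      // Removes a single matching element from the heap; O(n) in the window size.
      maxHeap.remove(oldest);
    }
    fifo.offer(value);
    maxHeap.offer(value);
  }

  /** Returns the largest value currently in the window in O(1). */
  int getMax() {
    Integer max = maxHeap.peek();
    if (max == null) {
      throw new IllegalStateException("window is empty");
    }
    return max;
  }
}
```

For example, with a window of size 3 fed 5, 2, 4, `getMax()` returns 5; the fourth `add` evicts the 5, after which it reports 4, so a one-round dip does not lower the request immediately.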
   
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389198699
 
 

 ##########
 File path: gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
 ##########
 @@ -40,6 +44,9 @@
  */
 @Test(groups = { "gobblin.yarn" })
 public class YarnAutoScalingManagerTest {
+  // A queue within size == 1 and upperBound == "infinite" should impose no impact on the execution.
 
 Review comment:
   "should impose no impact" -> "should not impact" 

[GitHub] [incubator-gobblin] codecov-io commented on issue #2912: [GOBBLIN-1072] Being more conservative on releasing containers

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #2912: [GOBBLIN-1072] Being more conservative on releasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#issuecomment-596051653
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2912?src=pr&el=h1) Report
   > Merging [#2912](https://codecov.io/gh/apache/incubator-gobblin/pull/2912?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/64f9339027e4cbaec320ae7849a4f47e2d71e1c9?src=pr&el=desc) will **increase** coverage by `0.01%`.
   > The diff coverage is `72.34%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2912/graphs/tree.svg?width=650&token=4MgURJ0bGc&height=150&src=pr)](https://codecov.io/gh/apache/incubator-gobblin/pull/2912?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2912      +/-   ##
   ============================================
   + Coverage     45.85%   45.87%   +0.01%     
   + Complexity     9187     9186       -1     
   ============================================
     Files          1934     1934              
     Lines         72858    72901      +43     
     Branches       8033     8045      +12     
   ============================================
   + Hits          33411    33442      +31     
   - Misses        36382    36388       +6     
   - Partials       3065     3071       +6
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2912?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...main/java/org/apache/gobblin/yarn/YarnService.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2912/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vWWFyblNlcnZpY2UuamF2YQ==) | `15.36% <ø> (+0.12%)` | `4 <0> (ø)` | :arrow_down: |
   | [.../gobblin/cluster/GobblinHelixMessagingService.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2912/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4TWVzc2FnaW5nU2VydmljZS5qYXZh) | `63.04% <ø> (ø)` | `4 <0> (ø)` | :arrow_down: |
   | [...org/apache/gobblin/yarn/GobblinYarnTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2912/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vR29iYmxpbllhcm5UYXNrUnVubmVyLmphdmE=) | `2.98% <100%> (+2.98%)` | `1 <0> (+1)` | :arrow_up: |
   | [...rg/apache/gobblin/yarn/YarnAutoScalingManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2912/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vWWFybkF1dG9TY2FsaW5nTWFuYWdlci5qYXZh) | `52.63% <71.73%> (+11.2%)` | `1 <0> (ø)` | :arrow_down: |
   | [...he/gobblin/metrics/reporter/ScheduledReporter.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2912/diff?src=pr&el=tree#diff-Z29iYmxpbi1tZXRyaWNzLWxpYnMvZ29iYmxpbi1tZXRyaWNzLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vbWV0cmljcy9yZXBvcnRlci9TY2hlZHVsZWRSZXBvcnRlci5qYXZh) | `59.09% <0%> (-1.52%)` | `14% <0%> (-1%)` | |
   | [...lin/elasticsearch/writer/FutureCallbackHolder.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2912/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tZWxhc3RpY3NlYXJjaC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9lbGFzdGljc2VhcmNoL3dyaXRlci9GdXR1cmVDYWxsYmFja0hvbGRlci5qYXZh) | `61.42% <0%> (-1.43%)` | `4% <0%> (ø)` | |
   | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2912/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `65.06% <0%> (ø)` | `28% <0%> (-1%)` | :arrow_down: |
   | [.../apache/gobblin/service/FlowExecutionResource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2912/diff?src=pr&el=tree#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi1mbG93LWNvbmZpZy1zZXJ2aWNlL2dvYmJsaW4tZmxvdy1jb25maWctc2VydmljZS1zZXJ2ZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc2VydmljZS9GbG93RXhlY3V0aW9uUmVzb3VyY2UuamF2YQ==) | `0% <0%> (ø)` | `0% <0%> (ø)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2912?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2912?src=pr&el=footer). Last update [64f9339...7ae73e3](https://codecov.io/gh/apache/incubator-gobblin/pull/2912?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389199022
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -189,16 +224,123 @@ void runInternal() {
         }
       }
 
+      // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
+      // and potentially replanner-instance.
+      Set<String> allParticipants = getParticipants(GobblinYarnTaskRunner.class.getSimpleName());
+
+      // Find all joined participants not in-use for this round of inspection.
+      // If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
+      for (String participant : allParticipants) {
+        if (!inUseInstances.contains(participant)) {
+          instanceIdleSinceWhen.putIfAbsent(participant, System.currentTimeMillis());
+          if (absenceUnderTolerance(participant)) {
+            inUseInstances.add(participant);
+          }
+        } else {
+          // An instance that has been previously detected as idle but now back to in-use.
+          // Remove this instance if existed in the tracking map.
+          instanceIdleSinceWhen.remove(participant);
+        }
+      }
+
+
+
       // compute the target containers as a ceiling of number of partitions divided by the number of containers
       // per partition.
       int numTargetContainers = (int) Math.ceil((double)numPartitions / this.partitionsPerContainer);
 
       // adjust the number of target containers based on the configured min and max container values.
       numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
 
+      slidingFixedWindow.add(numTargetContainers);
+
       log.info("There are {} containers being requested", numTargetContainers);
 
-      this.yarnService.requestTargetNumberOfContainers(numTargetContainers, inUseInstances);
+      this.yarnService.requestTargetNumberOfContainers(slidingFixedWindow.getMax(), inUseInstances);
+    }
+
+    @VisibleForTesting
+    /**
+     * Pass a participant if condition hold, where the condition, by default is that if an instance went back to
+     * active (having partition running on it) within {@link #maxIdleTimeInMinBeforeScalingDown} mins, we will
+     * not tag that instance as "unused" and have that as the candidate for scaling down.
+     */
+    boolean absenceUnderTolerance(String participant){
+      return System.currentTimeMillis() - instanceIdleSinceWhen.get(participant) <
+          TimeUnit.MINUTES.toMillis(maxIdleTimeInMinBeforeScalingDown);
+    }
+  }
+
+  /**
+   * A FIFO queue with fixed size and returns maxValue among all elements within the queue in constant time.
+   * This data structure prevent temporary fluctuation in the number of active helix partitions as the size of queue
 
 Review comment:
   prevent -> prevents

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2912: [GOBBLIN-1072] Add sliding window to protect AutoScaling from fluctuation of number of active Helix partitions

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2912: [GOBBLIN-1072] Add sliding window to protect AutoScaling from fluctuation of number of active Helix partitions
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389188549
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -196,9 +207,84 @@ void runInternal() {
       // adjust the number of target containers based on the configured min and max container values.
       numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
 
+      slidingFixedWindow.add(numTargetContainers);
 
 Review comment:
   Discussed offline. We agreed on a new way to tag an instance as "unused" and to be more conservative about releasing containers.

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389192633
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -62,17 +67,24 @@
   private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + "minContainers";
   private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
   private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + "maxContainers";
-  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
+  // A rough value of how much containers should be an intolerable number.
+  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = 5000;
   private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay";
   private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
 
+  private final String WINDOW_SIZE_OBSERVING_CONTAINER_REQUEST = AUTO_SCALING_PREFIX + "windowSize";
 
 Review comment:
   AUTO_SCALING_WINDOW_SIZE?

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389197867
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -189,16 +224,123 @@ void runInternal() {
         }
       }
 
+      // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
+      // and potentially replanner-instance.
+      Set<String> allParticipants = getParticipants(GobblinYarnTaskRunner.class.getSimpleName());
+
+      // Find all joined participants not in-use for this round of inspection.
+      // If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
+      for (String participant : allParticipants) {
+        if (!inUseInstances.contains(participant)) {
+          instanceIdleSinceWhen.putIfAbsent(participant, System.currentTimeMillis());
+          if (absenceUnderTolerance(participant)) {
+            inUseInstances.add(participant);
+          }
+        } else {
+          // An instance that has been previously detected as idle but now back to in-use.
+          // Remove this instance if existed in the tracking map.
+          instanceIdleSinceWhen.remove(participant);
+        }
+      }
+
+
+
       // compute the target containers as a ceiling of number of partitions divided by the number of containers
       // per partition.
       int numTargetContainers = (int) Math.ceil((double)numPartitions / this.partitionsPerContainer);
 
       // adjust the number of target containers based on the configured min and max container values.
       numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
 
+      slidingFixedWindow.add(numTargetContainers);
+
       log.info("There are {} containers being requested", numTargetContainers);
 
-      this.yarnService.requestTargetNumberOfContainers(numTargetContainers, inUseInstances);
+      this.yarnService.requestTargetNumberOfContainers(slidingFixedWindow.getMax(), inUseInstances);
+    }
+
+    @VisibleForTesting
+    /**
+     * Pass a participant if condition hold, where the condition, by default is that if an instance went back to
+     * active (having partition running on it) within {@link #maxIdleTimeInMinBeforeScalingDown} mins, we will
+     * not tag that instance as "unused" and have that as the candidate for scaling down.
+     */
+    boolean absenceUnderTolerance(String participant){
+      return System.currentTimeMillis() - instanceIdleSinceWhen.get(participant) <
+          TimeUnit.MINUTES.toMillis(maxIdleTimeInMinBeforeScalingDown);
+    }
+  }
+
+  /**
+   * A FIFO queue with fixed size and returns maxValue among all elements within the queue in constant time.
+   * This data structure prevent temporary fluctuation in the number of active helix partitions as the size of queue
+   * grows and will be less sensitive when scaling down is actually required.
+   *
+   * The interface for this lass is implemented in a minimal-necessity manner to serve only as a sliding-sized-window
+   * which captures max value. It is NOT built for general purpose.
+   */
+  static class MaxValueEvictingQueue {
+    private ArrayDeque<Integer> evictQueue;
+    private PriorityQueue<Integer> priorityQueue;
+
+    // Queue Size
+    private int maxSize;
+    private static final int DEFAULT_MAX_SIZE = 10;
+
+    // Upper-bound of value within the queue.
+    private int upperBound;
+
+    public MaxValueEvictingQueue(int maxSize, int upperBound) {
+      Preconditions.checkArgument(maxSize > 0, "maxSize has to be a value larger than 0");
+
+      this.maxSize = maxSize;
+      this.upperBound = upperBound;
+      this.evictQueue = new ArrayDeque<>(maxSize);
+      this.priorityQueue = new PriorityQueue<>(maxSize, new Comparator<Integer>() {
+        @Override
+        public int compare(Integer o1, Integer o2) {
+          return o2.compareTo(o1);
+        }
+      });
+    }
+
+    public MaxValueEvictingQueue(int upperBound) {
+      this(DEFAULT_MAX_SIZE, upperBound);
+    }
+
+    /**
+     * Add element into data structure.
+     * When a new element is larger than value-upper-bound, reject the value for safety consideration.
 
 Review comment:
   Also, instead of "safety consideration", it is better to be explicit about the condition you are trying to protect against, e.g. "we reject the element, since we may request too many Yarn containers".
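A sketch of how the suggested wording could look in code: the Javadoc names the concrete risk instead of "safety consideration". `BoundedWindow` and its boolean return value are hypothetical illustrations, not the PR's interface.

```java
import java.util.ArrayDeque;

/** Hypothetical fixed-size window that refuses implausibly large container counts. */
class BoundedWindow {
  private final int maxSize;
  private final int upperBound;
  private final ArrayDeque<Integer> values;

  BoundedWindow(int maxSize, int upperBound) {
    if (maxSize <= 0) {
      throw new IllegalArgumentException("maxSize has to be a value larger than 0");
    }
    this.maxSize = maxSize;
    this.upperBound = upperBound;
    this.values = new ArrayDeque<>(maxSize);
  }

  /**
   * Adds a target container count to the window.
   * A value above upperBound is rejected (not clamped), since acting on it
   * could make us request far too many Yarn containers.
   *
   * @return true if the value was accepted, false if it was rejected
   */
  boolean add(int value) {
    if (value > upperBound) {
      return false;
    }
    if (values.size() == maxSize) {
      values.poll(); // evict the oldest value to keep the window size fixed
    }
    values.offer(value);
    return true;
  }

  int size() {
    return values.size();
  }
}
```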

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Add sliding window to protect AutoScaling from fluctuation of number of active Helix partitions

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Add sliding window to protect AutoScaling from fluctuation of number of active Helix partitions
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r388651170
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -196,9 +207,84 @@ void runInternal() {
       // adjust the number of target containers based on the configured min and max container values.
       numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
 
+      slidingFixedWindow.add(numTargetContainers);
 
 Review comment:
   A problem with your proposal is that it is based on numTargetContainers (and indirectly, numPartitions) which may remain unchanged over a window of time, even though the partitions with no assigned participant may be changing from one observation interval to the next. 
   
   For example, assume we have 4 Helix partitions. In one observation interval, partitions [1,2,3] have an assigned participant but 4 does not. In the next observation interval, partitions [2,3,4] have an assigned participant but 1 does not. With a sliding window of size 2, you will infer that numTargetContainers is 3, whereas what you want is for numTargetContainers to be 4.
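The counterexample can be checked mechanically. This sketch assumes one partition per container, so `numTargetContainers` equals the number of partitions with an assigned participant; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical comparison of two ways to summarize a window of observations. */
class WindowCounterExample {
  /** Builds the set of partitions observed as active in one interval. */
  static Set<Integer> active(Integer... partitions) {
    return new HashSet<>(Arrays.asList(partitions));
  }

  /** What a max-over-counts window reports: the largest per-interval count. */
  static int maxOfCounts(List<Set<Integer>> window) {
    int max = 0;
    for (Set<Integer> interval : window) {
      max = Math.max(max, interval.size());
    }
    return max;
  }

  /** Number of distinct partitions that were active at any point in the window. */
  static int unionSize(List<Set<Integer>> window) {
    Set<Integer> union = new HashSet<>();
    for (Set<Integer> interval : window) {
      union.addAll(interval);
    }
    return union.size();
  }
}
```

For the two intervals [1,2,3] and [2,3,4], `maxOfCounts` gives 3 while `unionSize` gives 4, which is the gap described above.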
   
   I think what you want is to track which Helix instances are unused. The current implementation is based on a point-in-time check against Helix for the set of partitions that have no assigned participant. One possible improvement would be to mark a Helix instance as "unused" only if no partition has been assigned to it for the last 10 minutes.
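A minimal sketch of this idle-time rule, assuming an injectable clock for testability: an instance without assigned partitions still counts as in use until it has been idle for the full tolerance. `IdleInstanceTracker` is a hypothetical name; the PR's later revision, quoted elsewhere in this thread, implements the same idea inline with an `instanceIdleSinceWhen` map. Forgetting instances that are no longer listed is an extra assumption of this sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical tracker: an instance becomes "unused" only after a sustained idle period. */
class IdleInstanceTracker {
  private final long toleranceMillis;
  private final LongSupplier clock; // injectable so tests can control time
  private final Map<String, Long> idleSince = new HashMap<>();

  IdleInstanceTracker(long toleranceMinutes, LongSupplier clock) {
    this.toleranceMillis = TimeUnit.MINUTES.toMillis(toleranceMinutes);
    this.clock = clock;
  }

  /** Returns busy instances plus idle ones that are still within the tolerance. */
  Set<String> effectiveInUse(Set<String> allParticipants, Set<String> busy) {
    long now = clock.getAsLong();
    Set<String> result = new HashSet<>(busy);
    for (String participant : allParticipants) {
      if (busy.contains(participant)) {
        // A previously idle instance is now detected to be in use: reset its timer.
        idleSince.remove(participant);
      } else {
        idleSince.putIfAbsent(participant, now);
        if (now - idleSince.get(participant) < toleranceMillis) {
          result.add(participant);
        }
      }
    }
    // Drop timers for instances that are no longer part of the cluster.
    idleSince.keySet().retainAll(allParticipants);
    return result;
  }
}
```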
   

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389193932
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -149,6 +173,17 @@ public void run() {
       }
     }
 
+    /**
+     * Getting all instances (Helix Participants) in cluster at this moment.
+     * Note that the raw result could contains AppMaster node and replanner node.
+     * @param filterString Helix instances whose name containing fitlerString will pass filtering.
+     */
+    private Set<String> getParticipants(String filterString) {
+      PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
+      return helixDataAccessor.getChildValuesMap(keyBuilder.instances())
 
 Review comment:
   Would it suffice to use keyBuilder.liveInstances() instead of keyBuilder.instances(), which may return instances from previous runs?
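A sketch of the selection being discussed, with both name sets assumed to have been read from Helix beforehand (for example via `keyBuilder.instances()` and `keyBuilder.liveInstances()`); the helper and the instance names in the test are hypothetical. Only names that are currently live and contain the filter string are kept, so leftovers from earlier runs never enter the idle-tracking loop.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical helper choosing which instances feed the idle-tracking loop. */
class ParticipantSelection {
  static Set<String> selectParticipants(Set<String> registered, Set<String> live, String filterString) {
    return registered.stream()
        // Registered-but-not-live names are leftovers, e.g. from earlier runs.
        .filter(live::contains)
        // Drop instances that are not task runners, such as the cluster manager.
        .filter(name -> name.contains(filterString))
        .collect(Collectors.toCollection(HashSet::new));
  }
}
```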

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389193038
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -62,17 +67,24 @@
   private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + "minContainers";
   private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
   private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + "maxContainers";
-  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
+  // A rough value of how much containers should be an intolerable number.
+  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = 5000;
   private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay";
   private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
 
+  private final String WINDOW_SIZE_OBSERVING_CONTAINER_REQUEST = AUTO_SCALING_PREFIX + "windowSize";
+
+  private final static int DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MIN = 10;
+
   private final Config config;
   private final HelixManager helixManager;
   private final ScheduledExecutorService autoScalingExecutor;
   private final YarnService yarnService;
   private final int partitionsPerContainer;
   private final int minContainers;
   private final int maxContainers;
+  private final MaxValueEvictingQueue slidingFixedSizeWindow;
+  private static int maxIdleTimeInMinBeforeScalingDown = DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MIN;
 
 Review comment:
   Same comment as earlier: MIN -> MINUTES.

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389195898
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -189,16 +224,123 @@ void runInternal() {
         }
       }
 
+      // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
+      // and potentially replanner-instance.
+      Set<String> allParticipants = getParticipants(GobblinYarnTaskRunner.class.getSimpleName());
+
+      // Find all joined participants not in-use for this round of inspection.
+      // If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
+      for (String participant : allParticipants) {
+        if (!inUseInstances.contains(participant)) {
+          instanceIdleSinceWhen.putIfAbsent(participant, System.currentTimeMillis());
+          if (absenceUnderTolerance(participant)) {
+            inUseInstances.add(participant);
+          }
+        } else {
+          // An instance that has been previously detected as idle but now back to in-use.
 
 Review comment:
   Reword: "A previously idle instance is now detected to be in use."

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389229170
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -62,17 +67,24 @@
   private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + "minContainers";
   private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
   private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + "maxContainers";
-  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
+  // A rough value of how much containers should be an intolerable number.
+  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = 5000;
 
 Review comment:
   If we really request that many containers, does Yarn have any protection against it?
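The diff computes the target as a ceiling division clamped to the configured bounds, so the new 5000 default caps what the AppMaster itself asks for, independent of any limit Yarn may or may not enforce. A standalone sketch of that computation (the class and method names are hypothetical):

```java
/** Hypothetical standalone version of the target-container computation in the diff. */
class TargetContainers {
  static int compute(int numPartitions, int partitionsPerContainer, int minContainers, int maxContainers) {
    // Ceiling division: a partially filled container still needs to be requested.
    int target = (int) Math.ceil((double) numPartitions / partitionsPerContainer);
    // Clamp into [minContainers, maxContainers].
    return Math.max(minContainers, Math.min(maxContainers, target));
  }
}
```

With 10 partitions and 3 partitions per container the target is 4; with no partitions it is raised to `minContainers`; and with 100000 single-partition containers it is capped at 5000.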

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389230335
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -189,16 +224,123 @@ void runInternal() {
         }
       }
 
+      // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
+      // and potentially replanner-instance.
+      Set<String> allParticipants = getParticipants(GobblinYarnTaskRunner.class.getSimpleName());
 
 Review comment:
   Good catch, I prefer to put that in `GobblinYarnTaskRunner` itself. 

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389200991
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -189,16 +224,123 @@ void runInternal() {
         }
       }
 
+      // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
+      // and potentially replanner-instance.
+      Set<String> allParticipants = getParticipants(GobblinYarnTaskRunner.class.getSimpleName());
+
+      // Find all joined participants not in-use for this round of inspection.
+      // If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
+      for (String participant : allParticipants) {
+        if (!inUseInstances.contains(participant)) {
+          instanceIdleSinceWhen.putIfAbsent(participant, System.currentTimeMillis());
+          if (absenceUnderTolerance(participant)) {
+            inUseInstances.add(participant);
+          }
+        } else {
+          // An instance that has been previously detected as idle but now back to in-use.
+          // Remove this instance if existed in the tracking map.
+          instanceIdleSinceWhen.remove(participant);
+        }
+      }
+
+
+
       // compute the target containers as a ceiling of number of partitions divided by the number of containers
       // per partition.
       int numTargetContainers = (int) Math.ceil((double)numPartitions / this.partitionsPerContainer);
 
       // adjust the number of target containers based on the configured min and max container values.
       numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
 
+      slidingFixedWindow.add(numTargetContainers);
+
       log.info("There are {} containers being requested", numTargetContainers);
 
-      this.yarnService.requestTargetNumberOfContainers(numTargetContainers, inUseInstances);
+      this.yarnService.requestTargetNumberOfContainers(slidingFixedWindow.getMax(), inUseInstances);
+    }
+
+    @VisibleForTesting
+    /**
+     * Pass a participant if condition hold, where the condition, by default is that if an instance went back to
+     * active (having partition running on it) within {@link #maxIdleTimeInMinBeforeScalingDown} mins, we will
+     * not tag that instance as "unused" and have that as the candidate for scaling down.
+     */
+    boolean absenceUnderTolerance(String participant){
+      return System.currentTimeMillis() - instanceIdleSinceWhen.get(participant) <
+          TimeUnit.MINUTES.toMillis(maxIdleTimeInMinBeforeScalingDown);
+    }
+  }
+
+  /**
+   * A FIFO queue with fixed size and returns maxValue among all elements within the queue in constant time.
+   * This data structure prevent temporary fluctuation in the number of active helix partitions as the size of queue
+   * grows and will be less sensitive when scaling down is actually required.
+   *
+   * The interface for this lass is implemented in a minimal-necessity manner to serve only as a sliding-sized-window
+   * which captures max value. It is NOT built for general purpose.
+   */
+  static class MaxValueEvictingQueue {
+    private ArrayDeque<Integer> evictQueue;
 
 Review comment:
   evictQueue -> fifoQueue?

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389192007
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -71,6 +74,8 @@
 
   private final String WINDOW_SIZE_OBSERVING_CONTAINER_REQUEST = AUTO_SCALING_PREFIX + "windowSize";
 
+  private final static int DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MIN = 10;
 
 Review comment:
   MIN -> MINUTES? 

[GitHub] [incubator-gobblin] asfgit closed pull request #2912: [GOBBLIN-1072] Being more conservative on releasing containers

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #2912: [GOBBLIN-1072] Being more conservative on releasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912
 
 
   

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389197688
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -189,16 +224,123 @@ void runInternal() {
         }
       }
 
+      // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
+      // and potentially replanner-instance.
+      Set<String> allParticipants = getParticipants(GobblinYarnTaskRunner.class.getSimpleName());
+
+      // Find all joined participants not in-use for this round of inspection.
+      // If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
+      for (String participant : allParticipants) {
+        if (!inUseInstances.contains(participant)) {
+          instanceIdleSinceWhen.putIfAbsent(participant, System.currentTimeMillis());
+          if (absenceUnderTolerance(participant)) {
+            inUseInstances.add(participant);
+          }
+        } else {
+          // An instance that has been previously detected as idle but now back to in-use.
+          // Remove this instance if existed in the tracking map.
+          instanceIdleSinceWhen.remove(participant);
+        }
+      }
+
+
+
       // compute the target containers as a ceiling of number of partitions divided by the number of containers
       // per partition.
       int numTargetContainers = (int) Math.ceil((double)numPartitions / this.partitionsPerContainer);
 
       // adjust the number of target containers based on the configured min and max container values.
       numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
 
+      slidingFixedWindow.add(numTargetContainers);
+
       log.info("There are {} containers being requested", numTargetContainers);
 
-      this.yarnService.requestTargetNumberOfContainers(numTargetContainers, inUseInstances);
+      this.yarnService.requestTargetNumberOfContainers(slidingFixedWindow.getMax(), inUseInstances);
+    }
+
+    @VisibleForTesting
+    /**
+     * Pass a participant if condition hold, where the condition, by default is that if an instance went back to
+     * active (having partition running on it) within {@link #maxIdleTimeInMinBeforeScalingDown} mins, we will
+     * not tag that instance as "unused" and have that as the candidate for scaling down.
+     */
+    boolean absenceUnderTolerance(String participant){
+      return System.currentTimeMillis() - instanceIdleSinceWhen.get(participant) <
+          TimeUnit.MINUTES.toMillis(maxIdleTimeInMinBeforeScalingDown);
+    }
+  }
+
+  /**
+   * A FIFO queue with fixed size and returns maxValue among all elements within the queue in constant time.
+   * This data structure prevent temporary fluctuation in the number of active helix partitions as the size of queue
+   * grows and will be less sensitive when scaling down is actually required.
+   *
+   * The interface for this lass is implemented in a minimal-necessity manner to serve only as a sliding-sized-window
+   * which captures max value. It is NOT built for general purpose.
+   */
+  static class MaxValueEvictingQueue {
+    private ArrayDeque<Integer> evictQueue;
+    private PriorityQueue<Integer> priorityQueue;
+
+    // Queue Size
+    private int maxSize;
+    private static final int DEFAULT_MAX_SIZE = 10;
+
+    // Upper-bound of value within the queue.
+    private int upperBound;
+
+    public MaxValueEvictingQueue(int maxSize, int upperBound) {
+      Preconditions.checkArgument(maxSize > 0, "maxSize has to be a value larger than 0");
+
+      this.maxSize = maxSize;
+      this.upperBound = upperBound;
+      this.evictQueue = new ArrayDeque<>(maxSize);
+      this.priorityQueue = new PriorityQueue<>(maxSize, new Comparator<Integer>() {
+        @Override
+        public int compare(Integer o1, Integer o2) {
+          return o2.compareTo(o1);
+        }
+      });
+    }
+
+    public MaxValueEvictingQueue(int upperBound) {
+      this(DEFAULT_MAX_SIZE, upperBound);
+    }
+
+    /**
+     * Add element into data structure.
+     * When a new element is larger than value-upper-bound, reject the value for safety consideration.
 
 Review comment:
   value-upper-bound -> upperBound.

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389195341
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -189,16 +224,123 @@ void runInternal() {
         }
       }
 
+      // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
+      // and potentially replanner-instance.
+      Set<String> allParticipants = getParticipants(GobblinYarnTaskRunner.class.getSimpleName());
 
 Review comment:
   Can we define a static variable HELIX_INSTANCE_NAME_PREFIX = GobblinYarnTaskRunner.class.getSimpleName() in YarnService and use it in onContainersAllocated as well as here?
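
A minimal sketch of that suggestion. Everything here beyond the constant name is an assumption: `HelixInstanceNames`, `filterParticipants`, and `instanceNameFor` are hypothetical helpers, the `<prefix>_<index>` naming scheme is assumed, and a string literal stands in for `GobblinYarnTaskRunner.class.getSimpleName()` so the block compiles on its own. It filters plain strings rather than calling the Helix API.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical holder for the shared constant; in Gobblin it would live in YarnService
// (or in GobblinYarnTaskRunner, as the author prefers).
final class HelixInstanceNames {
  // Assumption: literal stands in for GobblinYarnTaskRunner.class.getSimpleName().
  static final String HELIX_INSTANCE_NAME_PREFIX = "GobblinYarnTaskRunner";

  private HelixInstanceNames() {}

  /** Keeps only instances named like task runners, dropping e.g. the cluster manager. */
  static Set<String> filterParticipants(List<String> allInstances) {
    return allInstances.stream()
        .filter(name -> name.startsWith(HELIX_INSTANCE_NAME_PREFIX))
        .collect(Collectors.toCollection(TreeSet::new));
  }

  /** Hypothetical counterpart used when naming a new instance (e.g. in onContainersAllocated). */
  static String instanceNameFor(int containerIndex) {
    return HELIX_INSTANCE_NAME_PREFIX + "_" + containerIndex;
  }
}
```

Because both the naming side and the filtering side read the same constant, they cannot drift apart.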

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389192881
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -62,17 +67,24 @@
   private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + "minContainers";
   private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
   private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + "maxContainers";
-  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
+  // A rough value of how much containers should be an intolerable number.
+  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = 5000;
 
 Review comment:
   Why reduce the default? Can we leave it as Integer.MAX_VALUE?

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389198583
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -189,16 +224,123 @@ void runInternal() {
         }
       }
 
+      // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
+      // and potentially replanner-instance.
+      Set<String> allParticipants = getParticipants(GobblinYarnTaskRunner.class.getSimpleName());
+
+      // Find all joined participants not in-use for this round of inspection.
+      // If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
+      for (String participant : allParticipants) {
+        if (!inUseInstances.contains(participant)) {
+          instanceIdleSinceWhen.putIfAbsent(participant, System.currentTimeMillis());
+          if (absenceUnderTolerance(participant)) {
+            inUseInstances.add(participant);
+          }
+        } else {
+          // An instance that has been previously detected as idle but now back to in-use.
+          // Remove this instance if existed in the tracking map.
+          instanceIdleSinceWhen.remove(participant);
+        }
+      }
+
+
+
       // compute the target containers as a ceiling of number of partitions divided by the number of containers
       // per partition.
       int numTargetContainers = (int) Math.ceil((double)numPartitions / this.partitionsPerContainer);
 
       // adjust the number of target containers based on the configured min and max container values.
       numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
 
+      slidingFixedWindow.add(numTargetContainers);
+
       log.info("There are {} containers being requested", numTargetContainers);
 
-      this.yarnService.requestTargetNumberOfContainers(numTargetContainers, inUseInstances);
+      this.yarnService.requestTargetNumberOfContainers(slidingFixedWindow.getMax(), inUseInstances);
+    }
+
+    @VisibleForTesting
+    /**
+     * Pass a participant if condition hold, where the condition, by default is that if an instance went back to
+     * active (having partition running on it) within {@link #maxIdleTimeInMinBeforeScalingDown} mins, we will
+     * not tag that instance as "unused" and have that as the candidate for scaling down.
+     */
+    boolean absenceUnderTolerance(String participant){
+      return System.currentTimeMillis() - instanceIdleSinceWhen.get(participant) <
+          TimeUnit.MINUTES.toMillis(maxIdleTimeInMinBeforeScalingDown);
+    }
+  }
+
+  /**
+   * A FIFO queue with fixed size and returns maxValue among all elements within the queue in constant time.
+   * This data structure prevent temporary fluctuation in the number of active helix partitions as the size of queue
+   * grows and will be less sensitive when scaling down is actually required.
+   *
+   * The interface for this lass is implemented in a minimal-necessity manner to serve only as a sliding-sized-window
+   * which captures max value. It is NOT built for general purpose.
+   */
+  static class MaxValueEvictingQueue {
+    private ArrayDeque<Integer> evictQueue;
+    private PriorityQueue<Integer> priorityQueue;
+
+    // Queue Size
+    private int maxSize;
+    private static final int DEFAULT_MAX_SIZE = 10;
+
+    // Upper-bound of value within the queue.
+    private int upperBound;
+
+    public MaxValueEvictingQueue(int maxSize, int upperBound) {
+      Preconditions.checkArgument(maxSize > 0, "maxSize has to be a value larger than 0");
+
+      this.maxSize = maxSize;
+      this.upperBound = upperBound;
+      this.evictQueue = new ArrayDeque<>(maxSize);
+      this.priorityQueue = new PriorityQueue<>(maxSize, new Comparator<Integer>() {
+        @Override
+        public int compare(Integer o1, Integer o2) {
+          return o2.compareTo(o1);
+        }
+      });
+    }
+
+    public MaxValueEvictingQueue(int upperBound) {
+      this(DEFAULT_MAX_SIZE, upperBound);
+    }
+
+    /**
+     * Add element into data structure.
+     * When a new element is larger than value-upper-bound, reject the value for safety consideration.
+     * When queue is full, evict head of FIFO-queue (In FIFO queue, elements are inserted from tail).
+     */
+    public void add(int e) {
+      if (e > upperBound) {
+        log.error(String.format("Request of getting %s containers seems to be excessive, rejected", e));
+        return;
+      }
+
+      if (evictQueue.size() == maxSize) {
+        Integer removedElement = evictQueue.remove();
+        priorityQueue.remove(removedElement);
+      }
+
+      if (evictQueue.size() == priorityQueue.size()) {
+        evictQueue.add(e);
+        priorityQueue.add(e);
+      } else {
+        throw new IllegalStateException("Queue has its internal data structure being inconsistent.");
+      }
+    }
+
+    /**
+     * If queue if empty, throw {@link IllegalStateException}.
 
 Review comment:
   Typo: "If queue is empty"
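
For reference, a condensed, self-contained sketch of the queue as quoted in the diff, with the review's naming (`fifoQueue`, `upperBound`) applied. The diff does not show `getMax()`, so its body (peek at the max-heap, throw when empty) is an assumption based on the javadoc. Two further changes are made for testability: `add` returns a boolean instead of logging, and a plain `IllegalArgumentException` replaces Guava's `Preconditions.checkArgument` (which throws the same exception type).

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.PriorityQueue;

final class MaxValueEvictingQueue {
  private final ArrayDeque<Integer> fifoQueue;        // insertion order, used for eviction
  private final PriorityQueue<Integer> priorityQueue; // max-heap, used for getMax()
  private final int maxSize;
  private final int upperBound;

  MaxValueEvictingQueue(int maxSize, int upperBound) {
    if (maxSize <= 0) {
      throw new IllegalArgumentException("maxSize has to be a value larger than 0");
    }
    this.maxSize = maxSize;
    this.upperBound = upperBound;
    this.fifoQueue = new ArrayDeque<>(maxSize);
    this.priorityQueue = new PriorityQueue<Integer>(maxSize, Collections.reverseOrder());
  }

  /** Rejects e if it is larger than upperBound; when full, evicts the oldest element first. */
  boolean add(int e) {
    if (e > upperBound) {
      return false;
    }
    if (fifoQueue.size() == maxSize) {
      Integer evicted = fifoQueue.remove();
      priorityQueue.remove(evicted); // O(n) removal of an arbitrary heap element
    }
    fifoQueue.add(e);
    priorityQueue.add(e);
    return true;
  }

  /** If the queue is empty, throws IllegalStateException. */
  int getMax() {
    Integer max = priorityQueue.peek();
    if (max == null) {
      throw new IllegalStateException("Queue is empty");
    }
    return max;
  }
}
```

`getMax()` is O(1) via `peek()`, but `PriorityQueue.remove(Object)` is O(n); with the default window of 10 that cost is negligible.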

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on releasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on releasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389233805
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -62,17 +67,24 @@
   private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + "minContainers";
   private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
   private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + "maxContainers";
-  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
+  // A rough value of how much containers should be an intolerable number.
+  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = 5000;
 
 Review comment:
   In practice, the number of containers we request is limited by the number of Helix partitions, so we would never get close to this limit. On the other hand, setting the default to a smaller value may result in hard-to-debug scenarios, where some partitions are stuck due to resource limits imposed by this setting.
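
The arithmetic behind this argument, as a standalone sketch of the two lines quoted in the diff (the class and method names are hypothetical). For example, 7 partitions at 2 per container gives ceil(3.5) = 4 containers. With `Integer.MAX_VALUE` the upper clamp never binds, so the request is bounded only by the partition count; with a default of 5000, a hypothetical job with 12,000 partitions at one partition per container would be capped silently.

```java
// Hypothetical wrapper around the target-container computation quoted in the diff.
final class TargetContainers {
  private TargetContainers() {}

  static int compute(int numPartitions, int partitionsPerContainer, int minContainers, int maxContainers) {
    // Ceiling division: every partition needs a slot in some container.
    int numTargetContainers = (int) Math.ceil((double) numPartitions / partitionsPerContainer);
    // Clamp into [minContainers, maxContainers].
    return Math.max(minContainers, Math.min(maxContainers, numTargetContainers));
  }
}
```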

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389196373
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -189,16 +224,123 @@ void runInternal() {
         }
       }
 
+      // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
+      // and potentially replanner-instance.
+      Set<String> allParticipants = getParticipants(GobblinYarnTaskRunner.class.getSimpleName());
+
+      // Find all joined participants not in-use for this round of inspection.
+      // If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
+      for (String participant : allParticipants) {
+        if (!inUseInstances.contains(participant)) {
+          instanceIdleSinceWhen.putIfAbsent(participant, System.currentTimeMillis());
+          if (absenceUnderTolerance(participant)) {
+            inUseInstances.add(participant);
+          }
+        } else {
+          // An instance that has been previously detected as idle but now back to in-use.
+          // Remove this instance if existed in the tracking map.
+          instanceIdleSinceWhen.remove(participant);
+        }
+      }
+
+
+
       // compute the target containers as a ceiling of number of partitions divided by the number of containers
       // per partition.
       int numTargetContainers = (int) Math.ceil((double)numPartitions / this.partitionsPerContainer);
 
       // adjust the number of target containers based on the configured min and max container values.
       numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
 
+      slidingFixedWindow.add(numTargetContainers);
+
       log.info("There are {} containers being requested", numTargetContainers);
 
-      this.yarnService.requestTargetNumberOfContainers(numTargetContainers, inUseInstances);
+      this.yarnService.requestTargetNumberOfContainers(slidingFixedWindow.getMax(), inUseInstances);
+    }
+
+    @VisibleForTesting
+    /**
+     * Pass a participant if condition hold, where the condition, by default is that if an instance went back to
+     * active (having partition running on it) within {@link #maxIdleTimeInMinBeforeScalingDown} mins, we will
+     * not tag that instance as "unused" and have that as the candidate for scaling down.
+     */
+    boolean absenceUnderTolerance(String participant){
 
 Review comment:
   absenceUnderTolerance -> isInstanceUnused, for better readability? Returns true if it is unused and false otherwise. 
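
A minimal sketch of the rename, also applying the `MINUTES` and `instanceIdleSince` names suggested elsewhere in this review. `IdleTracker`, `markIdle`, and the explicit `nowMillis` parameter are hypothetical, added so the check can be tested without `System.currentTimeMillis()`. For a tracked participant the result is the logical negation of `absenceUnderTolerance`. An untracked participant is reported as not unused here, whereas the diff's version would throw a `NullPointerException` on unboxing if called for one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical idle-time check with the reviewer's suggested names.
final class IdleTracker {
  static final int DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10;

  private final Map<String, Long> instanceIdleSince = new HashMap<>();
  private final long maxIdleMillis;

  IdleTracker(int maxIdleTimeInMinutes) {
    this.maxIdleMillis = TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutes);
  }

  /** Records when the participant was first seen idle; later calls keep the original time. */
  void markIdle(String participant, long nowMillis) {
    instanceIdleSince.putIfAbsent(participant, nowMillis);
  }

  /** Returns true only if the participant has been idle for at least the tolerance. */
  boolean isInstanceUnused(String participant, long nowMillis) {
    Long idleSince = instanceIdleSince.get(participant);
    return idleSince != null && nowMillis - idleSince >= maxIdleMillis;
  }
}
```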

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389193332
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -137,6 +154,13 @@ protected void shutDown() throws Exception {
     private final int partitionsPerContainer;
     private final int minContainers;
     private final int maxContainers;
+    private final MaxValueEvictingQueue slidingFixedWindow;
+    private final HelixDataAccessor helixDataAccessor;
+    /**
+     * A static map that keep track of an idle instance and its latest beginning idle time.
+     * If an instance is no long idle when inspected, it will be dropped from this map.
 
 Review comment:
   no long -> no longer

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389200838
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -189,16 +224,123 @@ void runInternal() {
         }
       }
 
+      // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
+      // and potentially replanner-instance.
+      Set<String> allParticipants = getParticipants(GobblinYarnTaskRunner.class.getSimpleName());
+
+      // Find all joined participants not in-use for this round of inspection.
+      // If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
+      for (String participant : allParticipants) {
+        if (!inUseInstances.contains(participant)) {
+          instanceIdleSinceWhen.putIfAbsent(participant, System.currentTimeMillis());
+          if (absenceUnderTolerance(participant)) {
+            inUseInstances.add(participant);
+          }
+        } else {
+          // An instance that has been previously detected as idle but now back to in-use.
+          // Remove this instance if existed in the tracking map.
+          instanceIdleSinceWhen.remove(participant);
+        }
+      }
+
+
+
       // compute the target containers as a ceiling of number of partitions divided by the number of containers
       // per partition.
       int numTargetContainers = (int) Math.ceil((double)numPartitions / this.partitionsPerContainer);
 
       // adjust the number of target containers based on the configured min and max container values.
       numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
 
+      slidingFixedWindow.add(numTargetContainers);
+
       log.info("There are {} containers being requested", numTargetContainers);
 
-      this.yarnService.requestTargetNumberOfContainers(numTargetContainers, inUseInstances);
+      this.yarnService.requestTargetNumberOfContainers(slidingFixedWindow.getMax(), inUseInstances);
+    }
+
+    @VisibleForTesting
+    /**
+     * Pass a participant if condition hold, where the condition, by default is that if an instance went back to
+     * active (having partition running on it) within {@link #maxIdleTimeInMinBeforeScalingDown} mins, we will
+     * not tag that instance as "unused" and have that as the candidate for scaling down.
+     */
+    boolean absenceUnderTolerance(String participant){
+      return System.currentTimeMillis() - instanceIdleSinceWhen.get(participant) <
+          TimeUnit.MINUTES.toMillis(maxIdleTimeInMinBeforeScalingDown);
+    }
+  }
+
+  /**
+   * A FIFO queue with fixed size and returns maxValue among all elements within the queue in constant time.
+   * This data structure prevent temporary fluctuation in the number of active helix partitions as the size of queue
+   * grows and will be less sensitive when scaling down is actually required.
+   *
+   * The interface for this lass is implemented in a minimal-necessity manner to serve only as a sliding-sized-window
+   * which captures max value. It is NOT built for general purpose.
+   */
+  static class MaxValueEvictingQueue {
 
 Review comment:
   Can we call this a "SlidingWindowReservoir" instead of MaxValueEvictingQueue, as it seems to be tracking the last K elements with a method getMax() to return the maximum value from the reservoir. 
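
A sketch of a class under the suggested name. It deliberately swaps in a different technique from the diff: a monotonic deque (the classic sliding-window-maximum approach) instead of a `PriorityQueue`, which makes `add` amortized O(1) and `getMax` O(1) with no O(n) heap removal. The class and its members are hypothetical, and the diff's `upperBound` rejection is omitted for brevity.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sliding-window maximum over the last windowSize added values.
final class SlidingWindowReservoir {
  private static final class Entry {
    final long index;
    final int value;

    Entry(long index, int value) {
      this.index = index;
      this.value = value;
    }
  }

  private final int windowSize;
  // Candidates for the maximum; values are non-increasing from head to tail.
  private final Deque<Entry> candidates = new ArrayDeque<>();
  private long nextIndex = 0;

  SlidingWindowReservoir(int windowSize) {
    if (windowSize <= 0) {
      throw new IllegalArgumentException("windowSize has to be larger than 0");
    }
    this.windowSize = windowSize;
  }

  void add(int value) {
    // Older entries not larger than the new value can never be the max again.
    while (!candidates.isEmpty() && candidates.peekLast().value <= value) {
      candidates.pollLast();
    }
    candidates.addLast(new Entry(nextIndex, value));
    // Drop the head once it is no longer among the last windowSize insertions.
    while (candidates.peekFirst().index <= nextIndex - windowSize) {
      candidates.pollFirst();
    }
    nextIndex++;
  }

  int getMax() {
    Entry head = candidates.peekFirst();
    if (head == null) {
      throw new IllegalStateException("Reservoir is empty");
    }
    return head.value;
  }
}
```

With a window of 3 and inputs 5, 9, 2, 1, 1, `getMax()` returns 9 until the 9 slides out of the window, then 2.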

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389193535
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -137,6 +154,13 @@ protected void shutDown() throws Exception {
     private final int partitionsPerContainer;
     private final int minContainers;
     private final int maxContainers;
+    private final MaxValueEvictingQueue slidingFixedWindow;
+    private final HelixDataAccessor helixDataAccessor;
+    /**
+     * A static map that keep track of an idle instance and its latest beginning idle time.
+     * If an instance is no long idle when inspected, it will be dropped from this map.
+     */
+    private static final Map<String, Long> instanceIdleSinceWhen = new HashMap<>();
 
 Review comment:
   instanceIdleSinceWhen -> instanceIdleSince
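
A sketch of the per-round bookkeeping in `runInternal()` using the suggested name `instanceIdleSince`. It departs from the diff in two labeled ways: the map is an instance field rather than `static`, and the current time is passed in, both so the logic can be tested in isolation. `IdleReconciler` and `reconcile` are hypothetical names. Note that the quoted comment mentions assigning a timestamp of -1, but the quoted loop never does so; here, as in the loop, a participant past the tolerance is simply left out of the in-use set.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

// Hypothetical, testable version of the idle-tracking loop quoted in the diff.
final class IdleReconciler {
  private final Map<String, Long> instanceIdleSince = new HashMap<>();
  private final long maxIdleMillis;

  IdleReconciler(long maxIdleMinutes) {
    this.maxIdleMillis = TimeUnit.MINUTES.toMillis(maxIdleMinutes);
  }

  /** Returns busy participants plus idle ones still within tolerance. */
  Set<String> reconcile(Set<String> allParticipants, Set<String> busyInstances, long nowMillis) {
    Set<String> inUse = new HashSet<>(busyInstances);
    for (String participant : allParticipants) {
      if (!busyInstances.contains(participant)) {
        // First time seen idle: remember when; otherwise keep the earlier timestamp.
        long idleSince = instanceIdleSince.computeIfAbsent(participant, p -> nowMillis);
        if (nowMillis - idleSince < maxIdleMillis) {
          inUse.add(participant); // still within tolerance, not a scale-down candidate yet
        }
      } else {
        // Back in use: forget any previous idle timestamp.
        instanceIdleSince.remove(participant);
      }
    }
    return inUse;
  }
}
```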

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2912: [GOBBLIN-1072] Being more conservative on leasing containers
URL: https://github.com/apache/incubator-gobblin/pull/2912#discussion_r389199173
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 ##########
 @@ -189,16 +224,123 @@ void runInternal() {
         }
       }
 
+      // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
+      // and potentially replanner-instance.
+      Set<String> allParticipants = getParticipants(GobblinYarnTaskRunner.class.getSimpleName());
+
+      // Find all joined participants not in-use for this round of inspection.
+      // If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
+      for (String participant : allParticipants) {
+        if (!inUseInstances.contains(participant)) {
+          instanceIdleSinceWhen.putIfAbsent(participant, System.currentTimeMillis());
+          if (absenceUnderTolerance(participant)) {
+            inUseInstances.add(participant);
+          }
+        } else {
+          // An instance that has been previously detected as idle but now back to in-use.
+          // Remove this instance if existed in the tracking map.
+          instanceIdleSinceWhen.remove(participant);
+        }
+      }
+
+
+
       // compute the target containers as a ceiling of number of partitions divided by the number of containers
       // per partition.
       int numTargetContainers = (int) Math.ceil((double)numPartitions / this.partitionsPerContainer);
 
       // adjust the number of target containers based on the configured min and max container values.
       numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
 
+      slidingFixedWindow.add(numTargetContainers);
+
       log.info("There are {} containers being requested", numTargetContainers);
 
-      this.yarnService.requestTargetNumberOfContainers(numTargetContainers, inUseInstances);
+      this.yarnService.requestTargetNumberOfContainers(slidingFixedWindow.getMax(), inUseInstances);
+    }
+
+    @VisibleForTesting
+    /**
+     * Returns true if the participant has been idle for less than {@link #maxIdleTimeInMinBeforeScalingDown}
+     * minutes. By default, an instance that becomes active again (has a partition running on it) within that
+     * window is not tagged as "unused" and is therefore not a candidate for scaling down.
+     */
+    boolean absenceUnderTolerance(String participant){
+      return System.currentTimeMillis() - instanceIdleSinceWhen.get(participant) <
+          TimeUnit.MINUTES.toMillis(maxIdleTimeInMinBeforeScalingDown);
+    }
+  }
+
+  /**
+   * A fixed-size FIFO queue that returns the maximum value among all of its elements in constant time.
+   * This data structure keeps temporary fluctuations in the number of active Helix partitions from shrinking
+   * the container request. The larger the queue, the better it absorbs such fluctuations, but the slower it
+   * reacts when scaling down is actually required.
+   *
+   * The interface for this lass is implemented in a minimal-necessity manner to serve only as a sliding-sized-window
 
 Review comment:
   Typo: drop "lass"
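The `MaxValueEvictingQueue` Javadoc quoted above describes a fixed-size FIFO whose maximum is available in constant time; the PR's own implementation is not part of this hunk. The following is a minimal sketch under stated assumptions: `SlidingWindowMax` and `AutoScalingMath` are hypothetical names, and the constant-time maximum comes from a monotonic deque (a standard technique, not necessarily the one the PR uses), which gives O(1) `getMax()` and amortized O(1) `add()`. `targetContainers` mirrors the ceiling-and-clamp computation of `numTargetContainers` in the diff.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sliding window reporting the max of its last `capacity` values.
 * A sketch only; not the PR's MaxValueEvictingQueue.
 */
class SlidingWindowMax {
  private final int capacity;
  // All values currently in the window, oldest first (FIFO eviction).
  private final Deque<Integer> window = new ArrayDeque<>();
  // Max candidates in non-increasing order; the head is the current maximum.
  private final Deque<Integer> maxCandidates = new ArrayDeque<>();

  SlidingWindowMax(int capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    this.capacity = capacity;
  }

  /** Adds a value, evicting the oldest one once the window is full. Amortized O(1). */
  void add(int value) {
    if (window.size() == capacity) {
      int evicted = window.pollFirst();
      // The evicted value can only still be a candidate if it sits at the head.
      if (maxCandidates.peekFirst() == evicted) {
        maxCandidates.pollFirst();
      }
    }
    window.addLast(value);
    // Smaller values can never be the max again while `value` is in the window.
    while (!maxCandidates.isEmpty() && maxCandidates.peekLast() < value) {
      maxCandidates.pollLast();
    }
    maxCandidates.addLast(value);
  }

  /** Returns the maximum value in the window in O(1). */
  int getMax() {
    if (maxCandidates.isEmpty()) {
      throw new IllegalStateException("window is empty");
    }
    return maxCandidates.peekFirst();
  }
}

/** Hypothetical helper mirroring the numTargetContainers computation in the diff. */
final class AutoScalingMath {
  private AutoScalingMath() {}

  static int targetContainers(int numPartitions, int partitionsPerContainer,
                              int minContainers, int maxContainers) {
    // Ceiling of partitions / partitionsPerContainer, clamped to [min, max].
    int raw = (int) Math.ceil((double) numPartitions / partitionsPerContainer);
    return Math.max(minContainers, Math.min(maxContainers, raw));
  }
}
```

With a window of 3, `partitionsPerContainer = 2`, and bounds [1, 10], feeding partition counts 8, 8, 2, 8, 2, 2, 2 produces per-round targets 4, 4, 1, 4, 1, 1, 1. The window maximum stays at 4 through the single dip and only drops to 1 after the low reading has been seen three rounds in a row.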

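The idle-tolerance loop in the diff (`instanceIdleSinceWhen` together with `absenceUnderTolerance`) can be sketched as a standalone class. This is a hypothetical simplification, not the PR's code: `IdleTolerance` and `effectiveInUse` are illustrative names, and the current time is passed in as a parameter instead of calling `System.currentTimeMillis()` so the behavior can be checked deterministically.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: a participant with no running partitions still counts as
 * "in use" until it has been idle for at least maxIdleMinutes.
 */
class IdleTolerance {
  private final long maxIdleMillis;
  // participant -> timestamp (ms) at which it was first observed idle
  private final Map<String, Long> idleSince = new HashMap<>();

  IdleTolerance(long maxIdleMinutes) {
    this.maxIdleMillis = TimeUnit.MINUTES.toMillis(maxIdleMinutes);
  }

  /**
   * Returns the participants to treat as in use: the busy ones plus idle ones
   * whose idle time is still under the tolerance.
   */
  Set<String> effectiveInUse(Set<String> allParticipants, Set<String> busy, long nowMillis) {
    Set<String> inUse = new HashSet<>(busy);
    for (String participant : allParticipants) {
      if (busy.contains(participant)) {
        // Back in use: forget any earlier idle timestamp.
        idleSince.remove(participant);
      } else {
        // Record the first time this participant was seen idle.
        long since = idleSince.computeIfAbsent(participant, p -> nowMillis);
        if (nowMillis - since < maxIdleMillis) {
          inUse.add(participant);
        }
      }
    }
    return inUse;
  }
}
```

With a 10-minute tolerance, a participant that stops running partitions at t = 0 is still counted as in use at t = 5 min and drops out at t = 10 min. If it picks up a partition again, its idle timestamp is discarded and the countdown restarts the next time it goes idle.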