Posted to dev@storm.apache.org by HeartSaVioR <gi...@git.apache.org> on 2017/08/04 05:20:00 UTC

[GitHub] storm pull request #2261: STORM-2678 Improve performance of LoadAwareShuffle...

GitHub user HeartSaVioR opened a pull request:

    https://github.com/apache/storm/pull/2261

    STORM-2678 Improve performance of LoadAwareShuffleGrouping

    * construct ring which represents distribution of tasks based on load
    * chooseTasks() just accesses the ring sequentially
    * port related tests from Clojure to Java
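
    Roughly, the idea looks like this (a simplified standalone sketch, not the actual patch code; the `buildRing` helper name and the 1000-slot capacity are my assumptions based on the description above):

```java
import java.util.Random;

// Simplified sketch of the ring idea: each task gets ring slots in inverse
// proportion to its load, and chooseTasks() just walks the ring sequentially.
// Illustrative only; names and capacity are assumptions, not the patch code.
public class LoadRingSketch {
    static final int CAPACITY = 1000;

    // loads[i] is in [0.0, 1.0]; a lower load means more slots on the ring
    static int[] buildRing(double[] loads) {
        int[] weights = new int[loads.length];
        int total = 0;
        for (int i = 0; i < loads.length; i++) {
            weights[i] = (int) (101 - loads[i] * 100); // same weighting as the old code
            total += weights[i];
        }
        int[] ring = new int[CAPACITY];
        int idx = 0;
        for (int i = 0; i < loads.length && idx < CAPACITY; i++) {
            int slots = Math.round(weights[i] * 1.0f * CAPACITY / total);
            for (int j = 0; j < slots && idx < CAPACITY; j++) {
                ring[idx++] = i;
            }
        }
        // fill any rounding leftovers with random tasks
        Random random = new Random();
        while (idx < CAPACITY) {
            ring[idx++] = random.nextInt(loads.length);
        }
        return ring;
    }
}
```

    With loads {0.0, 0.9}, the lightly loaded task ends up owning roughly 90% of the ring, so sequentially walking the (shuffled) ring approximates load-weighted selection without per-tuple arithmetic.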
    
    I'm not an expert on micro-benchmarks, but I also crafted some simple performance tests, which you can find in LoadAwareShuffleGroupingTest. They are `testBenchmarkLoadAwareShuffleGroupingEvenLoad` and `testBenchmarkLoadAwareShuffleGroupingUnevenLoad`, and I marked them `@Ignore` so they don't run unless we explicitly want to run the performance tests.
    
    Here are my observations from running them against the old and new LoadAwareShuffleGrouping:
    
    > testBenchmarkLoadAwareShuffleGroupingEvenLoad (old)
    Duration: 114470 ms
    Duration: 115973 ms
    Duration: 114807 ms
    
    > testBenchmarkLoadAwareShuffleGroupingEvenLoad (new)
    Duration: 106819 ms
    Duration: 105857 ms
    Duration: 106789 ms
    
    > testBenchmarkLoadAwareShuffleGroupingUnevenLoad (old)
    Duration: 113484 ms
    Duration: 118152 ms
    Duration: 112664 ms
    
    > testBenchmarkLoadAwareShuffleGroupingUnevenLoad (new)
    Duration: 106071 ms
    Duration: 105938 ms
    Duration: 106115 ms
    
    You can see that the modified LoadAwareShuffleGrouping is faster than before, by 5% or more for single-threaded access. We may also want to run a multi-threaded performance test, keeping in mind that accessing the OutputCollector from a single thread is preferred over multiple threads.
    
    This still respects thread-safety, and I think it respects thread-safety better than before, given that we only allow one thread to update the ring and we publish the new information all at once, rather than updating information on the fly while other threads are referencing it.
    We still don't guard the information with mutual exclusion, but I think that is tolerable, as it was before.
    
    I'm planning to explore this some more, mostly around reducing calls to System.currentTimeMillis() in chooseTasks(). I'll push additional commits if I find any more improvements; it will be easy to revert some of them if we decide we don't want them.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HeartSaVioR/storm STORM-2678

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2261.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2261
    
----
commit 6c693e3a2d57cca3648240335ef6e1862dbbfc4c
Author: Jungtaek Lim <ka...@gmail.com>
Date:   2017-08-04T04:25:27Z

    STORM-2678 Improve performance of LoadAwareShuffleGrouping
    
    * construct ring which represents distribution of tasks based on load
    * chooseTasks() just accesses the ring sequentially
    * port related tests from Clojure to Java

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #2261: STORM-2678 Improve performance of LoadAwareShuffle...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2261#discussion_r134645082
  
    --- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java ---
    @@ -52,25 +78,92 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<I
         }
     
         @Override
    +    public void refreshLoad(LoadMapping loadMapping) {
    +        updateRing(loadMapping);
    +    }
    +
    +    @Override
         public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
    -        if ((lastUpdate + 1000) < System.currentTimeMillis()) {
    -            int local_total = 0;
    -            for (int i = 0; i < targets.length; i++) {
    -                int val = (int)(101 - (load.get(targets[i]) * 100));
    -                loads[i] = val;
    -                local_total += val;
    +        int rightNow;
    +        while (true) {
    +            rightNow = current.incrementAndGet();
    +            if (rightNow < CAPACITY) {
    +                return rets[choices[rightNow]];
    +            } else if (rightNow == CAPACITY) {
    +                current.set(0);
    +                return rets[choices[0]];
                 }
    -            total = local_total;
    -            lastUpdate = System.currentTimeMillis();
    +            //race condition with another thread, and we lost
    +            // try again
             }
    -        int selected = random.nextInt(total);
    -        int sum = 0;
    -        for (int i = 0; i < targets.length; i++) {
    -            sum += loads[i];
    -            if (selected < sum) {
    -                return rets[i];
    +    }
    +
    +    private void updateRing(LoadMapping load) {
    +        int localTotal = 0;
    +        for (int i = 0 ; i < targets.length; i++) {
    +            int val = (int)(101 - (load.get(targets[i]) * 100));
    +            loads[i] = val;
    +            localTotal += val;
    +        }
    +
    +        int currentIdx = 0;
    +        int unassignedIdx = 0;
    +        for (int i = 0 ; i < loads.length ; i++) {
    +            if (currentIdx == CAPACITY) {
    +                break;
                 }
    +
    +            int loadForTask = loads[i];
    +            int amount = Math.round(loadForTask * 1.0f * CAPACITY / localTotal);
    +            // assign at least one for task
    +            if (amount == 0) {
    +                unassigned[unassignedIdx++] = i;
    +            }
    +            for (int j = 0; j < amount; j++) {
    +                if (currentIdx == CAPACITY) {
    +                    break;
    +                }
    +
    +                prepareChoices[currentIdx++] = i;
    +            }
    +        }
    +
    +        if (currentIdx < CAPACITY) {
    +            // if there're some rooms, give unassigned tasks a chance to be included
    +            // this should be really small amount, so just add them sequentially
    +            if (unassignedIdx > 0) {
    +                for (int i = currentIdx ; i < CAPACITY ; i++) {
    +                    prepareChoices[i] = unassigned[(i - currentIdx) % unassignedIdx];
    +                }
    +            } else {
    +                // just pick random
    +                for (int i = currentIdx ; i < CAPACITY ; i++) {
    +                    prepareChoices[i] = random.nextInt(loads.length);
    +                }
    +            }
    +        }
    +
    +        shuffleArray(prepareChoices);
    +
    +        // swapping two arrays
    +        int[] tempForSwap = choices;
    +        choices = prepareChoices;
    +        prepareChoices = tempForSwap;
    +
    +        current.set(0);
    --- End diff --
    
    Again, logically this should be -1 because we **increment and get**, and unlike in `chooseTasks()` we don't read the value in this method, but it doesn't hurt much anyway.
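
    A tiny standalone illustration of the off-by-one (my sketch, not patch code): with `incrementAndGet()` the first index observed after setting the counter to 0 is 1, so slot 0 is skipped unless the counter starts at -1.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Shows why a counter that is consumed via incrementAndGet() should start
// at -1: starting at 0 makes the first observed index 1, skipping slot 0.
public class CounterInitSketch {
    static int firstObservedIndex(int initialValue) {
        AtomicInteger current = new AtomicInteger(initialValue);
        return current.incrementAndGet();
    }
}
```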



[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    I just took the opposite approach: pushing the updated load mapping when the load updater timer fires. We no longer need any tricks or optimizations to reduce checking, and we don't even need to check the update interval.
    
    This is based on the fact that `LoadAwareShuffleGrouping.updateRing()` doesn't break other threads calling `LoadAwareShuffleGrouping.chooseTasks()` concurrently.
    
    I guess we can't easily optimize further unless we change the contract, for example by dropping the thread-safety requirement.
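
    That property comes from a double-buffer swap: the updater builds the new ring off to the side and then swaps array references, so readers never see a half-written ring. A standalone sketch of the pattern (illustrative only, with simplified fields; `volatile` is added here for visibility in this isolated example):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Double-buffer pattern: build the new ring in prepareChoices, then swap the
// references so concurrent readers always see a fully constructed array.
public class DoubleBufferSketch {
    private volatile int[] choices = {0, 1, 0, 1};
    private int[] prepareChoices = new int[4];
    private final AtomicInteger current = new AtomicInteger(0);

    void updateRing(int[] newRing) {
        System.arraycopy(newRing, 0, prepareChoices, 0, newRing.length);
        int[] tempForSwap = choices;  // swap the two arrays; readers switch
        choices = prepareChoices;     // atomically via the reference write and
        prepareChoices = tempForSwap; // never observe a partially built ring
        current.set(0);
    }

    int read(int i) {
        return choices[i];
    }
}
```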
    
    > testBenchmarkLoadAwareShuffleGroupingEvenLoad
    Duration: 27596 ms
    Duration: 27772 ms
    
    > testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded
    Max duration among threads is : 89274 ms
    Max duration among threads is : 86466 ms
    
    Given that this changes some interfaces, I would like to see more reviewers take a look and provide opinions.



[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    Now I've introduced a 'skip checking update count' to avoid calling System.currentTimeMillis() on every call, but it has a clear trade-off: we have to call AtomicInteger.incrementAndGet() every time instead.
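    
    The pattern can be sketched like this (a standalone illustration with hypothetical names; `SKIP_COUNT` stands in for the 'skip checking update count'):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Only consult System.currentTimeMillis() every SKIP_COUNT calls, trading
// one AtomicInteger increment per call for far fewer clock reads.
public class SkipCheckSketch {
    static final int SKIP_COUNT = 100;
    private final AtomicInteger callsSinceCheck = new AtomicInteger(0);
    private long lastUpdate = 0;
    int clockReads = 0; // counter exposed only to illustrate the saving

    void maybeRefresh() {
        if (callsSinceCheck.incrementAndGet() >= SKIP_COUNT) {
            callsSinceCheck.set(0);
            clockReads++;
            long now = System.currentTimeMillis();
            if (now - lastUpdate > 1000) {
                lastUpdate = now;
                // refresh load information here
            }
        }
    }
}
```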
    
    I set the skip checking update count to 100 as a starting value and re-ran the tests. The results are below:
    
    > testBenchmarkLoadAwareShuffleGroupingEvenLoad
    Duration: 39352 ms
    Duration: 39753 ms
    Duration: 40604 ms
    
    > testBenchmarkLoadAwareShuffleGroupingUnevenLoad
    Duration: 39615 ms
    Duration: 39760 ms
    Duration: 40497 ms
    
    > testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded
    Max duration among threads is : 113115 ms
    Max duration among threads is : 169270 ms
    Max duration among threads is : 142175 ms
    
    > testBenchmarkLoadAwareShuffleGroupingUnevenLoadAndMultiThreaded
    Max duration among threads is : 168007 ms
    Max duration among threads is : 148562 ms
    Max duration among threads is : 168780 ms
    
    Multi-threaded access now hurts performance more, but it is still faster than the old LASG. What we really gain is the single-threaded improvement: we cut the time by more than half compared to before.
    
    I expect that a greater value for the 'skip checking update count' would make single-threaded access much faster, and maybe multi-threaded access faster too, so there's room to explore. But the value should be small enough that we can answer yes to the question: "Are we OK with delaying the load information update if fewer than N (the value) calls occurred within 1 second?"
    
    Btw, the update interval (M seconds) is another variable to explore. We may also need to see how often the origin load information gets updated, since it is pointless for LASG to refresh the information more often than the origin load information is updated.



[GitHub] storm pull request #2261: STORM-2678 Improve performance of LoadAwareShuffle...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2261#discussion_r134644962
  
    --- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java ---
    @@ -20,30 +20,56 @@
     import java.io.Serializable;
     import java.util.ArrayList;
     import java.util.Arrays;
    +import java.util.Collections;
     import java.util.List;
     import java.util.Random;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
     
    +import com.google.common.annotations.VisibleForTesting;
    +import org.apache.commons.lang.ArrayUtils;
     import org.apache.storm.generated.GlobalStreamId;
     import org.apache.storm.task.WorkerTopologyContext;
     
     public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
    +    private static final int CAPACITY = 1000;
    +
         private Random random;
         private List<Integer>[] rets;
         private int[] targets;
         private int[] loads;
    -    private int total;
    -    private long lastUpdate = 0;
    +    private int[] unassigned;
    +    private int[] choices;
    +    private int[] prepareChoices;
    +    private AtomicInteger current;
     
         @Override
         public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
             random = new Random();
    +
             rets = (List<Integer>[])new List<?>[targetTasks.size()];
             targets = new int[targetTasks.size()];
             for (int i = 0; i < targets.length; i++) {
                 rets[i] = Arrays.asList(targetTasks.get(i));
                 targets[i] = targetTasks.get(i);
             }
    +
    +        // can't leave choices to be empty, so initiate it similar as ShuffleGrouping
    +        choices = new int[CAPACITY];
    +
    +        for (int i = 0 ; i < CAPACITY ; i++) {
    +            choices[i] = i % rets.length;
    +        }
    +
    +        shuffleArray(choices);
    +        current = new AtomicInteger(0);
    --- End diff --
    
    Logically this should be -1 because we **increment and get**, but it doesn't hurt much.
    (Same applies to ShuffleGrouping)



[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    From a code perspective the changes look fine to me. It would be nice to clean up the load-aware grouping interface some.
    
    From a performance perspective it is a bit of a wash from what I am seeing.
    
    I ran ThroughputVsLatency with 1 spout, 1 splitter, and 1 counter.  I also did the same with 2 splitters, so there was a choice to be made.
    
    From a cost perspective I saw that for the 1 splitter case everything appeared to be within the noise range.  For the 2 splitter case I saw the new version taking up about 5% more CPU than the old version, so I would like to explore this a bit more.
    
    Similarly for the mean, 99%ile, and 99.9%ile latency measurements: the original was slightly better, by around 5%, when there were 2 splitters.
    
    I do want to explore these a bit more, because it seems counter to the benchmarks that others have run.




[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    Raw numbers are here: https://gist.github.com/HeartSaVioR/5e80ab3a58b3e8cf40bab9c6da482639



[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    I explored and applied more changes:
    * changed ArrayList to pre-allocated arrays (they're only allocated in `prepare()`)
    * fixed the length of `choices` at 1000
      * at first I set this to 100, and some tests were failing due to the distribution
    
    This might use a bit more memory, but it reduces object allocation on each load update.



[GitHub] storm pull request #2261: STORM-2678 Improve performance of LoadAwareShuffle...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2261#discussion_r134901967
  
    --- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java ---
    @@ -20,5 +20,6 @@
     import java.util.List;
     
     public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
    +   void refreshLoad(LoadMapping loadMapping);
        List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load);
    --- End diff --
    
    Addressed.



[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    @revans2 
    Thanks for doing the performance test. I just ran some tests (rate 100000, 1 worker, max spout pending 1000) with TVL and got the results below. I picked only the 240-sec samples, but I have the raw numbers as well.
    
    > patch
    
    uptime:  240 acked: 3,014,700 acked/sec: 100,490.00 failed:        0 99%:      14,041,087 99.9%:      17,924,095 min:       3,284,992 max:      30,244,863 mean:    7,576,927.04 stddev:    1,771,441.20 user:    120,360 sys:      5,830 gc:      1,454 mem:     135.74
    
    uptime:  241 acked: 3,013,460 acked/sec: 100,448.67 failed:        0 99%:      13,623,295 99.9%:      16,859,135 min:       3,282,944 max:      24,559,615 mean:    7,498,565.53 stddev:    1,626,489.06 user:    119,890 sys:      5,550 gc:      1,320 mem:      73.44
    
    > master
    
    uptime:  240 acked: 3,013,700 acked/sec: 100,456.67 failed:        0 99%:      13,533,183 99.9%:      17,563,647 min:       3,311,616 max:      25,821,183 mean:    7,470,969.09 stddev:    1,618,231.68 user:    118,480 sys:      5,720 gc:      1,236 mem:      97.66
    
    uptime:  241 acked: 3,015,020 acked/sec: 100,500.67 failed:        0 99%:      13,516,799 99.9%:      16,654,335 min:       3,284,992 max:      22,331,391 mean:    7,501,903.32 stddev:    1,650,650.14 user:    119,170 sys:      5,320 gc:      1,364 mem:      84.73
    
    > patch, acker off
    
    uptime:  240 acked: 3,013,040 acked/sec: 100,434.67 failed:        0 99%:       4,464,639 99.9%:       6,959,103 min:             283 max:      11,902,975 mean:      634,505.19 stddev:      696,511.91 user:     76,820 sys:      6,470 gc:        868 mem:     165.32
    
    uptime:  240 acked: 3,012,520 acked/sec: 100,417.33 failed:        0 99%:       4,255,743 99.9%:       6,008,831 min:             323 max:       9,650,175 mean:      627,677.58 stddev:      651,341.18 user:     76,300 sys:      6,860 gc:        885 mem:     111.09
    
    > master, acker off
    
    uptime:  241 acked: 3,013,420 acked/sec: 100,447.33 failed:        0 99%:       2,060,287 99.9%:       4,468,735 min:             356 max:       7,733,247 mean:      565,479.02 stddev:    417,299.37 user:     73,570 sys:      7,070 gc:        399 mem:     237.66
    
    uptime:  241 acked: 3,012,800 acked/sec: 100,426.67 failed:        0 99%:       3,928,063 99.9%:       7,241,727 min:             450 max:      17,530,879 mean:      617,198.01 stddev:      658,032.31 user:     75,270 sys:      7,290 gc:        731 mem:     241.72
    
    I've shown numbers from the grouper performance test, and also from ConstSpoutNullBoltTopo, which clearly shows the new version is (much) faster, so the result is actually confusing to me. It might be better to put more effort into standardizing our approach to performance tests. I'll try to find time to have a look at loadgen.



[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    Now I've introduced a 'skip checking update count' to avoid calling System.currentTimeMillis() on every call, but it has a clear trade-off: we have to call AtomicInteger.incrementAndGet() every time instead.
    I set the skip checking update count to 10, 100, 1000, 10000, and 100000, and re-ran the tests. The results are below:
    
    > 10
    
    >> testBenchmarkLoadAwareShuffleGroupingEvenLoad
    Duration: 48650 ms
    Duration: 49058 ms
    Duration: 48445 ms
    
    >> testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded
    Max duration among threads is : 159369 ms
    Max duration among threads is : 130093 ms
    Max duration among threads is : 138557 ms
    
    > 100
    
    >> testBenchmarkLoadAwareShuffleGroupingEvenLoad
    Duration: 41093 ms
    Duration: 40393 ms
    Duration: 40524 ms
    
    >> testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded
    Max duration among threads is : 142575 ms
    Max duration among threads is : 139276 ms
    Max duration among threads is : 145470 ms
    
    > 1000
    
    >> testBenchmarkLoadAwareShuffleGroupingEvenLoad
    Duration: 40238 ms
    Duration: 39715 ms
    Duration: 39242 ms
    
    >> testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded
    Max duration among threads is : 168089 ms
    Max duration among threads is : 161082 ms
    Max duration among threads is : 169998 ms
    
    > 10000
    
    >> testBenchmarkLoadAwareShuffleGroupingEvenLoad
    Duration: 40535 ms
    Duration: 39319 ms
    Duration: 46815 ms
    
    >> testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded
    Max duration among threads is : 140426 ms
    Max duration among threads is : 166214 ms
    Max duration among threads is : 169368 ms
    
    > 100000
    
    >> testBenchmarkLoadAwareShuffleGroupingEvenLoad
    Duration: 39801 ms
    Duration: 39535 ms
    Duration: 39537 ms
    
    >> testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded
    Max duration among threads is : 147115 ms
    Max duration among threads is : 140722 ms
    Max duration among threads is : 172955 ms
    
    The test results fluctuate, but roughly we can see that the change is good.
    
    Multi-threaded access now fluctuates more and takes a bigger performance hit than before, but it is still faster than the old LASG. What we really gain is the single-threaded improvement: we cut the time by more than half compared to before.
    
    The value of the 'skip checking update count' should be small enough that we can answer yes to the question: "Are we OK with delaying the load information update if fewer than N (the value) calls occurred within 1 second?" We may want to put more effort into finding the right value (given that the test results were not stable enough), but at least from these results, 100 seems to be a good value. Higher values don't show a linear performance improvement.
    
    Btw, the update interval (M seconds) is another variable to explore. We may also need to see how often the origin load information gets updated, since it is pointless for LASG to refresh the information more often than the origin load information is updated.



[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    Now I have more numbers to support this patch.
    
    I just took the same approach @Ethanlm used in his patch #2270: performance testing on ConstSpoutNullBoltTopo with ACKing disabled.
    
    1. Config: topology.message.timeout: 300; topology.max.spout.pending: 5000; topology.acker.executors: 0
    1. 1 VM from AWS c4.xlarge, `dedicated` to get more accurate results
    1. Launched 1 worker with 1 spout task and 1 bolt task. ACKing disabled.
    1. All experiments ran for 300s.
    1. For clarity, only the outputs at 240s are shown.
    1. Tested 3 times for each case, and picked the one that showed the median result.
    1. Numbers fluctuated slightly during the experiments.
    
    Used 08038b6 (last commit) for this patch, and 77354fe for baseline (master).
    
    Grouping | transferred (messages) | transfer rate (message/s) | spout_transferred | spout_acks | spout_throughput (acks/s)
    -- | -- | -- | -- | -- | --
    New LocalOrShuffle (patch) | 167984520 | 2799742 | 167984520 | 167984520 | 2799742
    LocalOrShuffle (master) | 130891240 | 2181520 | 130891240 | 130891260 | 2181520
    LocalOrShuffle with loadaware disabled (master) | 161410760 | 2690179 | 161410760 | 161410740 | 2690179
    
    So the new LoadAwareShuffleGrouping is definitely faster than the current LoadAwareShuffleGrouping (by about 28%), and *even faster* than the current ShuffleGrouping (by about 4%).



[GitHub] storm pull request #2261: STORM-2678 Improve performance of LoadAwareShuffle...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2261#discussion_r134778874
  
    --- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java ---
    @@ -20,5 +20,6 @@
     import java.util.List;
     
     public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
    +   void refreshLoad(LoadMapping loadMapping);
        List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load);
    --- End diff --
    
    If we are going to refresh the load out of band, then let's delete the LoadMapping from the parameters passed into chooseTasks.



[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    From the second commit I added multi-threaded performance tests: `testBenchmarkLoadAwareShuffleGroupingEvenLoad` and `testBenchmarkLoadAwareShuffleGroupingUnevenLoad`.
    
    I started exploring this with 2 threads, and may explore with more threads if necessary.
    
    > testBenchmarkLoadAwareShuffleGroupingEvenLoad (old)
    Max duration among threads is : 185274 ms
    Max duration among threads is : 189359 ms
    
    > testBenchmarkLoadAwareShuffleGroupingEvenLoad (new)
    Max duration among threads is : 123411 ms
    Max duration among threads is : 128937 ms
    
    > testBenchmarkLoadAwareShuffleGroupingUnevenLoad (old)
    Max duration among threads is : 184834 ms
    Max duration among threads is : 185551 ms
    
    > testBenchmarkLoadAwareShuffleGroupingUnevenLoad (new)
    Max duration among threads is : 123978 ms
    Max duration among threads is : 124113 ms
    
    The test results clearly show that both take a performance hit with multiple threads, but the new LASG is affected much less than the old LASG.



[GitHub] storm pull request #2261: STORM-2678 Improve performance of LoadAwareShuffle...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2261#discussion_r134644598
  
    --- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java ---
    @@ -52,25 +78,92 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<I
         }
     
         @Override
    +    public void refreshLoad(LoadMapping loadMapping) {
    +        updateRing(loadMapping);
    +    }
    +
    +    @Override
         public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
    -        if ((lastUpdate + 1000) < System.currentTimeMillis()) {
    -            int local_total = 0;
    -            for (int i = 0; i < targets.length; i++) {
    -                int val = (int)(101 - (load.get(targets[i]) * 100));
    -                loads[i] = val;
    -                local_total += val;
    +        int rightNow;
    +        while (true) {
    +            rightNow = current.incrementAndGet();
    +            if (rightNow < CAPACITY) {
    +                return rets[choices[rightNow]];
    +            } else if (rightNow == CAPACITY) {
    +                current.set(0);
    --- End diff --
    
    I borrowed this from ShuffleGrouping, and even though I'm not sure whether it creates a race condition, I think it is acceptable, since the race condition scenarios don't cause an out-of-bounds index; they just let some threads select the same index and maybe skip some indices.
    Moreover, this patch contains tests that address multi-thread safety.
    
    We could still replace `set` with `getAndSet` to make it fully thread-safe, but we would need to do more experiments if we wanted to. The same applies to ShuffleGrouping.
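
    For reference, here is a sketch of closing that wrap-around race (my illustration, using `compareAndSet` rather than the plain `set`, and not the code in this patch): only one thread wins the reset at the wrap-around, and losing threads simply retry.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Wrap-around using compareAndSet so that only one thread performs the
// reset; losing threads retry with a fresh increment. Illustrative sketch
// of the alternative discussed above, not the merged code.
public class WrapAroundSketch {
    static final int CAPACITY = 1000;
    private final AtomicInteger current = new AtomicInteger(-1);

    int nextIndex() {
        while (true) {
            int rightNow = current.incrementAndGet();
            if (rightNow < CAPACITY) {
                return rightNow;
            }
            // only the thread holding the current value resets; others
            // fail the CAS, loop, and increment again
            current.compareAndSet(rightNow, -1);
        }
    }
}
```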


---

[GitHub] storm pull request #2261: STORM-2678 Improve performance of LoadAwareShuffle...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2261#discussion_r134640202
  
    --- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java ---
    @@ -52,25 +78,92 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<I
         }
     
         @Override
    +    public void refreshLoad(LoadMapping loadMapping) {
    +        updateRing(loadMapping);
    +    }
    +
    +    @Override
         public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
    -        if ((lastUpdate + 1000) < System.currentTimeMillis()) {
    -            int local_total = 0;
    -            for (int i = 0; i < targets.length; i++) {
    -                int val = (int)(101 - (load.get(targets[i]) * 100));
    -                loads[i] = val;
    -                local_total += val;
    +        int rightNow;
    +        while (true) {
    +            rightNow = current.incrementAndGet();
    +            if (rightNow < CAPACITY) {
    +                return rets[choices[rightNow]];
    +            } else if (rightNow == CAPACITY) {
    +                current.set(0);
    --- End diff --
    
    If you are trying to make this thread safe, I suspect this `current.set(0)` is a race condition. I'm not sure whether it's an acceptable race condition or not.


---

[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    The build failure is from missing the removal of CHANGELOG.md in the binary distribution pom. I'll address it and rebase.


---

[GitHub] storm pull request #2261: STORM-2678 Improve performance of LoadAwareShuffle...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2261#discussion_r134876831
  
    --- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java ---
    @@ -20,5 +20,6 @@
     import java.util.List;
     
     public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
    +   void refreshLoad(LoadMapping loadMapping);
        List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load);
    --- End diff --
    
    OK. I was just waiting for review feedback to see whether we are OK with changing the way LoadMapping is provided. Looks like you're OK with the push approach, so I'll remove the method.


---

[GitHub] storm pull request #2261: STORM-2678 Improve performance of LoadAwareShuffle...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2261


---

[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    I spent the last few hours running more tests and got the same results. I am not too concerned about it; the overhead appears to be low, if any.


---

[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    I went back to 8f63d5a, which doesn't touch any interfaces, and ran the same tests:
    
    Grouping | transferred (messages) | transfer rate (message/s) | spout_transferred | spout_acks | spout_throughput (acks/s)
    -- | -- | -- | -- | -- | --
    New LocalOrShuffle (patch) | 160441600 | 2674026 | 160441600 | 160441580 | 2674026
    
    Now it is a bit slower than ShuffleGrouping but still faster than the current LoadAwareShuffleGrouping (about 22%).
    
    So we can choose either the bigger improvement, which touches multiple parts, or a still substantial improvement that leaves other parts untouched.
    
    I also tested another change: replacing the List with an array in ShuffleGrouping. The test result is below:
    
    Grouping | transferred (messages) | transfer rate (message/s) | spout_transferred | spout_acks | spout_throughput (acks/s)
    -- | -- | -- | -- | -- | --
    LocalOrShuffle with loadaware disabled (master) | 161437800 | 2690630 | 161437800 | 161437760 | 2690630
    
    It doesn't seem to bring a noticeable improvement.
    
    The difference may be the length of the array: in the test the array is too small (it would have one element), so an extra `set()` had to be called in addition to `incrementAndGet()` on every call. Note that the length of the array in the patch is 1000, so `set()` is only called once every 1000 calls.
    
    We could grow the array in `prepare()` to get better performance, but that would be a micro-optimization and I'm not sure we need it.
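    The ring construction the patch describes (each task weighted by its inverse load) can be sketched roughly as follows. This is an illustrative, hypothetical example, not Storm's actual updateRing(); the real implementation would also likely shuffle the slots so same-target entries aren't contiguous:

```java
// Hypothetical sketch of building a fixed-capacity ring in which each
// target index appears roughly in proportion to its inverse load, so that
// walking the ring sequentially yields a load-aware shuffle. Names are
// illustrative, not Storm's actual code.
class LoadRingSketch {
    static int[] buildRing(double[] loads, int capacity) {
        double[] weights = new double[loads.length];
        double total = 0;
        for (int i = 0; i < loads.length; i++) {
            weights[i] = 1.0 - loads[i]; // lower load => more slots
            total += weights[i];
        }
        int[] ring = new int[capacity];
        int idx = 0;
        for (int i = 0; i < loads.length && idx < capacity; i++) {
            int slots = (int) Math.round(capacity * weights[i] / total);
            for (int j = 0; j < slots && idx < capacity; j++) {
                ring[idx++] = i;
            }
        }
        while (idx < capacity) {
            ring[idx++] = 0; // fill any remainder left by rounding
        }
        return ring;
    }

    public static void main(String[] args) {
        // Target 0 is unloaded, target 1 is 50% loaded:
        int[] ring = buildRing(new double[] {0.0, 0.5}, 100);
        int zeros = 0;
        for (int v : ring) {
            if (v == 0) {
                zeros++;
            }
        }
        System.out.println("slots for target 0: " + zeros);
    }
}
```

    With a ring of 100 slots and loads {0.0, 0.5}, the unloaded target gets about twice as many slots as the loaded one.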


---

[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    @revans2 @roshannaik 
    I know you're busy, but could you find time to take a look at the change? I believe it is a clear improvement, and I have provided raw numbers to show the difference.


---

[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2261
  
    The impact of loadAware that you show here seems in line with what I have seen. Encouraging to see these improvements.
    I reviewed only the core chooseTasks() implementation and left one comment there.


---