Posted to dev@storm.apache.org by clockfly <gi...@git.apache.org> on 2014/05/07 05:14:42 UTC

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

GitHub user clockfly opened a pull request:

    https://github.com/apache/incubator-storm/pull/103

    STORM-297 Storm Performance cannot be scaled up by adding more CPU cores

    STORM-297:
    
    Description and test report can be found at https://issues.apache.org/jira/browse/STORM-297
    The changes consist of:
    1. use netty async
    2. use the batch send and batch receive messaging API
    3. allow configuring multiple worker receiver threads
    4. name the executor and netty threads


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/clockfly/incubator-storm storm_async_netty_and_batch_api

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-storm/pull/103.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #103
    
----
commit 861a92eab8740cfc0f83ac4d7ade9a2ab04a8b9b
Author: Sean Zhong <cl...@gmail.com>
Date:   2014-05-07T03:10:07Z

    1. Async netty message transfer 2. Batch send and batch receive api and implementation 3. allow to configure the number of receiver thread 4. name the threads

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45413255
  
    I tried to merge the pull request into master, and the tests are now failing with the following in the logs (on my mac)
    ```
    134871 [Thread-1250-disruptor-worker-transfer-queue] INFO  backtype.storm.util - Halting process: ("Async loop died!")
    ```
    
    I have not tried it on Linux, but the tests all pass on master.  @clockfly could you please look into it.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43704046
  
    @revans, 
    
    I created a JIRA, https://issues.apache.org/jira/browse/STORM-329, to handle the case where a worker suddenly crashes.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45448273
  
    FYI: I'll add Nathan's comment on our test methodology to DEVELOPER.md.
    
    
    > On 07.06.2014, at 20:59, Nathan Marz <no...@github.com> wrote:
    > 
    > -1 Tests should never, ever rely on timing in order to pass. This is the whole reason for doing time simulation in the first place, so that when functionality depends on time it can be properly tested without having to worry about random delays messing up the tests.
    > 
    > complete-topology is inherently reliant on detecting topology completion based on the spout saying all its tuples are "complete". If you're testing topologies that don't do full tuple acking, then you should be testing using the "tracked topologies" utilities in backtype.storm.testing.clj
    > 
    > For example, here is how the acking system is tested using tracked topologies: https://github.com/apache/incubator-storm/blob/master/storm-core/test/clj/backtype/storm/integration_test.clj#L213
    > 
    > The "tracked-wait" function is the key which will only return when both that many tuples have been emitted by the spouts AND the topology is idle (no tuples have been emitted nor will be emitted without further input) You shouldn't use tracked-topologies for topologies that have tick tuples, but that shouldn't be a problem in this case.
    > 
    > —
    > Reply to this email directly or view it on GitHub.
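
The point about time simulation in the quoted comment can be illustrated with a small sketch. This is not Storm's testing API; `Clock`, `ManualClock`, and `isExpired` are hypothetical names used only to show the idea that time-dependent logic reads an injected clock, so a test advances time explicitly instead of sleeping and hoping.

```java
import java.util.concurrent.atomic.AtomicLong;

final class SimulatedTimeSketch {
    /** Time source the code under test reads instead of System.currentTimeMillis(). */
    interface Clock {
        long nowMs();
    }

    /** Test clock (hypothetical) that only moves when the test advances it. */
    static final class ManualClock implements Clock {
        private final AtomicLong now = new AtomicLong();

        @Override
        public long nowMs() {
            return now.get();
        }

        void advance(long ms) {
            now.addAndGet(ms);
        }
    }

    /** Example of time-dependent logic: a message expires once timeoutMs has elapsed. */
    static boolean isExpired(Clock clock, long sentAtMs, long timeoutMs) {
        return clock.nowMs() - sentAtMs >= timeoutMs;
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        long sentAt = clock.nowMs();
        System.out.println(isExpired(clock, sentAt, 30_000)); // false: no simulated time has passed
        clock.advance(30_000);
        System.out.println(isExpired(clock, sentAt, 30_000)); // true: the test moved the clock itself
    }
}
```

Because the clock never moves on its own, the outcome does not depend on scheduler delays or machine load.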



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12410447
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -84,43 +93,87 @@
     
             // Start the connection attempt.
             remote_addr = new InetSocketAddress(host, port);
    -        bootstrap.connect(remote_addr);
    +        
    +        Thread flushChecker = new Thread(new Runnable() {
    +            @Override
    +            public void run() {
    +                //make sure we have a connection
    +                connect();
    +                
    +                while(!closing) {
    +                    long flushCheckTime = flushCheckTimer.get();
    +                    long now = System.currentTimeMillis();
    +                    if (now > flushCheckTime) {
    +                        Channel channel = channelRef.get();
    +                        if (null != channel && channel.isWritable()) {
    +                            flush();
    +                        }
    +                    }
    +                    try {
    +                        Thread.sleep(flushCheckInterval);
    +                    } catch (InterruptedException e) {
    +                        break;
    +                    }
    +                }
    +                
    +            }
    +        });
    +        
    +        flushChecker.setDaemon(true);
    --- End diff --
    
    I will fix this.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12812473
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -84,43 +93,87 @@
     
             // Start the connection attempt.
             remote_addr = new InetSocketAddress(host, port);
    -        bootstrap.connect(remote_addr);
    +        
    +        Thread flushChecker = new Thread(new Runnable() {
    --- End diff --
    
    Can we make this thread shared between the clients? Otherwise we will have a dedicated thread per client, which can cause resource utilization issues, such as hitting the ulimit on the number of processes allowed per user.
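
One way to act on this suggestion is sketched below, assuming a single process-wide scheduler on which each client registers a periodic flush task. `SharedFlusherSketch` and `SketchClient` are hypothetical stand-ins, not the classes in the pull request, and the flush itself is reduced to a counter.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedFlusherSketch {
    // One daemon thread shared by every client in the process (hypothetical design).
    private static final ScheduledExecutorService FLUSHER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "shared-client-flusher");
            t.setDaemon(true);
            return t;
        });

    /** Stand-in for the real client; only the flush bookkeeping is modelled. */
    static final class SketchClient {
        final AtomicInteger flushCount = new AtomicInteger();
        private final ScheduledFuture<?> task;

        SketchClient(long intervalMs) {
            // Register a periodic task instead of starting a dedicated thread.
            task = FLUSHER.scheduleWithFixedDelay(
                this::flush, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        }

        void flush() {
            flushCount.incrementAndGet(); // placeholder for writing the pending batch
        }

        void close() {
            task.cancel(false); // stop this client's flushes; the shared thread keeps running
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SketchClient a = new SketchClient(10);
        SketchClient b = new SketchClient(10);
        Thread.sleep(200);
        a.close();
        b.close();
        System.out.println("a: " + a.flushCount.get() + " flushes, b: " + b.flushCount.get() + " flushes");
    }
}
```

With this shape the thread count stays constant as clients are added. The trade-off is that a slow flush on one client delays the others, since they share one thread.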



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-42918783
  
    Hi Nathan,
    
    Thank you for your comments, I just drew a diagram for your reference.
    https://issues.apache.org/jira/secure/attachment/12644559/storm_Netty_receiver_diagram.png
    
    Sean




[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45413952
  
    Tests pass on my machine:
    Tests run: 157, Assertions: 101461, Failures: 0, Errors: 0
    
    There should be a stack trace in the log file; it can be found by searching for "Async loop died!" without "Halting process".
    
                        (catch Throwable t
                          (log-error t "Async loop died!")
                          (kill-fn t)
                          ))
    




[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by miofthena <gi...@git.apache.org>.
Github user miofthena commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-44900176
  
    Hi, @clockfly 
    In Client.java
    
    * a *flusher* is submitted every ~10ms
    * no *flusher* ever stops (even if closed == true)
    * the *flusher* loop never parks its thread
    
    → 100% CPU, uncool =(
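
The two problems listed above (a loop that ignores the closed flag and never parks) can be avoided with a loop shaped like the following sketch. The class and field names are hypothetical, and the flush is reduced to a counter; the only point is that the loop checks the flag on every iteration and gives up the CPU between iterations.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

final class ParkingFlushLoopSketch {
    final AtomicBoolean closing = new AtomicBoolean(false);
    final AtomicInteger flushes = new AtomicInteger();

    /** Runs until close() is called; parks between iterations instead of spinning. */
    void runLoop(long intervalNanos) {
        while (!closing.get()) {
            flushes.incrementAndGet();            // placeholder for the real flush
            LockSupport.parkNanos(intervalNanos); // releases the CPU; may return early
        }
    }

    void close() {
        closing.set(true);
    }

    public static void main(String[] args) throws InterruptedException {
        ParkingFlushLoopSketch loop = new ParkingFlushLoopSketch();
        Thread t = new Thread(() -> loop.runLoop(10_000_000L), "flusher");
        t.setDaemon(true);
        t.start();
        Thread.sleep(100);
        loop.close();
        t.join(1000);
        System.out.println("loop stopped: " + !t.isAlive());
    }
}
```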



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43656469
  
    Bobby, 
    
    Sorry for this regression. Now it is fixed.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45046513
  
    @Gvain, thanks for your detailed description. I care about nothing but finding the truth. Let me try your approach to reproduce the test.
    
    
    




[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45414524
  
    @Bobby, I reproduced this. It is random. I will look into this, thanks.
    
    It seems the Netty Client is closed before the worker transfer thread is stopped,
    so the transfer thread gets an exception.
    
    137917 [Thread-549-disruptor-worker-transfer-queue] ERROR backtype.storm.util - Async loop died!
    java.lang.RuntimeException: java.lang.RuntimeException: Client is being closed, and does not take requests any more
    	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:114) ~[classes/:na]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:85) ~[classes/:na]
    	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:78) ~[classes/:na]
    	at backtype.storm.disruptor$consume_loop_STAR_$fn__2895.invoke(disruptor.clj:89) ~[classes/:na]
    	at backtype.storm.util$async_loop$fn__442.invoke(util.clj:434) ~[classes/:na]
    	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
    	at java.lang.Thread.run(Thread.java:679) [na:1.6.0_22]
    Caused by: java.lang.RuntimeException: Client is being closed, and does not take requests any more
    	at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[classes/:na]
    	at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[classes/:na]
    	at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5765$fn__5766.invoke(worker.clj:322) ~[classes/:na]
    	at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5765.invoke(worker.clj:320) ~[classes/:na]
    	at backtype.storm.disruptor$clojure_handler$reify__2882.onEvent(disruptor.clj:59) ~[classes/:na]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:111) ~[classes/:na]
    	... 6 common frames omitted




[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by nathanmarz <gi...@git.apache.org>.
Github user nathanmarz commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-42916013
  
    Can you describe how the worker receive thread has been changed in the pull request? In particular, do we still have the guarantee that messages sent from task A to task B are received in the same order they are sent (or not received at all)? For example, if task A sends messages 1, 2, 3, 4, and task B receives 1, 4, 3 that would violate this guarantee. 
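
For readers unfamiliar with the ordering concern: one common way to keep per-task FIFO order with several receiver threads is to route every message for a given destination task to the same thread. The sketch below illustrates only that general idea. Whether the pull request does exactly this is not stated in this thread (the diagram attached to the JIRA is the authoritative description), and `receiverFor` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

final class ReceiverPartitionSketch {
    /** Picks the receiver thread for a destination task; the same task always maps to the same index. */
    static int receiverFor(int taskId, int receiverThreads) {
        return Math.floorMod(taskId, receiverThreads);
    }

    public static void main(String[] args) {
        int receiverThreads = 3;
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < receiverThreads; i++) {
            queues.add(new ArrayList<>());
        }

        // Messages 1..4 for task 7, interleaved with messages for task 8: {taskId, sequence}.
        int[][] sent = { {7, 1}, {8, 1}, {7, 2}, {7, 3}, {8, 2}, {7, 4} };
        for (int[] m : sent) {
            queues.get(receiverFor(m[0], receiverThreads)).add("task" + m[0] + "#" + m[1]);
        }

        // Each queue is consumed by one thread, so messages for a task keep their send order.
        for (int i = 0; i < receiverThreads; i++) {
            System.out.println("receiver " + i + ": " + queues.get(i));
        }
    }
}
```

Ordering across different tasks is not preserved by this scheme, which matches the guarantee asked about: it only concerns messages from one task to another.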



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45416875
  
    fixed.
    
    The failing case is under the namespace backtype.storm.messaging-test.
    
    Root cause:
    The test failure is caused by a bug in the local topology simulation. The function complete-topology assumes the topology is finished after all spouts return, but that is not always true; the downstream bolts may still be working.
    
    Direct cause:
    complete-topology -> all spouts finish -> shut down the topology -> shut down the worker -> shut down the netty client -> a bolt is still not finished -> the bolt tries to send a message to the netty client -> the client is already shut down -> a runtime exception is thrown.
    
    The fix is to add a config that allows this topology to wait longer before being killed.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by nathanmarz <gi...@git.apache.org>.
Github user nathanmarz commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45056169
  
    Great work @clockfly. The only additional change I'd like to see is renaming "worker.receiver.thread.count" to "topology.worker.receiver.thread.count". The "topology" prefix is how we indicate confs can be set on a topology-specific level. After that I'm +1. 
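
As a rough illustration of the renamed key, the sketch below reads it from a plain conf map. It deliberately avoids Storm's own `Config` class; the helper name and the fallback value of 1 are assumptions made for the example, not values taken from the pull request.

```java
import java.util.HashMap;
import java.util.Map;

final class ReceiverThreadConfSketch {
    // Key name as proposed in the review comment above.
    static final String KEY = "topology.worker.receiver.thread.count";

    /** Reads the receiver thread count from a conf map, falling back to the given default. */
    static int receiverThreads(Map<String, Object> conf, int defaultValue) {
        Object v = conf.get(KEY);
        return (v instanceof Number) ? ((Number) v).intValue() : defaultValue;
    }

    public static void main(String[] args) {
        Map<String, Object> topologyConf = new HashMap<>();
        topologyConf.put(KEY, 2); // set per topology, which is what the "topology." prefix signals
        System.out.println(receiverThreads(topologyConf, 1)); // the default of 1 is an assumption
    }
}
```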



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43659441
  
    The unit tests pass and the code looks OK to me.  I am +1



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986
  
    @clockfly,
    
    Your logic makes sense to me on why these calls are blocking.  My biggest concern around the blocking is the case of a worker crashing.  If a single worker crashes, this can block the entire topology from executing until that worker comes back up.  In some cases I can see that being something that you would want.  In other cases I can see speed being the primary concern, and some users would like to get partial data fast rather than accurate data later.
    
    Could we make it configurable in a follow-up JIRA, where we can have a max limit on the buffering that is allowed before we block or throw data away (which is what zeromq does)?
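
The configurable behaviour requested here could look roughly like the sketch below: a bounded buffer whose overflow policy is either to block the sender or to drop the message. All names (`BoundedSendBufferSketch`, `OverflowPolicy`) are hypothetical, and the sketch does not reflect how the follow-up JIRA was implemented.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

final class BoundedSendBufferSketch {
    enum OverflowPolicy { BLOCK, DROP }

    private final BlockingQueue<byte[]> pending;
    private final OverflowPolicy policy;
    private final AtomicLong dropped = new AtomicLong();

    BoundedSendBufferSketch(int maxPending, OverflowPolicy policy) {
        this.pending = new ArrayBlockingQueue<>(maxPending);
        this.policy = policy;
    }

    /** Returns true if the message was buffered, false if it was dropped or the wait was interrupted. */
    boolean enqueue(byte[] message) {
        if (policy == OverflowPolicy.BLOCK) {
            try {
                pending.put(message); // waits until the consumer frees a slot
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        boolean accepted = pending.offer(message); // never waits
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    long droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) {
        BoundedSendBufferSketch buf = new BoundedSendBufferSketch(2, OverflowPolicy.DROP);
        for (int i = 0; i < 5; i++) {
            buf.enqueue(new byte[] {(byte) i});
        }
        System.out.println("dropped: " + buf.droppedCount());
    }
}
```

BLOCK favours complete data at the cost of stalling upstream when a peer is down; DROP favours latency at the cost of lost messages, which is the trade-off described in the comment.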



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-44634492
  
    Hi Gvain,
    
    First, I apologize for the late reply; I was tied up in a 2-day training.
    
    Thank you for sharing your data! It is really great that we are using data to support our points!
    
    Before jumping to conclusions, let's make sure we understand the data in the same way and that the test can be reproduced. I have some questions about your data.
    
    1. What does the tps here consist of? Does it consist solely of spout messages, or does it also include acker messages and other messages?
    2. What is the message latency in each test?
    3. I noticed your network usage is pretty small. Is it the overall cluster network usage, or only a single machine's network usage? Are you using a 1Gb network?
    4. What is your max spout pending setting?



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r13386178
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +185,114 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
                 throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
    -
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
     
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed);
    --- End diff --
    
    What if the channel is not writable? Furthermore, what if messageBatch keeps filling up under heavy throughput while the channel is still not writable because the connected peer is a slow component?



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45492048
  
    Thank you all!



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12362569
  
    --- Diff: storm-core/src/clj/backtype/storm/messaging/loader.clj ---
    @@ -24,45 +24,62 @@
     (defn mk-local-context []
       (local/mk-context))
     
    --- End diff --
    
    support multiple worker receive threads



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45052822
  
    Hi @Gvain,
    
    First, thank you for your persistence! It really helps us gain a better understanding of Storm.
    I ran the test with your approach, and the result confirms what you said: throughput does increase as the number of workers increases.
    
    Test
    -------------
    
    **Throughput vs. worker# ①**
    
    | Worker# | cluster network IN (MB/s) | spout throughput (msg/s) | overall CPU | user CPU | system CPU | total threads# ② |
    | ------------- |:-------------:| -----:|-----:|-----:|-----:|-----:|
    | 4 | 72 | 391860 | 42% | 36% | 6% | 422 |
    | 8 | 88.3 | 475007 | 42% | 35% | 7% | 554 |
    | 12 | 104 | 555174 | 51% | 40% | 11% | 686 |
    | 16 | 116 | 603399 | 59% | 46% | 13% | 818 |
    | 24 | 130 | 622938 | 77% | 55% | 22% | 1082 |
    | 4 (storm 297) | 140 | 752479 | 74% | 67.80% | 5.20% | 434 |
    
    ① Test environment: 4 nodes, 48 vcores on each machine, max.spout.pendings = 1000, CPU: E52680, 48 spouts, 48 bolts, and 48 ackers.
    ② We count only 1 GC thread and 1 JIT thread for each JVM.
    
    
    
    ![worker_throughput_without_storm-297](https://cloud.githubusercontent.com/assets/2595532/3169502/82447702-eb9e-11e3-9f97-bc29fde190f3.png)
    
    
    **CPU Usage**
    
    ![cpu_worker _scability](https://cloud.githubusercontent.com/assets/2595532/3169503/afdac11c-eb9e-11e3-9207-9d524606f613.png)
    
    We can see that both throughput and CPU usage increase as the number of workers increases.
    
    Analysis
    -------------------
    The facts revealed by this test strengthened my conviction that it is even better to apply this patch for higher performance:
    
    1. **Higher performance with this patch.**
    
    | | Worker# | cluster network IN (MB/s) | spout throughput (msg/s) |
    | ------------- |:-------------:| -----:|-----:|
    | no storm-297 | 24 | 130 | 622938 |
    | with storm-297 | 4 | 140 | 752479 |
    
      **4 workers** with storm-297 can process **20% more** messages than **24 workers** without this patch, with **less** CPU consumption.
    
    2. **Storm cannot scale well by changing task parallelism alone**
    
      As the data in your test showed, with 4 workers we can only reach 56% CPU, even though each worker has a task parallelism of 36, much larger than the CPU core count of 24.
    
      **The worker has inherent bottlenecks; the issues are there. Workarounds won't fix those issues.**
    
    3. **CPU System time increase when worker number increase**
    
    | Worker# | user CPU | system CPU |
    | ------------- | -----:|-----:|
    | 4 | 36% | 6% |
    | 8 | 35% | 7% |
    | 12 | 40% | 11% |
    | 16 | 46% | 13% |
    | 24 | 55% | **22%** |
    | 4 (with storm-297) | 67.80% | 5.20% |
    
      Abnormally high system CPU time is not good.
    
    4. **JVM allocation cost is high.**
      In the test, there are 16+ threads running for each JVM allocation. The supporting data can be found at  
    
    5. **Worker allocation is not cost free**
    
      Besides the JVM allocation cost, there are ZooKeeper-related threads (4), heartbeat threads (5), system bolt threads (2), and netty boss, worker, and timer threads (6: 2 boss, 2 worker, and 2 timer). Together with the JVM threads, each worker adds at least 33 threads.
    
      More threads put more pressure on central services like nimbus and ZooKeeper.
    
      More threads mean more context switches, which hurts the performance of all applications running on these servers.
    
    | Worker# | cluster total threads# |
    | ------------- |:-------------:|
    | 4 | 422 |
    | 8 | 554 |
    | 12 | 686 |
    | 16 | 818 |
    | 24 | **1082** |
    | 4 (with storm-297) | 434 |
    
    (We will only count 1 gc thread 1 jit thread for each JVM)
    
      For the 24-worker case, the overall thread count is 1082+, with **each machine having more than 270 threads!**
    
    
    6. **Serialization and Deserialization Cost**
    
      When a message is delivered from a task in the same process, the tuple is not serialized.
    
      With 4 workers (assuming shuffle grouping and evenly distributed tasks), 1/4 of the messages don't need serialization; with 24 workers the ratio is 1/24. The serialized share therefore grows from 3/4 to 23/24, which means we now need to serialize about 28% more messages.
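
The 28% figure can be checked with a few lines of arithmetic, under the same assumption the comment makes (shuffle grouping with tasks spread evenly, so 1/N of the tuples stay inside the sending worker). `remoteFraction` is a helper name introduced for this check.

```java
final class SerializationShareSketch {
    /** Fraction of tuples that leave the worker, and so must be serialized, with N evenly loaded workers. */
    static double remoteFraction(int workers) {
        return 1.0 - 1.0 / workers;
    }

    public static void main(String[] args) {
        double four = remoteFraction(4);        // 3/4 of tuples are serialized
        double twentyFour = remoteFraction(24); // 23/24 of tuples are serialized
        double increase = twentyFour / four - 1.0;
        System.out.printf("4 workers: %.3f, 24 workers: %.3f, relative increase: %.1f%%%n",
            four, twentyFour, 100 * increase);
    }
}
```

The ratio (23/24) / (3/4) is 23/18, or roughly 1.28, which is where the "28% more" in the comment comes from.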
    
     



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-42479510
  
    In general this looks good to me, but it needs to be reviewed by additional committers familiar with the netty transport. Unit tests pass as did a basic smoke test on a 3-node cluster.
    
    As I mentioned in an earlier comment, the additional config parameters need to be added to the `defaults.yaml` file, preferably with some comments/documentation regarding usage and ramifications of certain settings (e.g. sync vs. async). That way users won't have to dig around in source code to determine the default values.
    
    I also noticed that some of the changes you mentioned in the pdf doc don't appear in this pull request (e.g. serialization.reserve.tuple.createTime)? I'm just curious as to why you left some things out.
    
    Thanks for the contribution!



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12813817
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java ---
    @@ -18,70 +18,24 @@
     package backtype.storm.messaging.netty;
     
     import java.net.ConnectException;
    -import java.util.concurrent.atomic.AtomicBoolean;
     
    -import org.jboss.netty.channel.Channel;
    -import org.jboss.netty.channel.ChannelHandlerContext;
    -import org.jboss.netty.channel.ChannelStateEvent;
    -import org.jboss.netty.channel.ExceptionEvent;
    -import org.jboss.netty.channel.MessageEvent;
    -import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    +import org.jboss.netty.channel.*;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.net.ConnectException;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -
     public class StormClientHandler extends SimpleChannelUpstreamHandler  {
         private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
         private Client client;
    -    long start_time;
         
         StormClientHandler(Client client) {
    --- End diff --
    
    If all this code does now is log error messages, can we drop the Client from the constructor and rename the class to something that better describes what it does?



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-42414069
  
    Does the performance number include all components' 'transferred' throughput, as yahooeng does here http://yahooeng.tumblr.com/post/64758709722/making-storm-fly-with-netty ? Or does it count only the spouts' 'emitted' throughput?



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45060188
  
    @clockfly Beyond the specific work on this item (STORM-297) what would be your lessons learned after having investigated Storm's performance and scalability so closely, i.e. with regards to potential bottlenecks (= things that are broken and that need fixing) or areas where we still have untapped potential (= where we could optimize further, maybe simply because we haven't focused on a particular piece of the architecture/code/... yet)?
    
    For instance, would you say that there are some general design-related decisions that could put an upper bound on Storm's scalability (in theory and/or in practice)?  Or would you say it is, at least at this point in time, more the current implementation of the design?  Just thinking aloud.
    
    I'd appreciate any comments -- positive or negative -- you might have in this context.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12845130
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -84,43 +93,87 @@
     
             // Start the connection attempt.
             remote_addr = new InetSocketAddress(host, port);
    -        bootstrap.connect(remote_addr);
    +        
    +        Thread flushChecker = new Thread(new Runnable() {
    --- End diff --
    
    fixed



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827325
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
    @@ -31,35 +31,69 @@
     import org.slf4j.LoggerFactory;
     
     import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
     import java.util.Map;
     import java.util.concurrent.Executors;
     import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
     
     class Server implements IConnection {
         private static final Logger LOG = LoggerFactory.getLogger(Server.class);
         @SuppressWarnings("rawtypes")
         Map storm_conf;
         int port;
    -    private LinkedBlockingQueue<TaskMessage> message_queue;
    +    
    +    // Create multiple queues for incoming messages. The size equals the number of receiver threads.
    +    // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
    +    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
    +    
         volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
         final ChannelFactory factory;
         final ServerBootstrap bootstrap;
    -
    +    
    +    private int queueCount;
    +    HashMap<Integer, Integer> taskToQueueId = null;
    +    int roundRobinQueueId;
    +	
    +    boolean closing = false;
    +    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
    +    
    +    
         @SuppressWarnings("rawtypes")
         Server(Map storm_conf, int port) {
             this.storm_conf = storm_conf;
             this.port = port;
    -        message_queue = new LinkedBlockingQueue<TaskMessage>();
    -
    +        
    +        queueCount = Utils.getInt(storm_conf.get("worker.receiver.thread.count"), 1);
    --- End diff --
    
    fixed



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45362633
  
    +1



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43708724
  
    clockfly,
    
    Thanks for your patience.
    Do you mean that allocating one worker per node is better than several workers per node, because the netty threads from different worker processes compete with each other?
    In production practice, is it appropriate to allocate only one worker per node? I don't think so.
    
    I also think that several netty threads working in sync, non-batch mode may have much the same effect as a single netty thread working in async, batch mode. Maybe I should test this out. By the way, which Storm version did you use in the test?



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827582
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +181,105 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
                 throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
     
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
    -
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed, blocking);
    +                messageBatch = null;
    +            }
             }
     
    -        //we are busily delivering messages, and will check queue upon response.
    -        //When send() is called by senders, we should not thus call tryDeliverMessages().
    -        wait_for_requests = false;
    -
    -        //write request into socket channel
    -        ChannelFuture future = channel.write(requests);
    -        future.addListener(new ChannelFutureListener() {
    -            public void operationComplete(ChannelFuture future)
    -                    throws Exception {
    -                if (!future.isSuccess()) {
    -                    LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
    -                    reconnect();
    -                } else {
    -                    LOG.debug("{} request(s) sent", requests.size());
    -
    -                    //Now that our requests have been sent, channel could be closed if needed
    -                    if (being_closed.get())
    -                        close_n_release();
    -                }
    +        if (null != messageBatch && !messageBatch.isEmpty()) {
    +            if (channel.isWritable()) {
    +                flushCheckTimer.set(Long.MAX_VALUE);
    +                
    +                // Flush as fast as we can to reduce the latency
    +                MessageBatch toBeFlushed = messageBatch;
    +                messageBatch = null;
    +                flushRequest(channel, toBeFlushed, blocking);
    +                
    +            } else {
    +                // when channel is NOT writable, it means the internal netty buffer is full. 
    +                // In this case, we can try to buffer up more incoming messages.
    +                flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
                 }
    -        });
    +        }
    +
         }
     
    -    /**
    -     * Take all enqueued messages from queue
    -     * @return  batch of messages
    -     * @throws InterruptedException
    -     *
    -     * synchronized ... ensure that messages are delivered in the same order
    -     * as they are added into queue
    -     */
    -    private MessageBatch tryTakeMessages() throws InterruptedException {
    -        //1st message
    -        Object msg = message_queue.poll();
    -        if (msg == null) return null;
    -
    -        MessageBatch batch = new MessageBatch(buffer_size);
    -        //we will discard any message after CLOSE
    -        if (msg == ControlMessage.CLOSE_MESSAGE) {
    -            LOG.info("Connection to {} is being closed", remote_addr);
    -            being_closed.set(true);
    -            return batch;
    +    public String name() {
    +        if (null != remote_addr) {
    +            return PREFIX + remote_addr.toString();
             }
    +        return "";
    +    }
     
    -        batch.add((TaskMessage)msg);
    -        while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
    -            //Is it a CLOSE message?
    -            if (msg == ControlMessage.CLOSE_MESSAGE) {
    -                message_queue.take();
    -                LOG.info("Connection to {} is being closed", remote_addr);
    -                being_closed.set(true);
    -                break;
    +    private synchronized void flush() {
    +        if (!closing) {
    +            if (null != messageBatch && !messageBatch.isEmpty()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                Channel channel = channelRef.get();
    +                if (channel != null) {
    +                    flushCheckTimer.set(Long.MAX_VALUE);
    +                    flushRequest(channel, toBeFlushed, true);
    +                }
    +                messageBatch = null;
    --- End diff --
    
    Fixed. Changed `flush()` to `flush(Channel channel)`, since the caller of `flush()` has already checked that the channel is not null.
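
The batching behaviour in the diff above can be sketched without Netty as follows. This is a simplified stand-in, not the patch itself: `BatchingSender`, `Sink`, and `pendingCount()` are hypothetical names, the `Sink` interface replaces the Netty channel, and the timer-driven flush checker and reconnect logic are left out. Full batches are written immediately; a partial batch is written only if the sink reports it is writable, otherwise it stays buffered for a later flush.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for the batching in Client.send(); all names are hypothetical.
class BatchingSender<T> {
    // Minimal abstraction of a channel: can it take a write now, and the write itself.
    interface Sink<T> {
        boolean isWritable();
        void write(List<T> batch);
    }

    private final Sink<T> sink;
    private final int batchSize;
    private List<T> pending = new ArrayList<>();

    BatchingSender(Sink<T> sink, int batchSize) {
        this.sink = sink;
        this.batchSize = batchSize;
    }

    // Full batches are written at once; a partial batch is written only when the
    // sink is writable, otherwise it is kept for a later flush().
    synchronized void send(Iterator<T> msgs) {
        while (msgs.hasNext()) {
            pending.add(msgs.next());
            if (pending.size() >= batchSize) {
                flush();
            }
        }
        if (!pending.isEmpty() && sink.isWritable()) {
            flush();
        }
    }

    // Write whatever is buffered. In the patch a background checker triggers this on a timer.
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> toBeFlushed = pending;
        pending = new ArrayList<>();
        sink.write(toBeFlushed);
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        List<List<Integer>> written = new ArrayList<>();
        BatchingSender<Integer> sender = new BatchingSender<>(new Sink<Integer>() {
            public boolean isWritable() { return false; } // simulate a full send buffer
            public void write(List<Integer> batch) { written.add(batch); }
        }, 3);
        sender.send(Arrays.asList(1, 2, 3, 4).iterator());
        System.out.println(written + " pending=" + sender.pendingCount());
        sender.flush(); // what the timer-driven checker would do later
        System.out.println(written);
    }
}
```

Swapping the pending list for a fresh one before writing mirrors the `toBeFlushed` pattern in the diff: the batch handed to the sink is never mutated afterwards.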



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45494371
  
    @clockfly thank you.  This was a huge amount of work, and should make a big difference.  Hopefully you can continue to help improve/support storm going forward.  



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45072821
  
    @Gvain,
    
    There are two parts to your question:
    
    1. In your test, why does throughput drop when the worker count increases beyond a certain value (8 in your test case)?
      
       This is because your CPU reaches its limit at worker# = 8 (CPU usage: 89%). In this case, adding more workers just adds more threads and context switches, hurting performance. In my case, I have a more powerful CPU, which allows more parallel workers.
    
    2. Why is there a performance difference when scaling worker# from 4 to 8 in the two different environments?
    
      I don't know the answer, but I guess it may be caused by the difference in hardware. Your environment has a "bonded 1Gb network card" (2Gb), so its bandwidth is twice mine, and its CPU has 24 cores, half of mine.
    
      Suppose we model the message transfer pipeline as three layers:
    
      netty layer (throughput somewhat impacted by NIC bandwidth) -> intermediate layer (worker intermediate receiving pipes: netty server handler -> decoding -> receiver thread) -> task processing (throughput impacted by CPU).
    
      In your environment, CPU is relatively scarce and effective network bandwidth is plentiful (effective bandwidth is measured as theory_bandwidth * network_efficiency_factor), so performance is throttled by the last layer. In my environment, CPU is plentiful but effective network bandwidth is insufficient (because theory_bandwidth is only half of yours), so performance is throttled by the first two layers.
    
      The patch mainly addresses the first two layers.
    
      1. Changing the netty API from sync -> async, together with the messaging API change, improves the network_efficiency_factor, thus increasing the effective network bandwidth.
    
      2. Adding more receiver threads and optimizing the netty server handler improves the second layer's throughput.
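
The three-layer model can be sketched as a toy calculation. The effective-bandwidth formula is taken from the comment; treating overall throughput as the minimum of the three layers is a simplifying assumption made here, and every number below is made up for illustration (arbitrary units). `PipelineModel` is a hypothetical name.

```java
// Toy model of the three-layer pipeline; all numbers are invented for illustration.
class PipelineModel {
    // Effective bandwidth as defined in the comment:
    // theory_bandwidth * network_efficiency_factor.
    static double effectiveBandwidth(double theoryBandwidth, double efficiencyFactor) {
        return theoryBandwidth * efficiencyFactor;
    }

    // Assumption: a serial pipeline runs at the pace of its slowest layer.
    static double throughput(double nettyLayer, double intermediateLayer, double taskLayer) {
        return Math.min(nettyLayer, Math.min(intermediateLayer, taskLayer));
    }

    public static void main(String[] args) {
        // Hypothetical CPU-bound environment: ample network, task layer is the limit.
        double cpuBound = throughput(effectiveBandwidth(2.0, 0.5), 1.5, 0.8);
        // Hypothetical network-bound environment: half the bandwidth, ample CPU.
        double netBound = throughput(effectiveBandwidth(1.0, 0.5), 0.6, 2.0);
        System.out.println("CPU-bound env throughput:     " + cpuBound);
        System.out.println("Network-bound env throughput: " + netBound);
        // Raising the efficiency factor alone moves the limit to the intermediate layer.
        System.out.println("Network-bound env, better efficiency: "
                + throughput(effectiveBandwidth(1.0, 0.9), 0.6, 2.0));
    }
}
```

In this toy setup, improving only the efficiency factor shifts the bottleneck to the intermediate layer, which is consistent with the patch touching both of the first two layers.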




[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827328
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
    @@ -72,34 +106,109 @@
             Channel channel = bootstrap.bind(new InetSocketAddress(port));
             allChannels.add(channel);
         }
    +    
    +    private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
    +      ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
    +      
    +      for (int i = 0; i < msgs.size(); i++) {
    +        TaskMessage message = msgs.get(i);
    +        int task = message.task();
    +        
    +        if (task == -1) {
    +          closing = true;
    +          return null;
    +        }
    +        
    +        Integer queueId = getMessageQueueId(task);
    +        
    +        if (null == messageGroups[queueId]) {
    +          messageGroups[queueId] = new ArrayList<TaskMessage>();
    +        }
    +        messageGroups[queueId].add(message);
    +      }
    +      return messageGroups;
    +    }
    +    
    +    private Integer getMessageQueueId(int task) {
    +      // try to construct the map from taskId -> queueId in round robin manner.
    +      
    +      Integer queueId = taskToQueueId.get(task);
    +      if (null == queueId) {
    +        synchronized(taskToQueueId) {
    +          //assgin task to queue in round-robin manner
    +          if (null == taskToQueueId.get(task)) {
    +            queueId = roundRobinQueueId++;
    +            
    +            taskToQueueId.put(task, queueId);
    +            if (roundRobinQueueId == queueCount) {
    +              roundRobinQueueId = 0;
    +            }
    +          }
    +        }
    +      }
    +      return queueId;
    +    }
     
         /**
          * enqueue a received message 
          * @param message
          * @throws InterruptedException
          */
    -    protected void enqueue(TaskMessage message) throws InterruptedException {
    -        message_queue.put(message);
    -        LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
    -    }
    +    protected void enqueue(List<TaskMessage> msgs) throws InterruptedException {
    +      
    +      if (null == msgs || msgs.size() == 0 || closing) {
    +        return;
    +      }
    +      
    +      ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
    +      
    +      if (null == messageGroups || closing) {
    +        return;
    +      }
    +      
    +      for (int receiverId = 0; receiverId < messageGroups.length; receiverId++) {
    +        ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
    +        if (null != msgGroup) {
    +          message_queue[receiverId].put(msgGroup);
    +        }
    +      }
    +   }
         
         /**
          * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
          */
    -    public TaskMessage recv(int flags)  {
    -        if ((flags & 0x01) == 0x01) { 
    +    public Iterator<TaskMessage> recv(int flags)  {
    --- End diff --
    
    fixed



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12362647
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
    @@ -31,35 +31,65 @@
     import org.slf4j.LoggerFactory;
     
     import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
     import java.util.Map;
     import java.util.concurrent.Executors;
     import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
     
     class Server implements IConnection {
         private static final Logger LOG = LoggerFactory.getLogger(Server.class);
         @SuppressWarnings("rawtypes")
         Map storm_conf;
         int port;
    -    private LinkedBlockingQueue<TaskMessage> message_queue;
    +    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
    --- End diff --
    
    This creates multiple queues for incoming messages; the number of queues equals the number of receiver threads.
    Messages sent to the same task are always stored in the same queue, which preserves message order.
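
A minimal sketch of the idea, assuming a sticky round-robin assignment from task id to queue index. It is not the patch's code (which uses a `HashMap` guarded by a `synchronized` block); `TaskQueueAssigner` and `queueFor` are hypothetical names, and `ConcurrentHashMap.computeIfAbsent` is swapped in so that concurrent callers always get the mapping that was actually stored.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: pins each task to one receiver queue, assigned round-robin.
class TaskQueueAssigner {
    private final int queueCount;
    private final ConcurrentHashMap<Integer, Integer> taskToQueueId = new ConcurrentHashMap<>();
    private final AtomicInteger next = new AtomicInteger();

    TaskQueueAssigner(int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        this.queueCount = queueCount;
    }

    // The first message for a task picks the next queue in round-robin order; later
    // messages for that task reuse the same queue, which keeps per-task ordering.
    int queueFor(int task) {
        return taskToQueueId.computeIfAbsent(
                task, t -> Math.floorMod(next.getAndIncrement(), queueCount));
    }

    public static void main(String[] args) {
        TaskQueueAssigner assigner = new TaskQueueAssigner(2);
        for (int task : new int[] {7, 9, 7, 11, 9}) {
            System.out.println("task " + task + " -> queue " + assigner.queueFor(task));
        }
    }
}
```

Because a task never changes queue once assigned, a single receiver thread per queue sees that task's messages in arrival order, while different tasks can be drained in parallel.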




[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-42414269
  
    Spout "transferred" throughput.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827318
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -21,52 +21,53 @@
     import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
     import backtype.storm.utils.Utils;
    -
     import org.jboss.netty.bootstrap.ClientBootstrap;
     import org.jboss.netty.channel.Channel;
     import org.jboss.netty.channel.ChannelFactory;
    -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
     import org.jboss.netty.channel.ChannelFuture;
     import org.jboss.netty.channel.ChannelFutureListener;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    -
     import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
     import java.util.Map;
    -import java.util.Timer;
    -import java.util.TimerTask;
     import java.util.Random;
    -import java.util.concurrent.LinkedBlockingQueue;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.atomic.AtomicLong;
     import java.util.concurrent.atomic.AtomicReference;
     
    -class Client implements IConnection {
    +public class Client implements IConnection {
         private static final Logger LOG = LoggerFactory.getLogger(Client.class);
    -    private static final Timer TIMER = new Timer("netty-client-timer", true);
    -
    +    private static final String PREFIX = "Netty-Client-";
         private final int max_retries;
         private final long base_sleep_ms;
         private final long max_sleep_ms;
    -    private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
         private AtomicReference<Channel> channelRef;
         private final ClientBootstrap bootstrap;
    -    InetSocketAddress remote_addr;
    -    private AtomicInteger retries;
    +    private InetSocketAddress remote_addr;
    +    
         private final Random random = new Random();
         private final ChannelFactory factory;
         private final int buffer_size;
    -    private final AtomicBoolean being_closed;
    -    private boolean wait_for_requests;
    +    private boolean closing;
    +
    +    private Integer messageBatchSize;
    +    private Boolean blocking = false;
    --- End diff --
    
    fixed



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827338
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java ---
    @@ -41,30 +45,22 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
         
         @Override
         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    -        Object msg = e.getMessage();  
    -        if (msg == null) return;
    -
    -        //end of batch?
    -        if (msg==ControlMessage.EOB_MESSAGE) {
    -            Channel channel = ctx.getChannel();
    -            LOG.debug("Send back response ...");
    -            if (failure_count.get()==0)
    -                channel.write(ControlMessage.OK_RESPONSE);
    -            else channel.write(ControlMessage.FAILURE_RESPONSE);
    -            return;
    -        }
    -        
    -        //enqueue the received message for processing
    -        try {
    -            server.enqueue((TaskMessage)msg);
    -        } catch (InterruptedException e1) {
    -            LOG.info("failed to enqueue a request message", e);
    -            failure_count.incrementAndGet();
    -        }
    +      List<TaskMessage> msgs = (List<TaskMessage>) e.getMessage();
    +      if (msgs == null) {
    +        return;
    +      }
    +      
    +      try {
    +        server.enqueue(msgs);
    +      } catch (InterruptedException e1) {
    +        LOG.info("failed to enqueue a request message", e);
    +        failure_count.incrementAndGet();
    +      }
         }
     
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
    +        e.getCause().printStackTrace();
    --- End diff --
    
    fixed



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45389078
  
    +1
    
    > On 06.06.2014, at 19:00, "Robert (Bobby) Evans" <no...@github.com> wrote:
    > 
    > @clockfly and everyone else, I love the discussion that is happening here. I think the code is now at a point where we can merge it in (2 binding +1s), and continue the discussion and development on separate JIRAs. I tried to go through all of the comments, and most of what is left is around possible improvements that are still remaining, but nothing blocking. If anyone disagrees or has seen issues but not yet expressed them, please speak up. Otherwise I plan to merge this in later today.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45226047
  
    @miguno, I have several more observations:
    
    1. The network is still not used efficiently enough
    
      The test report shows that, after this fix, throughput is still bottlenecked by the network (CPU: 72%, network: 45%), since the CPU still has 28% headroom. That is odd, because only 45% of the theoretical network bandwidth is used.
    
    2. Uneven message receive latency across machines
    
      In the experiment, I noticed that there is always some machine whose message receive latency is much higher than that of the others. For example, tuples generated on machine A are sent to tasks on machines B, C, and D. In one run, the tasks on B take longer to receive messages; in another run, D may be the slowest.
    
      My guess is that some machines have a longer Netty receiver queue than others, and that the queue lengths on all machines become stable but unequal after some time (input rate = output rate). The latency differs because the queue length differs. Changing max.spout.pending won't improve this, because it only controls the overall number of messages sent from A; it doesn't treat B, C, and D differently.
    
    3. A better max.spout.pending?
    
      I observed that once max.spout.pending is tuned to a large enough value, increasing it further only adds latency, not throughput. When max.spout.pending doubles, the latency doubles.
    
      Can we do flow control adaptively, so that we stop increasing max.spout.pending once there is no further benefit?
    
    4. Potential deadlock when all intermediate buffers are full
    
      Consider two workers: task1 (workerA) delivers messages to task3 (workerB), and task3 delivers to task2 (workerA). There is a loop! It is possible that all worker sender/receiver buffers become full and block.
    
      ![vvvv](https://cloud.githubusercontent.com/assets/2595532/3188775/ba645bdc-ecbd-11e3-959b-dfb8208d4d1b.png)
    
      The current workaround in Storm is tricky: it uses an unbounded receiver buffer (LinkedBlockingQueue) for each worker to break the loop. But this is not good, because the receiver buffer can potentially grow very long and the latency can become very high.
    
    5. Is it necessary for each task to have a dedicated send queue thread?
    
      Currently, each task has a dedicated send queue thread that pushes data to the worker transfer queue. During profiling, the task send queue thread is usually in the wait state. Maybe it would be a good idea to replace the dedicated threads with a shared thread pool?
    
    6. Acker workload is very high
    
      In the test, I found that the acker task is very busy. As each message is small (100 bytes), there is a huge number of tuples that need to be acked.
    
      Can this acker cost be reduced?
    
      For example, we can group the tuples at the spout into time slices, with all tuples in a slice sharing the same root tuple id. If the time slice is 100 ms and contains 10,000 messages, all sharing the same root id, then before sending to the acker task we can first XOR all acker messages with the same root id locally on each worker. That way we may be able to reduce the network and task cost of acking. The drawback is that when a message is lost, we need to replay all messages in that slice.
    
    7. Worker receive thread blocked by task receiver queue
    
      The worker receiver thread publishes messages to the receive queue of each task sequentially, in a blocking way. If one task's receiver queue is full, the thread will block and wait.
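
The local pre-aggregation proposed in item 6 can be sketched as follows. This is only an illustration, not Storm's acker code: the class `LocalAckAggregator` and its methods are hypothetical names. It relies solely on XOR being associative and commutative, so ack values that share a root id can be combined on the worker before a single combined value is sent to the acker task.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Storm: combines ack values per root id on
// one worker so that one XOR-ed value per root id goes to the acker task.
final class LocalAckAggregator {
    private final Map<Long, Long> pending = new HashMap<>();

    // Fold one ack value into the running XOR for its root id.
    void ack(long rootId, long ackVal) {
        pending.merge(rootId, ackVal, (a, b) -> a ^ b);
    }

    // Return the combined values and reset the local state; the caller would
    // send one message per entry to the acker task.
    Map<Long, Long> flush() {
        Map<Long, Long> out = new HashMap<>(pending);
        pending.clear();
        return out;
    }

    public static void main(String[] args) {
        LocalAckAggregator agg = new LocalAckAggregator();
        agg.ack(1L, 0x0FL);
        agg.ack(1L, 0xF0L);
        agg.ack(2L, 0x55L);
        // One combined value per root id instead of three separate messages.
        System.out.println(agg.flush());
    }
}
```

How often `flush()` should run (per time slice, per batch, or on a size threshold) is left open here, as it is in the proposal.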




[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12362464
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
    @@ -204,6 +204,7 @@
             storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
             executor-type (executor-type worker-context component-id)
             batch-transfer->worker (disruptor/disruptor-queue
    +                                  (str "executor"  executor-id "-send-queue")
    --- End diff --
    
    Give the queue a name so that the thread for the queue has a reasonable name.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12413170
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
    @@ -301,15 +303,39 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String
         }
         
         public static Integer getInt(Object o) {
    -        if(o instanceof Long) {
    -            return ((Long) o ).intValue();
    -        } else if (o instanceof Integer) {
    -            return (Integer) o;
    -        } else if (o instanceof Short) {
    -            return ((Short) o).intValue();
    -        } else {
    -            throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
    -        }
    +      Integer result = getInt(o, null);
    +      if (null == result) {
    +        throw new IllegalArgumentException("Don't know how to convert null + to int");
    +      }
    +      return result;
    +    }
    +    
    +    public static Integer getInt(Object o, Integer defaultValue) {
    +      if (null == o) {
    +        return defaultValue;
    +      }
    +      
    +      if(o instanceof Long) {
    --- End diff --
    
    Negative. Number also has the subtypes Float and Double, and we want to throw for those two types.
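
The point about `Float` and `Double` can be illustrated with a small sketch. It follows the shape of the diff above but is not the exact code in `Utils.java`; the class name `IntConversion` is made up for the example. Because each integral type is matched explicitly rather than through `Number`, floating-point values fall through to the exception instead of being silently truncated.

```java
// Sketch only: mirrors the behavior described in the review, under the
// assumption that Long, Integer and Short are the only accepted types.
final class IntConversion {

    // Null yields the supplied default; Float, Double and any other
    // unsupported type are rejected rather than truncated.
    static Integer getInt(Object o, Integer defaultValue) {
        if (o == null) {
            return defaultValue;
        }
        if (o instanceof Long) {
            return ((Long) o).intValue();
        } else if (o instanceof Integer) {
            return (Integer) o;
        } else if (o instanceof Short) {
            return ((Short) o).intValue();
        }
        throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
    }

    // Variant without a default: null is an error as well.
    static Integer getInt(Object o) {
        Integer result = getInt(o, null);
        if (result == null) {
            throw new IllegalArgumentException("Don't know how to convert null to int");
        }
        return result;
    }
}
```

With this shape, `getInt(262144L)` returns an `Integer`, while `getInt(1.5)` throws.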



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12810544
  
    --- Diff: conf/defaults.yaml ---
    @@ -109,6 +112,15 @@ storm.messaging.netty.max_retries: 30
     storm.messaging.netty.max_wait_ms: 1000
     storm.messaging.netty.min_wait_ms: 100
     
    +# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
    +storm.messaging.netty.transfer.batch.size: 262144
    +
    +# If storm.messaging.netty.blocking is set to true, the Netty Client will send messages in synchronized way, otherwise it will do it in async way. Set storm.messaging.netty.blocking to false to improve the latency and throughput.
    --- End diff --
    
    If this always improves the latency and throughput why have this as a config option at all?



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-44635246
  
    Hi, clockfly
    
    1) The tps here counts only spout-emitted messages, NOT acker messages or any other messages.
    3) The network usage is for a SINGLE machine, and only inbound bytes are counted. I am using two bonded 1Gb network cards.
    4) max.spout.pending is 1000.
    
    As for 2), I didn't actually measure message latency. All I did was compare the count emitted by the spouts with the count received by the bolts; they keep up with each other quite closely.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12845167
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
    @@ -373,6 +399,25 @@ public static CuratorFramework newCuratorStarted(Map conf, List<String> servers,
             ret.start();
             return ret;
         }
    +    
    +    public static void redirectStreamAsync(Process process) {
    +      redirectStreamAsync(process.getInputStream(), System.out);
    --- End diff --
    
    Fixed; this part of the code has been removed.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12362656
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
    @@ -31,35 +31,65 @@
     import org.slf4j.LoggerFactory;
     
     import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
     import java.util.Map;
     import java.util.concurrent.Executors;
     import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
     
     class Server implements IConnection {
         private static final Logger LOG = LoggerFactory.getLogger(Server.class);
         @SuppressWarnings("rawtypes")
         Map storm_conf;
         int port;
    -    private LinkedBlockingQueue<TaskMessage> message_queue;
    +    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
         volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
         final ChannelFactory factory;
         final ServerBootstrap bootstrap;
    -
    +    
    +    private int queueCount;
    +    HashMap<Integer, Integer> taskToQueueId = null;
    +    int roundRobinQueueId;
    +	
    +    boolean closing = false;
    +    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
    +    
    +    
         @SuppressWarnings("rawtypes")
         Server(Map storm_conf, int port) {
             this.storm_conf = storm_conf;
             this.port = port;
    -        message_queue = new LinkedBlockingQueue<TaskMessage>();
    -
    +        
    --- End diff --
    
    Try to construct the map from taskId -> queueId in a round-robin manner.
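
A minimal sketch of such a round-robin assignment is shown below. The field names echo those in the diff (`taskToQueueId`, `roundRobinQueueId`, `queueCount`), but the class `TaskQueueAssigner` and the method `queueFor` are hypothetical and not taken from `Server.java`. Each task is bound to a queue the first time it is seen and keeps that queue afterwards, so messages for the same task stay in order.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of lazily assigning tasks to receiver queues.
final class TaskQueueAssigner {
    private final int queueCount;
    private final Map<Integer, Integer> taskToQueueId = new HashMap<>();
    private int roundRobinQueueId = 0;

    TaskQueueAssigner(int queueCount) {
        if (queueCount < 1) {
            throw new IllegalArgumentException("queueCount must be at least 1");
        }
        this.queueCount = queueCount;
    }

    // Return the queue index for a task, assigning the next queue in
    // round-robin order on first sight. Synchronized because several
    // network threads could look up tasks at the same time.
    synchronized int queueFor(int task) {
        Integer queueId = taskToQueueId.get(task);
        if (queueId == null) {
            queueId = roundRobinQueueId;
            taskToQueueId.put(task, queueId);
            roundRobinQueueId = (roundRobinQueueId + 1) % queueCount;
        }
        return queueId;
    }
}
```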



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-42499434
  
    | (e.g. serialization.reserve.tuple.createTime)?
    This changes a lot of files. I will do this in a followup pull request, to make the code change easier to review.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45491833
  
    +1 for me too. The tests passed 3 times in a row, and the changes look good. I'll merge this into trunk now.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12395524
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
    @@ -373,6 +399,25 @@ public static CuratorFramework newCuratorStarted(Map conf, List<String> servers,
             ret.start();
             return ret;
         }
    +    
    +    public static void redirectStreamAsync(Process process) {
    +      redirectStreamAsync(process.getInputStream(), System.out);
    --- End diff --
    
    I don't see this code referenced from anywhere else.
    
    I assume this is an attempt at solving the issue where STDOUT can fill buffers and cause workers to hang (e.g. when GC logging is turned on without being directed to a file)?



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12410406
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
    @@ -373,6 +399,25 @@ public static CuratorFramework newCuratorStarted(Map conf, List<String> servers,
             ret.start();
             return ret;
         }
    +    
    +    public static void redirectStreamAsync(Process process) {
    +      redirectStreamAsync(process.getInputStream(), System.out);
    --- End diff --
    
    Hi Taylor,
    
    Yes, you are right. It is used to keep the worker from hanging when stdout fills up. The original demo patch was very big, so I had to break it down into several patches. The code that uses this function will appear in a follow-up pull request.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43639415
  
    "So in common practice, each worker will have a moderate size of executors, neither too small, nor too big."
    I agree with this. But what size is considered too big or too small? Is 36 too big? If only a few executors fail to heartbeat to Nimbus, the whole worker will reload.
    
    Besides, by using a SHARED thread pool (its default size is 1) among all Netty clients within a worker, the number of Netty threads does not increase as the total number of workers increases. Check [jira][storm-12]. So increasing the worker count may not cause a Netty context-switching problem.
    
    "3. More outbound acker message count. Usually we will allocate one acker to one worker."
    But you allocated 48 ackers to only 4 workers.
      



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43553213
  
    I did a quick pass through the code.  I have some concerns about the blocking nature of a number of the calls, especially connect.  This just feels like it is going to make the worker block until all other workers are up and the connections have been established.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12393972
  
    --- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
    @@ -84,8 +84,27 @@
          */
         public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads"; 
         public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = Number.class;
    +    
    +    /**
    +     * If the Netty messaging layer is busy, the Netty client will try to batch message as more as possible up to the size of STORM_NETTY_MESSAGE_BATCH_SIZE bytes
    +     */
    +    public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "netty.transfer.batch.size";
    +    public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = Number.class;
    --- End diff --
    
    This and all other new configuration parameters should be added to `defaults.yaml` with their default values.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12362595
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java ---
    @@ -34,52 +37,78 @@
          */
         protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
             // Make sure that we have received at least a short 
    -        if (buf.readableBytes() < 2) {
    +        long available = buf.readableBytes();
    +        if (available < 2) {
                 //need more data
                 return null;
             }
     
    -        // Mark the current buffer position before reading task/len field
    -        // because the whole frame might not be in the buffer yet.
    -        // We will reset the buffer position to the marked position if
    -        // there's not enough bytes in the buffer.
    -        buf.markReaderIndex();
    -
    -        //read the short field
    -        short code = buf.readShort();
    -        
    -        //case 1: Control message
    -        ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
    -        if (ctrl_msg != null) return ctrl_msg;
    -        
    -        //case 2: task Message
    -        short task = code;
    -        
    -        // Make sure that we have received at least an integer (length) 
    -        if (buf.readableBytes() < 4) {
    -            //need more data
    -            buf.resetReaderIndex();
    -            return null;
    -        }
    +        List<Object> ret = new ArrayList<Object>();
    +
    +        while (available >= 2) {
    --- End diff --
    
    Use a while loop to decode as many messages as possible.
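
The decode loop can be sketched with plain `java.nio.ByteBuffer` rather than Netty's `ChannelBuffer`, so that the example stays self-contained. It assumes a simplified frame layout of a 2-byte task id, a 4-byte payload length, and the payload, and it leaves out the control-message case from the original decoder. `BatchFrameDecoder` and `Frame` are hypothetical names. The loop consumes every complete frame and rewinds to the start of the first incomplete one, so the remaining bytes can be retried once more data arrives.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of batch decoding; not the PR's MessageDecoder.
final class BatchFrameDecoder {

    static final class Frame {
        final short task;
        final byte[] payload;

        Frame(short task, byte[] payload) {
            this.task = task;
            this.payload = payload;
        }
    }

    // Header size: 2 bytes for the task id plus 4 bytes for the length.
    private static final int HEADER_BYTES = 6;

    static List<Frame> decodeAll(ByteBuffer buf) {
        List<Frame> frames = new ArrayList<>();
        while (buf.remaining() >= HEADER_BYTES) {
            // Remember where this frame starts in case it is incomplete.
            buf.mark();
            short task = buf.getShort();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (buf.remaining() < length) {
                // Payload not fully received yet: rewind and wait for more data.
                buf.reset();
                break;
            }
            byte[] payload = new byte[length];
            buf.get(payload);
            frames.add(new Frame(task, payload));
        }
        return frames;
    }
}
```

Returning the whole list at once is what lets the receiving side enqueue a batch per read instead of one message at a time.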



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-44925955
  
    Thanks @miofthena,
    
    Fixed at 0bca173! I should have done more testing on the new check-in.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827326
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
    @@ -31,35 +31,69 @@
     import org.slf4j.LoggerFactory;
     
     import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
     import java.util.Map;
     import java.util.concurrent.Executors;
     import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
     
     class Server implements IConnection {
         private static final Logger LOG = LoggerFactory.getLogger(Server.class);
         @SuppressWarnings("rawtypes")
         Map storm_conf;
         int port;
    -    private LinkedBlockingQueue<TaskMessage> message_queue;
    +    
    +    // Create multiple queues for incoming messages. The size equals the number of receiver threads.
    +    // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
    +    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
    +    
         volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
         final ChannelFactory factory;
         final ServerBootstrap bootstrap;
    -
    +    
    +    private int queueCount;
    +    HashMap<Integer, Integer> taskToQueueId = null;
    +    int roundRobinQueueId;
    +	
    +    boolean closing = false;
    +    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
    +    
    +    
         @SuppressWarnings("rawtypes")
         Server(Map storm_conf, int port) {
             this.storm_conf = storm_conf;
             this.port = port;
    -        message_queue = new LinkedBlockingQueue<TaskMessage>();
    -
    +        
    +        queueCount = Utils.getInt(storm_conf.get("worker.receiver.thread.count"), 1);
    +        roundRobinQueueId = 0;
    +        taskToQueueId = new HashMap<Integer, Integer>();
    +    
    +        message_queue = new LinkedBlockingQueue[queueCount];
    +		    for (int i = 0; i < queueCount; i++) {
    --- End diff --
    
    fixed



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12811853
  
    --- Diff: storm-core/src/clj/backtype/storm/disruptor.clj ---
    @@ -89,7 +90,7 @@
                     (consume-batch-when-available queue handler)
                     0 )
                   :kill-fn kill-fn
    -              :thread-name thread-name
    +              :thread-name (.getName queue)
    --- End diff --
    
    The function this is a part of takes an optional :thread-name parameter.  We should either remove that parameter or honor it.  I would vote to remove it, because the only place I think it is called you modified as well (just below).



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43896389
  
    @clockfly Many thanks for all your hard work and your patience.  It's much appreciated!
    
    I don't have anything to add to the discussion at the moment (thanks Bobby & Co.!) except that I, too, can confirm that the Storm test suite passes with Sean's latest code changes.  I tested against the latest commit in Sean's `storm_async_netty_and_batch_api` branch, which at the time of writing was https://github.com/clockfly/incubator-storm/commit/20b4f8b2195a1bf214f63e10b1bbca4690c0290f.
    
        $ git checkout master
        $ git checkout -b STORM-297
        $ git pull git@github.com:clockfly/incubator-storm.git storm_async_netty_and_batch_api
        $ mvn clean install
    
        >>> Success.
    
    PS: Unfortunately I haven't had the chance yet to run the patched version of Storm in a large-scale environment.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43923944
  
    Thank you, @miguno!
    
    @nathanmarz, good point! I will add a test case to cover this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12845484
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -109,25 +111,30 @@
     (defn mk-transfer-fn [worker]
       (let [local-tasks (-> worker :task-ids set)
             local-transfer (:transfer-local-fn worker)
    -        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
    +        ^DisruptorQueue transfer-queue (:transfer-queue worker)
    +        task->node+port (:cached-task->node+port worker)]
         (fn [^KryoTupleSerializer serializer tuple-batch]
           (let [local (ArrayList.)
    -            remote (ArrayList.)]
    +            remoteMap (HashMap.)]
             (fast-list-iter [[task tuple :as pair] tuple-batch]
               (if (local-tasks task)
                 (.add local pair)
    -            (.add remote pair)
    -            ))
    +            (let [node+port (get @task->node+port task)]
    +              (when (not (.get remoteMap node+port))
    +                (.put remoteMap node+port (ArrayList.)))
    +              (let [remote (.get remoteMap node+port)]
    +                (.add remote (TaskMessage. task (.serialize serializer tuple)))
    +                 ))))
    --- End diff --
    
    OK, if you could add a quick comment about it, that would help prevent someone like me trying to "clean it up" in the future and slowing it down. Something like
    ```;;Using java objects directly to avoid performance issues in java code```
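
For readers less familiar with Clojure interop, the grouping done in the diff above corresponds roughly to the following Java sketch. It is an approximation under stated assumptions: task ids stand in for the serialized `TaskMessage` objects, a plain `String` stands in for the node+port key, and `RemoteGrouping` is a made-up class name. The idea is the same, namely that remote tuples are bucketed per destination so that each destination receives one batch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Java rendering of the per-destination grouping.
final class RemoteGrouping {

    // Bucket each task under its destination, creating the bucket on demand.
    static Map<String, List<Integer>> groupByDestination(
            List<Integer> tasks, Map<Integer, String> taskToNodePort) {
        Map<String, List<Integer>> remoteMap = new HashMap<>();
        for (Integer task : tasks) {
            String nodePort = taskToNodePort.get(task);
            List<Integer> remote = remoteMap.get(nodePort);
            if (remote == null) {
                remote = new ArrayList<>();
                remoteMap.put(nodePort, remote);
            }
            remote.add(task);
        }
        return remoteMap;
    }
}
```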



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r13488844
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +185,114 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
                 throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
    -
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
     
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed);
    --- End diff --
    
    Yes, that is what STORM-329 wants to do. But it may require more thought, because introducing another blocking buffer has a performance penalty and may cause deadlock.
    
    I think for your unacked topology, the more reasonable method is to introduce a flow control mechanism. Let's discuss this further in the STORM-329 thread.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12394305
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
    @@ -301,15 +303,39 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String
         }
         
         public static Integer getInt(Object o) {
    -        if(o instanceof Long) {
    -            return ((Long) o ).intValue();
    -        } else if (o instanceof Integer) {
    -            return (Integer) o;
    -        } else if (o instanceof Short) {
    -            return ((Short) o).intValue();
    -        } else {
    -            throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
    -        }
    +      Integer result = getInt(o, null);
    +      if (null == result) {
    +        throw new IllegalArgumentException("Don't know how to convert null + to int");
    +      }
    +      return result;
    +    }
    +    
    +    public static Integer getInt(Object o, Integer defaultValue) {
    +      if (null == o) {
    +        return defaultValue;
    +      }
    +      
    +      if(o instanceof Long) {
    --- End diff --
    
    Very minor point, but this could probably be tightened to:
    
    ```java
    if (o instanceof Number) {
         return ((Number) o).intValue();
    } else {
         throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
    }
    ```
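
Combined with the null default from the diff, the suggestion above might look like the sketch below. `IntConversion` is a hypothetical holder class used only to keep the example self-contained; it is not the actual `Utils` code.

```java
final class IntConversion {
    // Returns defaultValue for null, otherwise converts any Number to an int.
    static Integer getInt(Object o, Integer defaultValue) {
        if (o == null) {
            return defaultValue;
        }
        if (o instanceof Number) {
            return ((Number) o).intValue();
        }
        throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
    }
}
```

One design consequence to be aware of: `Number` also covers `Double` and `Float`, so a value such as `2.9` would be silently truncated to `2`, whereas the original `Long`/`Integer`/`Short` chain rejected it.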



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45583294
  
    Thanks, @clockfly!



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827316
  
    --- Diff: storm-core/src/clj/backtype/storm/disruptor.clj ---
    @@ -89,7 +90,7 @@
                     (consume-batch-when-available queue handler)
                     0 )
                   :kill-fn kill-fn
    -              :thread-name thread-name
    +              :thread-name (.getName queue)
    --- End diff --
    
    fixed



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43634626
  
    Bobby,
    
    Your suggestion makes sense, let's do this in a follow up jira!
    
    Sean
    




[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43621117
  
    Just a hint:
    What if we use MORE workers per node, rather than just ONE worker per node, without changing the total number of executors? By doing so, we will have MORE receiver threads, transfer threads and netty i/o threads for the total of 144 executors. Should this increase total CPU usage and network bandwidth usage?



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12813477
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
    @@ -31,35 +31,69 @@
     import org.slf4j.LoggerFactory;
     
     import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
     import java.util.Map;
     import java.util.concurrent.Executors;
     import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
     
     class Server implements IConnection {
         private static final Logger LOG = LoggerFactory.getLogger(Server.class);
         @SuppressWarnings("rawtypes")
         Map storm_conf;
         int port;
    -    private LinkedBlockingQueue<TaskMessage> message_queue;
    +    
    +    // Create multiple queues for incoming messages. The size equals the number of receiver threads.
    +    // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
    +    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
    +    
         volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
         final ChannelFactory factory;
         final ServerBootstrap bootstrap;
    -
    +    
    +    private int queueCount;
    +    HashMap<Integer, Integer> taskToQueueId = null;
    +    int roundRobinQueueId;
    +	
    +    boolean closing = false;
    +    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
    +    
    +    
         @SuppressWarnings("rawtypes")
         Server(Map storm_conf, int port) {
             this.storm_conf = storm_conf;
             this.port = port;
    -        message_queue = new LinkedBlockingQueue<TaskMessage>();
    -
    +        
    +        queueCount = Utils.getInt(storm_conf.get("worker.receiver.thread.count"), 1);
    +        roundRobinQueueId = 0;
    +        taskToQueueId = new HashMap<Integer, Integer>();
    +    
    +        message_queue = new LinkedBlockingQueue[queueCount];
    +		    for (int i = 0; i < queueCount; i++) {
    --- End diff --
    
    Indentation looks off here.
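
Separately from the indentation, the queue assignment described in the diff comments (one queue per receiver thread, with every message for a given task going to the same queue so that per-task order is preserved) can be sketched as below. The field names follow the diff, but `TaskQueueAssigner` and `queueFor` are hypothetical, and the round-robin assignment is an assumption based on the `roundRobinQueueId` field rather than on code shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;

final class TaskQueueAssigner {
    private final int queueCount;
    private final Map<Integer, Integer> taskToQueueId = new HashMap<>();
    private int roundRobinQueueId = 0;

    TaskQueueAssigner(int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        this.queueCount = queueCount;
    }

    // A task seen for the first time gets the next queue in round-robin order;
    // afterwards it always maps to that same queue.
    synchronized int queueFor(int task) {
        Integer id = taskToQueueId.get(task);
        if (id == null) {
            id = roundRobinQueueId;
            roundRobinQueueId = (roundRobinQueueId + 1) % queueCount;
            taskToQueueId.put(task, id);
        }
        return id;
    }
}
```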



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-44372134
  
    @clockfly 
    
    I used the latest Storm to reproduce some of your tests. I found that adding a few more workers increases throughput as well as CPU usage. At best, throughput doubled.
    
    In my test, I used 4 nodes (24 cores each, less powerful than yours), 48 spouts, 48 bolts, 48 ackers, and 4 to 64 workers, with the SOL benchmark and a message size of 100 bytes. Here are the results:
    
    workers | Throughput  | CPU usage | NET usage (inbound bytes only)
    --------|-------------|-----------|-------------------------------
    4       | 320,000 tps | 56%       | 14 MB/s
    8       | 656,000 tps | 89%       | 28 MB/s
    16      | 560,000 tps | 92%       | 26 MB/s
    32      | 353,000 tps | 90%       | 20 MB/s
    64      | 208,000 tps | 90%       | 16 MB/s
    
    Using 8 workers, the throughput doubled.
    
    As we already discussed:
    (a) Increasing the worker count moderately may not cause a netty context switching problem. Because a SHARED threadpool (its default size is 1) is used among all netty clients within a worker, the number of netty threads per worker does not grow as the total number of workers increases. Check https://github.com/apache/incubator-storm/pull/57
    
    (b) Increasing the worker count will increase the number of netty threads. Using more netty threads in sync, non-batch mode may have roughly the same effect as using fewer netty threads in async, batch mode, which is your approach.
    
    From the test results, it seems that point (b) above is true.
    So maybe what we need to do is just pull in https://github.com/apache/incubator-storm/pull/57 and add a few workers.
    
    But I am still curious: if I add more nodes, will this still work?



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12813729
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
    @@ -133,4 +242,12 @@ public synchronized void close() {
         public void send(int task, byte[] message) {
             throw new RuntimeException("Server connection should not send any messages");
         }
    +    
    +    public void send(Iterator<TaskMessage> msgs) {
    +      throw new RuntimeException("Server connection should not send any messages");
    +    }
    +	
    +	 public String name() {
    --- End diff --
    
    Indentation appears to be off here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45056355
  
    @clockfly 
    
    Thanks for your detailed test. But notice that there is a big difference between our test results.
    In my test, throughput peaks at 656K tps with only 8 workers, and drops as more workers are added.
    In your test, adding more workers keeps increasing throughput, which peaks at 622K tps with 24 workers. The number of workers is much larger, and so is the total number of threads.
    
    I am wondering why this difference exists.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12845868
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -109,25 +111,30 @@
     (defn mk-transfer-fn [worker]
       (let [local-tasks (-> worker :task-ids set)
             local-transfer (:transfer-local-fn worker)
    -        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
    +        ^DisruptorQueue transfer-queue (:transfer-queue worker)
    +        task->node+port (:cached-task->node+port worker)]
         (fn [^KryoTupleSerializer serializer tuple-batch]
           (let [local (ArrayList.)
    -            remote (ArrayList.)]
    +            remoteMap (HashMap.)]
             (fast-list-iter [[task tuple :as pair] tuple-batch]
               (if (local-tasks task)
                 (.add local pair)
    -            (.add remote pair)
    -            ))
    +            (let [node+port (get @task->node+port task)]
    +              (when (not (.get remoteMap node+port))
    +                (.put remoteMap node+port (ArrayList.)))
    +              (let [remote (.get remoteMap node+port)]
    +                (.add remote (TaskMessage. task (.serialize serializer tuple)))
    +                 ))))
    --- End diff --
    
    fixed, thanks



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12812348
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -21,52 +21,53 @@
     import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
     import backtype.storm.utils.Utils;
    -
     import org.jboss.netty.bootstrap.ClientBootstrap;
     import org.jboss.netty.channel.Channel;
     import org.jboss.netty.channel.ChannelFactory;
    -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
     import org.jboss.netty.channel.ChannelFuture;
     import org.jboss.netty.channel.ChannelFutureListener;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    -
     import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
     import java.util.Map;
    -import java.util.Timer;
    -import java.util.TimerTask;
     import java.util.Random;
    -import java.util.concurrent.LinkedBlockingQueue;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.atomic.AtomicLong;
     import java.util.concurrent.atomic.AtomicReference;
     
    -class Client implements IConnection {
    +public class Client implements IConnection {
         private static final Logger LOG = LoggerFactory.getLogger(Client.class);
    -    private static final Timer TIMER = new Timer("netty-client-timer", true);
    -
    +    private static final String PREFIX = "Netty-Client-";
         private final int max_retries;
         private final long base_sleep_ms;
         private final long max_sleep_ms;
    -    private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
         private AtomicReference<Channel> channelRef;
         private final ClientBootstrap bootstrap;
    -    InetSocketAddress remote_addr;
    -    private AtomicInteger retries;
    +    private InetSocketAddress remote_addr;
    +    
         private final Random random = new Random();
         private final ChannelFactory factory;
         private final int buffer_size;
    -    private final AtomicBoolean being_closed;
    -    private boolean wait_for_requests;
    +    private boolean closing;
    +
    +    private Integer messageBatchSize;
    +    private Boolean blocking = false;
    --- End diff --
    
    Can we make these an int and a boolean? They should never be null, and I would rather have it blow up sooner rather than later.
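
A small sketch of the difference being pointed out, under assumptions: the two settings classes and the `"batch.size"` key are hypothetical and exist only to show where a missing value surfaces. With a boxed field the `null` is stored silently and fails at some later use; with a primitive field the auto-unboxing fails in the constructor.

```java
import java.util.Map;

final class BoxedVsPrimitive {
    // Boxed field: a missing setting is stored as null and only fails on later use.
    static final class BoxedSettings {
        final Integer messageBatchSize;

        BoxedSettings(Map<String, Object> conf) {
            this.messageBatchSize = (Integer) conf.get("batch.size");
        }
    }

    // Primitive field: unboxing a null throws NullPointerException right here.
    static final class PrimitiveSettings {
        final int messageBatchSize;

        PrimitiveSettings(Map<String, Object> conf) {
            this.messageBatchSize = (Integer) conf.get("batch.size");
        }
    }
}
```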


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by nathanmarz <gi...@git.apache.org>.
Github user nathanmarz commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45447183
  
    +1



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12398727
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -84,43 +93,87 @@
     
             // Start the connection attempt.
             remote_addr = new InetSocketAddress(host, port);
    -        bootstrap.connect(remote_addr);
    +        
    +        Thread flushChecker = new Thread(new Runnable() {
    +            @Override
    +            public void run() {
    +                //make sure we have a connection
    +                connect();
    +                
    +                while(!closing) {
    +                    long flushCheckTime = flushCheckTimer.get();
    +                    long now = System.currentTimeMillis();
    +                    if (now > flushCheckTime) {
    +                        Channel channel = channelRef.get();
    +                        if (null != channel && channel.isWritable()) {
    +                            flush();
    +                        }
    +                    }
    +                    try {
    +                        Thread.sleep(flushCheckInterval);
    +                    } catch (InterruptedException e) {
    +                        break;
    +                    }
    +                }
    +                
    +            }
    +        });
    +        
    +        flushChecker.setDaemon(true);
    --- End diff --
    
    Do we want to name this thread as well?
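
Naming the thread only needs the two-argument `Thread` constructor (or `setName`). A minimal sketch, assuming a hypothetical helper `NamedThreads.newDaemon`; the `Netty-Client-` prefix is the one already defined in this pull request, while any suffix appended to it would be illustrative only:

```java
final class NamedThreads {
    // Creates a daemon thread with an explicit name so it is identifiable in thread dumps.
    static Thread newDaemon(String name, Runnable task) {
        Thread t = new Thread(task, name);
        t.setDaemon(true);
        return t;
    }
}
```

For example, `NamedThreads.newDaemon("Netty-Client-flush-checker", runnable)` would make the flush checker easy to spot in `jstack` output.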


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43656390
  
    Gvain,
    
    > Besides, By using a SHARED threadpool(its default size is 1) among all netty client within a worker,  the netty threads number do not increase as total worker numbers increase. Check [jira][storm-12]. So, increasing worker count may not cause netty context switching problem.
    
    Context switching here means that netty threads from different worker processes on the same machine will compete with each other.
    
    > "3. More outbound acker message count. Usually we will allocate one acker to one worker."
    But you allocate 48 ackers to only 4 workers.
    
    Usually one acker per worker will suffice. But in the performance benchmarking case, the acker becomes a bottleneck because the message count is huge.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12813461
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
    @@ -31,35 +31,69 @@
     import org.slf4j.LoggerFactory;
     
     import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
     import java.util.Map;
     import java.util.concurrent.Executors;
     import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
     
     class Server implements IConnection {
         private static final Logger LOG = LoggerFactory.getLogger(Server.class);
         @SuppressWarnings("rawtypes")
         Map storm_conf;
         int port;
    -    private LinkedBlockingQueue<TaskMessage> message_queue;
    +    
    +    // Create multiple queues for incoming messages. The size equals the number of receiver threads.
    +    // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
    +    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
    +    
         volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
         final ChannelFactory factory;
         final ServerBootstrap bootstrap;
    -
    +    
    +    private int queueCount;
    +    HashMap<Integer, Integer> taskToQueueId = null;
    +    int roundRobinQueueId;
    +	
    +    boolean closing = false;
    +    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
    +    
    +    
         @SuppressWarnings("rawtypes")
         Server(Map storm_conf, int port) {
             this.storm_conf = storm_conf;
             this.port = port;
    -        message_queue = new LinkedBlockingQueue<TaskMessage>();
    -
    +        
    +        queueCount = Utils.getInt(storm_conf.get("worker.receiver.thread.count"), 1);
    --- End diff --
    
    This should be a part of Config.java.
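
What this could look like, as a sketch only: the key string is the one from the diff, but the constant name and the `ReceiverConfig` class are hypothetical and not taken from the actual Config.java.

```java
import java.util.Map;

final class ReceiverConfig {
    // Hypothetical constant; the key string is the literal used in the diff.
    static final String WORKER_RECEIVER_THREAD_COUNT = "worker.receiver.thread.count";

    // Reads the receiver thread count, falling back to 1 when the key is absent.
    static int receiverThreadCount(Map<String, Object> conf) {
        Object v = conf.get(WORKER_RECEIVER_THREAD_COUNT);
        return (v instanceof Number) ? ((Number) v).intValue() : 1;
    }
}
```

Keeping the key in one named constant avoids repeating the string literal in Server.java and anywhere else that reads it.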


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45050542
  
    @clockfly, just to let you know: I kept topology.executor.send.buffer.size and topology.executor.receive.buffer.size unchanged, both at 1024. netty.server/client.worker_threads are both set to 1, and storm.messaging.netty.buffer_size is set to 5242880.
    
    I think this discussion will help us understand the issue much better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r13480579
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +185,114 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
                 throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
    -
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
     
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed);
    --- End diff --
    
    @Gvain,
    
    An unacked topology with no flow control is dangerous!
    
    There are two separate concepts here: flow control and error control. You don't want the acker because of its performance penalty, which means giving up error control. But you still need flow control.
    
    The problem is that Storm's current flow control mechanism, "max.spout.pending", depends on the error control channel, the acker. That is the dilemma. Maybe we should separate "max.spout.pending" from the acker so that it serves only flow control.
    
    One workaround: when you emit a message with SpoutOutputCollector, attach a message id only at a sampling rate, for example 1%:
    
    collector.emit(tuple, messageId)
    
    Only 1% of tuples carry a messageId, while the other 99% do not.
    
    In this case you have only 1% of the acker traffic, which reduces the acker's performance penalty while still giving you some basic flow control.
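    
    A minimal sketch of this sampling workaround. It is self-contained, so `TupleEmitter` is a hypothetical stand-in for the two emit styles of SpoutOutputCollector, and `SampledEmitter`, `sampleEvery` and the counter-based message id are assumptions made for illustration. It tags every Nth tuple deterministically rather than sampling at random:
    
    ```java
    import java.util.List;

    // Hypothetical stand-in for the two emit styles of SpoutOutputCollector.
    interface TupleEmitter {
        void emit(List<Object> tuple);                   // no message id: not tracked by the acker
        void emit(List<Object> tuple, Object messageId); // message id: tracked by the acker
    }

    // Attaches a message id to one tuple out of every `sampleEvery`.
    class SampledEmitter {
        private final TupleEmitter out;
        private final int sampleEvery;
        private long emitted = 0;

        SampledEmitter(TupleEmitter out, int sampleEvery) {
            if (sampleEvery < 1) {
                throw new IllegalArgumentException("sampleEvery must be >= 1");
            }
            this.out = out;
            this.sampleEvery = sampleEvery;
        }

        void emit(List<Object> tuple) {
            long seq = emitted++;
            if (seq % sampleEvery == 0) {
                // Sampled tuple: the sequence number doubles as the message id.
                out.emit(tuple, Long.valueOf(seq));
            } else {
                // Unsampled tuple: emitted without a message id.
                out.emit(tuple);
            }
        }
    }
    ```
    
    With `sampleEvery = 100`, 1% of tuples are acked. As far as I understand, max.spout.pending then bounds only the sampled tuples, so the number of tuples actually in flight can be roughly 100 times larger.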
    
    Hope this helps!
    
    
     
    
    




[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12813204
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +181,105 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
                 throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
     
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
    -
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed, blocking);
    +                messageBatch = null;
    +            }
             }
     
    -        //we are busily delivering messages, and will check queue upon response.
    -        //When send() is called by senders, we should not thus call tryDeliverMessages().
    -        wait_for_requests = false;
    -
    -        //write request into socket channel
    -        ChannelFuture future = channel.write(requests);
    -        future.addListener(new ChannelFutureListener() {
    -            public void operationComplete(ChannelFuture future)
    -                    throws Exception {
    -                if (!future.isSuccess()) {
    -                    LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
    -                    reconnect();
    -                } else {
    -                    LOG.debug("{} request(s) sent", requests.size());
    -
    -                    //Now that our requests have been sent, channel could be closed if needed
    -                    if (being_closed.get())
    -                        close_n_release();
    -                }
    +        if (null != messageBatch && !messageBatch.isEmpty()) {
    +            if (channel.isWritable()) {
    +                flushCheckTimer.set(Long.MAX_VALUE);
    +                
    +                // Flush as fast as we can to reduce the latency
    +                MessageBatch toBeFlushed = messageBatch;
    +                messageBatch = null;
    +                flushRequest(channel, toBeFlushed, blocking);
    +                
    +            } else {
    +                // when channel is NOT writable, it means the internal netty buffer is full. 
    +                // In this case, we can try to buffer up more incoming messages.
    +                flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
                 }
    -        });
    +        }
    +
         }
     
    -    /**
    -     * Take all enqueued messages from queue
    -     * @return  batch of messages
    -     * @throws InterruptedException
    -     *
    -     * synchronized ... ensure that messages are delivered in the same order
    -     * as they are added into queue
    -     */
    -    private MessageBatch tryTakeMessages() throws InterruptedException {
    -        //1st message
    -        Object msg = message_queue.poll();
    -        if (msg == null) return null;
    -
    -        MessageBatch batch = new MessageBatch(buffer_size);
    -        //we will discard any message after CLOSE
    -        if (msg == ControlMessage.CLOSE_MESSAGE) {
    -            LOG.info("Connection to {} is being closed", remote_addr);
    -            being_closed.set(true);
    -            return batch;
    +    public String name() {
    +        if (null != remote_addr) {
    +            return PREFIX + remote_addr.toString();
             }
    +        return "";
    +    }
     
    -        batch.add((TaskMessage)msg);
    -        while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
    -            //Is it a CLOSE message?
    -            if (msg == ControlMessage.CLOSE_MESSAGE) {
    -                message_queue.take();
    -                LOG.info("Connection to {} is being closed", remote_addr);
    -                being_closed.set(true);
    -                break;
    +    private synchronized void flush() {
    +        if (!closing) {
    +            if (null != messageBatch && !messageBatch.isEmpty()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                Channel channel = channelRef.get();
    +                if (channel != null) {
    +                    flushCheckTimer.set(Long.MAX_VALUE);
    +                    flushRequest(channel, toBeFlushed, true);
    +                }
    +                messageBatch = null;
                 }
    -
    -            //try to add this msg into batch
    -            if (!batch.tryAdd((TaskMessage) msg))
    -                break;
    -
    -            //remove this message
    -            message_queue.take();
             }
    -
    -        return batch;
         }
    -
    +    
         /**
          * gracefully close this client.
    -     *
    -     * We will send all existing requests, and then invoke close_n_release() method
    +     * 
    +     * We will send all existing requests, and then invoke close_n_release()
    +     * method
          */
    -    public void close() {
    -        //enqueue a CLOSE message so that shutdown() will be invoked
    -        try {
    -            message_queue.put(ControlMessage.CLOSE_MESSAGE);
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            LOG.info("Interrupted Connection to {} is being closed", remote_addr);
    -            being_closed.set(true);
    +    public synchronized void close() {
    +        if (!closing) {
    +            closing = true;
    +            if (null != messageBatch && !messageBatch.isEmpty()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                Channel channel = channelRef.get();
    +                if (channel != null) {
    +                    flushRequest(channel, toBeFlushed, true);
    +                }
    +                messageBatch = null;
    +            }
    +        
    +            //wait for pendings to exit
    +            while(pendings.get() != 0) {
    --- End diff --
    
    Can we put a maximum number of iterations on this? It feels more defensive to try for a while and then, if that does not work out, log a message and move on. After all, the worst that happens if we leave early is that some messages may not have been acked, which we more or less ignore anyway.
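    
    A minimal sketch of such a bounded wait, assuming the pending count is an AtomicInteger as in the diff above. The names `BoundedDrain`, `awaitDrained`, `maxAttempts` and `sleepMs` are hypothetical and not part of the patch:
    
    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    class BoundedDrain {
        /**
         * Polls `pendings` at most `maxAttempts` times, sleeping `sleepMs`
         * between polls. Returns true if the count reached zero, false if
         * we gave up (or were interrupted) with requests still pending.
         */
        static boolean awaitDrained(AtomicInteger pendings, int maxAttempts, long sleepMs) {
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                if (pendings.get() == 0) {
                    return true;
                }
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and stop waiting.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return pendings.get() == 0;
        }
    }
    ```
    
    close() could call this once and, when it returns false, log how many requests were still pending before releasing the channel.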



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12362613
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +181,103 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
                 throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
     
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
    -
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed, blocking);
    +                messageBatch = null;
    +            }
             }
     
    -        //we are busily delivering messages, and will check queue upon response.
    -        //When send() is called by senders, we should not thus call tryDeliverMessages().
    -        wait_for_requests = false;
    -
    -        //write request into socket channel
    -        ChannelFuture future = channel.write(requests);
    -        future.addListener(new ChannelFutureListener() {
    -            public void operationComplete(ChannelFuture future)
    -                    throws Exception {
    -                if (!future.isSuccess()) {
    -                    LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
    -                    reconnect();
    -                } else {
    -                    LOG.debug("{} request(s) sent", requests.size());
    -
    -                    //Now that our requests have been sent, channel could be closed if needed
    -                    if (being_closed.get())
    -                        close_n_release();
    -                }
    +        if (null != messageBatch && !messageBatch.isEmpty()) {
    +            if (channel.isWritable()) {
    --- End diff --
    
    When the channel is NOT writable, it means the internal Netty buffer is full. In this case, we can try to buffer up more incoming messages.
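    
    A simplified, self-contained sketch of that decision, modeled on the send() and flush-checker logic in the diff above. `BatchSink` is a hypothetical stand-in for the Netty channel, messages are plain strings, and time is passed in explicitly so the behavior is easy to follow; none of these names come from the patch:
    
    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for the channel: reports writability and accepts batches.
    interface BatchSink {
        boolean isWritable();
        void write(List<String> batch);
    }

    class WritabilityAwareBatcher {
        private final BatchSink sink;
        private final long flushCheckIntervalMs;
        private List<String> batch = new ArrayList<String>();
        private long flushCheckTime = Long.MAX_VALUE; // MAX_VALUE means no flush check is scheduled

        WritabilityAwareBatcher(BatchSink sink, long flushCheckIntervalMs) {
            this.sink = sink;
            this.flushCheckIntervalMs = flushCheckIntervalMs;
        }

        synchronized void send(String msg, long nowMs) {
            batch.add(msg);
            if (sink.isWritable()) {
                // Writable: flush right away to keep latency low.
                flushNow();
            } else {
                // Not writable: keep buffering and let the timer retry later.
                flushCheckTime = nowMs + flushCheckIntervalMs;
            }
        }

        // Called periodically by a flush-checker thread.
        synchronized void onTimerTick(long nowMs) {
            if (nowMs > flushCheckTime && sink.isWritable()) {
                flushNow();
            }
        }

        synchronized int buffered() {
            return batch.size();
        }

        private void flushNow() {
            flushCheckTime = Long.MAX_VALUE;
            if (batch.isEmpty()) {
                return;
            }
            List<String> toBeFlushed = batch;
            batch = new ArrayList<String>();
            sink.write(toBeFlushed);
        }
    }
    ```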



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-42416918
  
    Really impressive. 



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12812760
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -84,43 +93,87 @@
     
             // Start the connection attempt.
             remote_addr = new InetSocketAddress(host, port);
    -        bootstrap.connect(remote_addr);
    +        
    +        Thread flushChecker = new Thread(new Runnable() {
    +            @Override
    +            public void run() {
    +                //make sure we have a connection
    +                connect();
    +                
    +                while(!closing) {
    +                    long flushCheckTime = flushCheckTimer.get();
    +                    long now = System.currentTimeMillis();
    +                    if (now > flushCheckTime) {
    +                        Channel channel = channelRef.get();
    +                        if (null != channel && channel.isWritable()) {
    +                            flush();
    +                        }
    +                    }
    +                    try {
    +                        Thread.sleep(flushCheckInterval);
    +                    } catch (InterruptedException e) {
    +                        break;
    +                    }
    +                }
    +                
    +            }
    +        }, name() + "-flush-checker");
    +        
    +        flushChecker.setDaemon(true);
    +        flushChecker.start();
         }
     
         /**
          * We will retry connection with exponential back-off policy
          */
    -    void reconnect() {
    -        close_n_release();
    -
    -        //reconnect only if it's not being closed
    -        if (being_closed.get()) return;
    -
    -        final int tried_count = retries.incrementAndGet();
    -        if (tried_count <= max_retries) {
    -            long sleep = getSleepTimeMs();
    -            LOG.info("Waiting {} ms before trying connection to {}", sleep, remote_addr);
    -            TIMER.schedule(new TimerTask() {
    -                @Override
    -                public void run() { 
    -                    LOG.info("Reconnect ... [{}] to {}", tried_count, remote_addr);
    -                    bootstrap.connect(remote_addr);
    -                }}, sleep);
    -        } else {
    -            LOG.warn(remote_addr+" is not reachable. We will close this client.");
    -            close();
    +    private synchronized void connect() {
    --- End diff --
    
    I don't like the idea of having connect block until the connection is established.  Any thread that tries to send data to a connection that is still being established will block until the connection is established. I think it is more robust to buffer the messages in a data structure and try to handle them later when the connection is finished.
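    
    A rough sketch of the buffering idea, not a proposal for the actual Client code. `Wire` is a hypothetical stand-in for an established connection, and the fixed `capacity` is an added assumption so that the buffer cannot grow without bound while the connection is still being set up:
    
    ```java
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Hypothetical stand-in for an established connection.
    interface Wire {
        void write(String msg);
    }

    class BufferingClient {
        private final Deque<String> pending = new ArrayDeque<String>();
        private final int capacity;
        private Wire wire; // null until the connection is established

        BufferingClient(int capacity) {
            this.capacity = capacity;
        }

        /**
         * Never blocks on connection setup. Returns false when the message
         * was rejected because the buffer is full and we are still connecting.
         */
        synchronized boolean send(String msg) {
            if (wire != null) {
                wire.write(msg);
                return true;
            }
            if (pending.size() >= capacity) {
                return false;
            }
            pending.addLast(msg);
            return true;
        }

        // Called once the connection attempt has succeeded.
        synchronized void onConnected(Wire established) {
            wire = established;
            // Drain buffered messages in the order they were sent.
            while (!pending.isEmpty()) {
                established.write(pending.pollFirst());
            }
        }
    }
    ```
    
    The caller has to decide what to do when send() returns false, for example retry later or drop the message.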



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43588816
  
    Hi Bobby,
    
    Thank you for your comments. I addressed most of the issues, except these three:
    
    **issue1:**
    > 
    ```
    +    private synchronized void connect() {
    ```
    > I don't like the idea of having connect block until the connection is established. Any thread that tries to send data to a connection that is still being established will block until the connection is established. I think it is more robust to buffer the messages in a data structure and try to handle them later when the connection is finished.
    
    **reply:** I think it is better to have the connection established before we allow message senders to send messages. Here are my observations from allowing senders to send without an established connection:
    
    1. Excessive memory usage
    If message senders are allowed to send without the connection being established, they are encouraged to send as fast as they can. In some profiling, I observed a sharp increase in heap memory at the beginning, because all of those messages are buffered in an unbounded queue in the Netty client. If the user runs an unacked topology, or sets topology.max.spout.pending to a large enough value, this can cause a JVM OOM.
    
    2. Longer latency
    The Netty client queue can become very long, and a longer queue means longer latency. For example, suppose the Netty client can transfer at most 10 tuples/second, the queue has grown to 10000 entries because of this buffering, and the spout generates 10 tuples/second. The queue size will then stabilize at 10000 and the throughput will be 10 tuples/second. The throughput is the same whether the queue size is 100, 1000, or 10000, but the latency is much higher at 10000. So it is very important to make sure the queue does not grow too large at the beginning.
    
    3. Reduced throughput
    Longer latency reduces the spout's message generation speed, because the spout waits for messages to be acked (the number of unacked messages is controlled by topology.max.spout.pending). The longer the initial latency, the longer it takes for the spout to converge to its balanced rate of tuple generation.
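    
    The latency argument in point 2 can be made concrete with a small calculation: in steady state, a tuple joining a queue of N entries that drains at R tuples/second waits about N / R seconds (Little's law). The class and method names below are hypothetical:
    
    ```java
    class QueueLatency {
        // Approximate wait, in seconds, for a tuple that joins a queue of
        // `queuedTuples` entries draining at `drainRatePerSecond`.
        static double waitSeconds(long queuedTuples, double drainRatePerSecond) {
            if (drainRatePerSecond <= 0) {
                throw new IllegalArgumentException("drain rate must be positive");
            }
            return queuedTuples / drainRatePerSecond;
        }

        public static void main(String[] args) {
            // Same 10 tuples/second throughput, very different queueing delay.
            for (long size : new long[] {100L, 1000L, 10000L}) {
                System.out.println(size + " queued -> " + waitSeconds(size, 10.0) + " s");
            }
        }
    }
    ```
    
    At 10 tuples/second, queue sizes of 100, 1000 and 10000 correspond to waits of roughly 10, 100 and 1000 seconds.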
      
    In the code, we set up the connection asynchronously in the Client constructor, before send(message) is called, to reduce the time the message sender has to wait.
    ```
            Thread flushChecker = new Thread(new Runnable() {
                @Override
                public void run() {
                    //make sure we have a connection
                    connect();  //<-----------------here!
    ```
    
    **issue 2:**
    >
    ```
    storm-core/src/jvm/backtype/storm/messaging/netty/Client
    +        Thread flushChecker = new Thread(new Runnable() {
    ```
    > Can we make this thread shared between the clients, otherwise we will have a dedicated thread per client, which can cause resource utilization issues, hitting a ulimit with the number of processes allowed per user.
    
    **Reply:** Can we do this in a followup patch? I have a local patch, but it requires more testing.
    
    **issue 3:** 
    >
    ```
    +            (let [node+port (get @task->node+port task)]
    +              (when (not (.get remoteMap node+port))
    +                (.put remoteMap node+port (ArrayList.)))
    +              (let [remote (.get remoteMap node+port)]
    +                (.add remote (TaskMessage. task (.serialize serializer tuple)))
    +                 ))))
    ```
    > The above code does not really feel like Clojure, as it is updating mutable state. I would rather see us do something like a group-by.
    
    **Reply:** This is on purpose, for performance.
    The ArrayList and HashMap constructed here are used directly in Java. We designed a customized iterator inside the TransferDrainer class, so that we don't need to wrap/unwrap data or copy it from Clojure data structures to Java data structures.
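    
    For readers less familiar with the Clojure interop above, this is roughly what the grouping does, written as a Java sketch. `TaskMessageSketch` is a hypothetical simplified stand-in for TaskMessage, the node+port key is assumed to be a string, and `groupByDestination` is an illustrative name, not part of the patch:
    
    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical simplified stand-in for TaskMessage.
    class TaskMessageSketch {
        final int task;
        final byte[] payload;

        TaskMessageSketch(int task, byte[] payload) {
            this.task = task;
            this.payload = payload;
        }
    }

    class RemoteGrouping {
        // Groups outgoing messages by destination node+port using mutable
        // HashMap/ArrayList instances that Java code can consume directly.
        static HashMap<String, ArrayList<TaskMessageSketch>> groupByDestination(
                Map<Integer, String> taskToNodePort, List<TaskMessageSketch> outgoing) {
            HashMap<String, ArrayList<TaskMessageSketch>> remoteMap =
                    new HashMap<String, ArrayList<TaskMessageSketch>>();
            for (TaskMessageSketch msg : outgoing) {
                String nodePort = taskToNodePort.get(msg.task);
                ArrayList<TaskMessageSketch> remote = remoteMap.get(nodePort);
                if (remote == null) {
                    // First message for this destination: create its list.
                    remote = new ArrayList<TaskMessageSketch>();
                    remoteMap.put(nodePort, remote);
                }
                remote.add(msg);
            }
            return remoteMap;
        }
    }
    ```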



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12845322
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
    @@ -301,15 +303,39 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String
         }
         
         public static Integer getInt(Object o) {
    -        if(o instanceof Long) {
    -            return ((Long) o ).intValue();
    -        } else if (o instanceof Integer) {
    -            return (Integer) o;
    -        } else if (o instanceof Short) {
    -            return ((Short) o).intValue();
    -        } else {
    -            throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
    -        }
    +      Integer result = getInt(o, null);
    +      if (null == result) {
    +        throw new IllegalArgumentException("Don't know how to convert null + to int");
    +      }
    +      return result;
    +    }
    +    
    +    public static Integer getInt(Object o, Integer defaultValue) {
    +      if (null == o) {
    +        return defaultValue;
    +      }
    +      
    +      if(o instanceof Long) {
    --- End diff --
    
    You are correct, I filed
    
    https://issues.apache.org/jira/browse/STORM-328
    
    To address the issue across the project.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-44629767
  
    @clockfly , @revans2 
    
    I added more nodes, going from 4 to 15, as well as more spouts, bolts, ackers and workers, keeping their parallelism counts in the same ratio; that is, for every additional 4 nodes, I added 48 spouts, 48 bolts, 48 ackers and 8 workers. Here are the test results:
    
    nodes | workers | Throughput    | CPU usage | NET usage (only IN Bytes)
    4     | 8       | 656,000 tps   | 89%       | 28MB/s
    8     | 16      | 1,004,000 tps | 82%       | 28MB/s
    12    | 24      | 1,133,000 tps | 72%       | 25MB/s
    15    | 30      | 1,235,000 tps | 69%       | 24MB/s
    
    For the last two rows, CPU usage decreased and the throughput growth slowed down. I added a few more workers and re-ran the test; the result is:
    
    nodes | workers | Throughput    | CPU usage | NET usage (only IN Bytes)
    12    | 48      | 1,444,000 tps | 88%       | 30MB/s
    15    | 30      | 1,735,000 tps | 88%       | 30MB/s
    
    From the results, we can see that adding more workers DOES help performance scale as more CPUs are added.
    
    Finally, maybe we should reconsider the root cause of this issue, "STORM-297 Storm Performance cannot be scaled up by adding more CPU cores".
     



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r13477759
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +185,114 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
                 throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
    -
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
     
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed);
    --- End diff --
    
    @clockfly 
    
    Thanks for the reminder.
    For an unacked topology, the OOM problem did exist before your patch, and I filed it at https://issues.apache.org/jira/browse/STORM-339
    
    After your patch, the OOM problem still seems to exist, just as you said and filed in JIRA at https://issues.apache.org/jira/browse/STORM-329. We are now stuck on this because we are running an unacked topology under heavy throughput, so shall we raise this issue's priority and fix it soon?



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r13485764
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +185,114 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
                 throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
    -
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
     
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed);
    --- End diff --
    
    @clockfly
    
    Shall we just use a bounded message queue in the Netty client? If the queue is full, we can block or drop until the channel has flushed some of the pending messages. The strategy can be configurable, as https://issues.apache.org/jira/browse/STORM-329 describes.
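
The bounded-queue idea above can be sketched in plain Java. This is a minimal illustration, not code from the patch: `BoundedPendingQueue`, `FullPolicy`, and the capacity parameter are hypothetical names, and the Netty channel itself is left out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical bounded buffer for outgoing messages; not part of Storm's Netty client. */
final class BoundedPendingQueue<T> {
    /** What to do when the buffer is full (illustrative names). */
    enum FullPolicy { BLOCK, DROP }

    private final BlockingQueue<T> pending;
    private final FullPolicy policy;

    BoundedPendingQueue(int capacity, FullPolicy policy) {
        this.pending = new ArrayBlockingQueue<>(capacity);
        this.policy = policy;
    }

    /** Returns true if the message was queued, false if it was dropped. */
    boolean offer(T message) {
        if (policy == FullPolicy.DROP) {
            return pending.offer(message); // returns false immediately when full
        }
        try {
            pending.put(message);          // waits until the flushing side frees a slot
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Called by the flushing side once the channel can accept more data; null if empty. */
    T poll() {
        return pending.poll();
    }

    int size() {
        return pending.size();
    }
}
```

With `DROP`, memory use is capped at the cost of losing messages; with `BLOCK`, the sender stalls, which pushes the back pressure upstream.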



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-42634710
  
    Strange, the mailing list notification is not working.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45091025
  
    @clockfly 
    
    Thanks for your explanation. The explanation for part 2 does not seem very convincing, as neither the CPU nor the network is saturated in either environment when scaling the worker count from 4 to 8. Still, I don't want to go any further on this.
    
    Anyway, your work is really great.
    
    Another thing: since you use Netty to send messages asynchronously, how can you ensure that all messages emitted from a fast component are received by a slow component without any loss, especially under heavy throughput with no ackers enabled? To my knowledge, the Netty channel may not be writable, and the count of pending messages will increase.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12814221
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
    @@ -301,15 +303,39 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String
         }
         
         public static Integer getInt(Object o) {
    -        if(o instanceof Long) {
    -            return ((Long) o ).intValue();
    -        } else if (o instanceof Integer) {
    -            return (Integer) o;
    -        } else if (o instanceof Short) {
    -            return ((Short) o).intValue();
    -        } else {
    -            throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
    -        }
    +      Integer result = getInt(o, null);
    +      if (null == result) {
    +        throw new IllegalArgumentException("Don't know how to convert null + to int");
    +      }
    +      return result;
    +    }
    +    
    +    public static Integer getInt(Object o, Integer defaultValue) {
    +      if (null == o) {
    +        return defaultValue;
    +      }
    +      
    +      if(o instanceof Long) {
    --- End diff --
    
    Why?  If I type in 3.5 for an integer config and I get a 3 out of it, is it a problem?  If so then we need to change Config.java to not just expect a Number for these things, but instead have a proper checker that knows this should be an Integer, and checks ranges etc.
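
A "proper checker" of the kind mentioned above might look like the following sketch. It is an assumption about what such a check could do, not Storm's `Utils.getInt` or `Config` validation: `StrictInts.requireInt` is a hypothetical helper that accepts whole numbers in `int` range and rejects values such as 3.5 instead of truncating them.

```java
/** Hypothetical strict conversion helper; not part of Storm. */
final class StrictInts {
    private StrictInts() {}

    /** Converts a boxed number to int, rejecting fractions and out-of-range values. */
    static int requireInt(Object o) {
        if (o instanceof Byte || o instanceof Short || o instanceof Integer || o instanceof Long) {
            long l = ((Number) o).longValue();
            if (l < Integer.MIN_VALUE || l > Integer.MAX_VALUE) {
                throw new IllegalArgumentException(o + " is out of int range");
            }
            return (int) l;
        }
        if (o instanceof Float || o instanceof Double) {
            double d = ((Number) o).doubleValue();
            // NaN fails the first test; infinities fail the range test.
            if (d != Math.rint(d) || d < Integer.MIN_VALUE || d > Integer.MAX_VALUE) {
                throw new IllegalArgumentException(o + " is not a whole number in int range");
            }
            return (int) d;
        }
        throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
    }
}
```

Whether a value like 3.0 should be accepted at all is a design choice; this sketch allows it because it loses no information.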



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r13443448
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +185,114 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
                 throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
    -
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
     
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed);
    --- End diff --
    
    @Gvain,
    
    For an unacked topology, we can either block, drop, or store. Before this patch, Storm would always store, so there was a potential OOM like the one you described. This patch goes a step further: if the downstream machine is down, it blocks the upstream from sending, thus avoiding OOM.
    
    But in the case you mentioned, where the downstream is slow but not down, more messages can pile up at the Netty client side or the server side. If the network is OK, they will pend on the server side; if the network is overwhelmed, they will pend on the Netty client side.
    
    For example, if the spout generates really big messages at a furious rate and the bandwidth is not enough, it will lead to OOM in the spout worker JVM, and nothing can stop that.
    
    Bobby's comment mentions a similar situation: https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986
    
    We should allow the user to configure message dropping in this case, thus avoiding OOM. After discussion with Bobby, this bug was filed as the follow-up JIRA https://issues.apache.org/jira/browse/STORM-329



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-storm/pull/103



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by nathanmarz <gi...@git.apache.org>.
Github user nathanmarz commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45418252
  
    -1 Tests should never, ever rely on timing in order to pass. This is the whole reason for doing time simulation in the first place, so that when functionality depends on time it can be properly tested without having to worry about random delays messing up the tests. 
    
    complete-topology is inherently reliant on detecting topology completion based on the spout saying all its tuples are "complete". If you're testing topologies that don't do full tuple acking, then you should be testing using the "tracked topologies" utilities in backtype.storm.testing.clj
    
    For example, here is how the acking system is tested using tracked topologies: https://github.com/apache/incubator-storm/blob/master/storm-core/test/clj/backtype/storm/integration_test.clj#L213
    
    The "tracked-wait" function is the key which will only return when both that many tuples have been emitted by the spouts AND the topology is idle (no tuples have been emitted nor will be emitted without further input) You shouldn't use tracked-topologies for topologies that have tick tuples, but that shouldn't be a problem in this case.
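
The point about time simulation can be illustrated outside Storm. This is a minimal sketch under stated assumptions: `ManualClock` and `TimeoutTracker` are hypothetical classes, and Storm's own simulated-time and tracked-topology utilities are not shown. The test advances time explicitly, so the outcome does not depend on how fast the machine runs.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical clock that only moves when the test tells it to. */
final class ManualClock {
    private final AtomicLong nowMs = new AtomicLong();

    long currentTimeMillis() {
        return nowMs.get();
    }

    void advance(long ms) {
        nowMs.addAndGet(ms);
    }
}

/** Hypothetical time-dependent component that reads the injected clock. */
final class TimeoutTracker {
    private final ManualClock clock;
    private final long timeoutMs;
    private long lastSeenMs;

    TimeoutTracker(ManualClock clock, long timeoutMs) {
        this.clock = clock;
        this.timeoutMs = timeoutMs;
        this.lastSeenMs = clock.currentTimeMillis();
    }

    /** Records activity at the current simulated time. */
    void touch() {
        lastSeenMs = clock.currentTimeMillis();
    }

    /** True once at least timeoutMs of simulated time has passed since the last touch. */
    boolean isExpired() {
        return clock.currentTimeMillis() - lastSeenMs >= timeoutMs;
    }
}
```

A test built this way never sleeps: it calls `advance` and asserts immediately.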



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43716283
  
    > Do you mean that allocating one worker per node is better than several workers per node as the netty threads from different worker process will compete with each other ?
    
    It depends. The executor is the basic unit of parallelism. Suppose the executor count stays the same: adding a worker adds more traffic, but if it is co-located with other workers, it may also increase the bandwidth of communication between workers (as it happens between processes on one machine instead of across machines).
    
    For example, 1, 1, 1 (meaning 3 machines with one worker on each) may be faster than co-located 4, 4, 4.
    But co-located 3, 0, 0 may be faster than distributed 1, 1, 1. It takes experiments to verify which is better or worse.
    
    >And i think using several netty threads working in sync and non-batch mode may have some what the same effect with using only one netty threads working in async and batch mode. Maybe i should test this out. By the way, what storm version do you use in the test ?
    
    Yes, the sync/async comparison can only give us pointers to the possible bottleneck; it takes continuous profile-and-tune experiments to confirm our guess. By the way, I use the storm-0.9 release version.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12813854
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java ---
    @@ -41,30 +45,22 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
         
         @Override
         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    -        Object msg = e.getMessage();  
    -        if (msg == null) return;
    -
    -        //end of batch?
    -        if (msg==ControlMessage.EOB_MESSAGE) {
    -            Channel channel = ctx.getChannel();
    -            LOG.debug("Send back response ...");
    -            if (failure_count.get()==0)
    -                channel.write(ControlMessage.OK_RESPONSE);
    -            else channel.write(ControlMessage.FAILURE_RESPONSE);
    -            return;
    -        }
    -        
    -        //enqueue the received message for processing
    -        try {
    -            server.enqueue((TaskMessage)msg);
    -        } catch (InterruptedException e1) {
    -            LOG.info("failed to enqueue a request message", e);
    -            failure_count.incrementAndGet();
    -        }
    +      List<TaskMessage> msgs = (List<TaskMessage>) e.getMessage();
    +      if (msgs == null) {
    +        return;
    +      }
    +      
    +      try {
    +        server.enqueue(msgs);
    +      } catch (InterruptedException e1) {
    +        LOG.info("failed to enqueue a request message", e);
    +        failure_count.incrementAndGet();
    +      }
         }
     
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
    +        e.getCause().printStackTrace();
    --- End diff --
    
    Can we log this properly instead of just printing the stack trace to stderr?



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12362548
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -109,25 +111,30 @@
     (defn mk-transfer-fn [worker]
       (let [local-tasks (-> worker :task-ids set)
             local-transfer (:transfer-local-fn worker)
    -        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
    +        ^DisruptorQueue transfer-queue (:transfer-queue worker)
    +        task->node+port (:cached-task->node+port worker)]
         (fn [^KryoTupleSerializer serializer tuple-batch]
           (let [local (ArrayList.)
    -            remote (ArrayList.)]
    +            remoteMap (HashMap.)]
             (fast-list-iter [[task tuple :as pair] tuple-batch]
               (if (local-tasks task)
                 (.add local pair)
    -            (.add remote pair)
    -            ))
    +            (let [node+port (get @task->node+port task)]
    --- End diff --
    
    Move the message grouping (group by node+port) code from the worker transfer thread to the executor send thread, as we only have ONE worker transfer thread and it can become the bottleneck.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12362591
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Context.java ---
    @@ -47,12 +48,14 @@ public void prepare(Map storm_conf) {
     
             //each context will have a single client channel factory
             int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
    --- End diff --
    
    Give the Netty threads a name.
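
Naming pool threads is usually done through a `ThreadFactory`. The sketch below uses only the JDK; `NamedThreadFactory` and the prefix string are hypothetical, and how the patch actually wires names into the Netty channel factory is not shown here.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical factory that gives every thread a readable, numbered name. */
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger index = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Produces names such as "netty-client-worker-1", "netty-client-worker-2", ...
        return new Thread(r, prefix + "-" + index.getAndIncrement());
    }
}
```

Such a factory can be passed to `Executors.newCachedThreadPool(ThreadFactory)`, so the threads show up under their names in thread dumps and profilers.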


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43791269
  
    Yes, getting the best performance out of a topology really depends on the resources that your topology is using.  If your topology is CPU bound then you want to spread it out so that you have enough cores to handle the parallelism, but if your topology is I/O bound you want to collocate them as much as possible.  The best performance optimization is simply to stop doing something.  So if you can cut out the serialization/deserialization and sending tuples to another process, even over the loopback device, then that potentially becomes a big win.
    
    The really difficult part is that parts of your topology may be CPU bound, other parts may be I/O bound, and other parts may be constrained by memory (which has its own limitations).  Also you may have a different definition of "best".  Some users may require a very low latency, and are willing to let most of the cluster sit idle so that they know when something happens they can process it very quickly.  Other times you are willing to sacrifice latency to be sure that everything you want to run fits on a smaller set of hardware.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43625555
  
    **issue 2:**
    
    >storm-core/src/jvm/backtype/storm/messaging/netty/Client
    >```
    +        Thread flushChecker = new Thread(new Runnable() {
    ```
    > Can we make this thread shared between the clients, otherwise we will have a dedicated thread per client, which can cause resource utilization issues, hitting a ulimit with the number of processes allowed per user.
    
    ~~**Reply**: Can we do this in a followup patch? I have a local patch, but it requires more testing.~~
    **Update**: This is resolved in the latest check-in.
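
One way to share the flush checker between clients is a single scheduled thread that visits every registered client. The sketch below is an assumption about the general shape, not the code from the latest check-in: `SharedFlushChecker`, `FlushTarget`, and `flushIfStale` are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical shared checker: one thread serves all clients instead of one thread each. */
final class SharedFlushChecker {
    /** Hypothetical callback a client implements to flush a batch that has waited too long. */
    interface FlushTarget {
        void flushIfStale();
    }

    private final Set<FlushTarget> clients = ConcurrentHashMap.newKeySet();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void start(long periodMs) {
        scheduler.scheduleWithFixedDelay(this::flushAll, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    void register(FlushTarget client) {
        clients.add(client);
    }

    void unregister(FlushTarget client) {
        clients.remove(client);
    }

    /** Visits every registered client once. */
    void flushAll() {
        for (FlushTarget client : clients) {
            try {
                client.flushIfStale();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel the periodic task for all clients;
                // a real implementation would log it here.
            }
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

The thread count stays at one no matter how many clients a worker opens, which addresses the ulimit concern quoted above.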



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827309
  
    --- Diff: conf/defaults.yaml ---
    @@ -109,6 +112,15 @@ storm.messaging.netty.max_retries: 30
     storm.messaging.netty.max_wait_ms: 1000
     storm.messaging.netty.min_wait_ms: 100
     
    +# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
    +storm.messaging.netty.transfer.batch.size: 262144
    +
    +# If storm.messaging.netty.blocking is set to true, the Netty Client will send messages in synchronized way, otherwise it will do it in async way. Set storm.messaging.netty.blocking to false to improve the latency and throughput.
    --- End diff --
    
    fixed



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12813684
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
    @@ -72,34 +106,109 @@
             Channel channel = bootstrap.bind(new InetSocketAddress(port));
             allChannels.add(channel);
         }
    +    
    +    private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
    +      ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
    +      
    +      for (int i = 0; i < msgs.size(); i++) {
    +        TaskMessage message = msgs.get(i);
    +        int task = message.task();
    +        
    +        if (task == -1) {
    +          closing = true;
    +          return null;
    +        }
    +        
    +        Integer queueId = getMessageQueueId(task);
    +        
    +        if (null == messageGroups[queueId]) {
    +          messageGroups[queueId] = new ArrayList<TaskMessage>();
    +        }
    +        messageGroups[queueId].add(message);
    +      }
    +      return messageGroups;
    +    }
    +    
    +    private Integer getMessageQueueId(int task) {
    +      // try to construct the map from taskId -> queueId in round robin manner.
    +      
    +      Integer queueId = taskToQueueId.get(task);
    +      if (null == queueId) {
    +        synchronized(taskToQueueId) {
    +          //assgin task to queue in round-robin manner
    +          if (null == taskToQueueId.get(task)) {
    +            queueId = roundRobinQueueId++;
    +            
    +            taskToQueueId.put(task, queueId);
    +            if (roundRobinQueueId == queueCount) {
    +              roundRobinQueueId = 0;
    +            }
    +          }
    +        }
    +      }
    +      return queueId;
    +    }
     
         /**
          * enqueue a received message 
          * @param message
          * @throws InterruptedException
          */
    -    protected void enqueue(TaskMessage message) throws InterruptedException {
    -        message_queue.put(message);
    -        LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
    -    }
    +    protected void enqueue(List<TaskMessage> msgs) throws InterruptedException {
    +      
    +      if (null == msgs || msgs.size() == 0 || closing) {
    +        return;
    +      }
    +      
    +      ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
    +      
    +      if (null == messageGroups || closing) {
    +        return;
    +      }
    +      
    +      for (int receiverId = 0; receiverId < messageGroups.length; receiverId++) {
    +        ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
    +        if (null != msgGroup) {
    +          message_queue[receiverId].put(msgGroup);
    +        }
    +      }
    +   }
         
         /**
          * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
          */
    -    public TaskMessage recv(int flags)  {
    -        if ((flags & 0x01) == 0x01) { 
    +    public Iterator<TaskMessage> recv(int flags)  {
    --- End diff --
    
    Is there a reason we still have this API here? It seems like we don't actually want anyone to call this code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by nathanmarz <gi...@git.apache.org>.
Github user nathanmarz commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43917540
  
    I would like to see a test added that tests that messages are received between tasks in the same order they are sent. The test is likely to be probabilistic, in that if the code is wrong it won't always fail, but that's ok. This is a really important property to maintain that this patch, or future modifications on this code, could affect. So it needs better testing. The test should have many workers with many send/receive threads. 
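
The core of such an ordering test can be sketched as follows. This is an illustration only: `OrderingCheck` is hypothetical, and a shared in-memory FIFO queue stands in for the real worker-to-worker transport. Each sender stamps its messages with a sequence number, and the receiver counts every case where a sender's sequence number fails to increase.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical per-sender ordering check; the queue is a stand-in for the transport. */
final class OrderingCheck {
    /** Remembers the last sequence number per sending task and counts regressions. */
    static final class Checker {
        private final Map<Integer, Long> lastSeq = new HashMap<>();
        private long violations;

        synchronized void onReceive(int senderTask, long seq) {
            Long previous = lastSeq.put(senderTask, seq);
            if (previous != null && seq <= previous) {
                violations++;
            }
        }

        synchronized long violations() {
            return violations;
        }
    }

    /** Runs several sender threads against one receiver and returns the violation count. */
    static long run(int senders, int perSender) {
        BlockingQueue<long[]> wire = new LinkedBlockingQueue<>();
        Thread[] threads = new Thread[senders];
        for (int s = 0; s < senders; s++) {
            final int task = s;
            threads[s] = new Thread(() -> {
                for (long seq = 0; seq < perSender; seq++) {
                    wire.add(new long[] {task, seq}); // {sender task, sequence number}
                }
            });
            threads[s].start();
        }
        Checker checker = new Checker();
        try {
            for (long i = 0; i < (long) senders * perSender; i++) {
                long[] msg = wire.take();
                checker.onReceive((int) msg[0], msg[1]);
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return checker.violations();
    }
}
```

Against the real messaging layer the check is probabilistic, as the comment notes: a reordering bug may not show up on every run, so many senders and many messages raise the chance of catching it.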


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43627624
  
    Gvain,
    
    >What if we use MORE workers per node than just only ONE worker per node without changing the total number of executors ? By doing so, we will have MORE received threads, transfer threads and netty i/o threads for the total 144 executors. Should this increase total CPU usage and network bandwidth usage?
    
    We tried this approach before, but it did not give us the performance numbers we wanted; there were inherent bottlenecks. Besides, receiver.count is not the biggest bottleneck here; Netty performance matters more.
    
    The logic behind making "receiver.count" configurable is that, since we allow the user to configure the spout/bolt executor count per worker, we should also allow the user to configure the receiver thread count to keep it consistent with the parallelism settings.
    
    Besides, increasing the worker count will:
    1. Add more cross-process or cross-machine communication. For tasks inside the same worker process, messages are dispatched **locally** to the target task if possible.
    2. Cause more Netty context switches and contention. Check http://yahooeng.tumblr.com/post/64758709722/making-storm-fly-with-netty
    3. Increase the outbound acker message count. Usually we allocate one acker per worker.
    
    So in common practice, each worker has a moderate number of executors, neither too few nor too many.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12811601
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -109,25 +111,30 @@
     (defn mk-transfer-fn [worker]
       (let [local-tasks (-> worker :task-ids set)
             local-transfer (:transfer-local-fn worker)
    -        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
    +        ^DisruptorQueue transfer-queue (:transfer-queue worker)
    +        task->node+port (:cached-task->node+port worker)]
         (fn [^KryoTupleSerializer serializer tuple-batch]
           (let [local (ArrayList.)
    -            remote (ArrayList.)]
    +            remoteMap (HashMap.)]
             (fast-list-iter [[task tuple :as pair] tuple-batch]
               (if (local-tasks task)
                 (.add local pair)
    -            (.add remote pair)
    -            ))
    +            (let [node+port (get @task->node+port task)]
    +              (when (not (.get remoteMap node+port))
    +                (.put remoteMap node+port (ArrayList.)))
    +              (let [remote (.get remoteMap node+port)]
    +                (.add remote (TaskMessage. task (.serialize serializer tuple)))
    +                 ))))
    --- End diff --
    
    The above code does not really feel like Clojure, as it updates mutable state.  I would rather see us do something like a group-by.
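
For comparison, a group-by formulation of the same partitioning step might look like the following. This is a minimal Java sketch rather than the code in the pull request: `Msg`, `GroupByDestination`, and the `taskToNodePort` map are hypothetical stand-ins for TaskMessage and the cached task->node+port mapping, and the sketch assumes every task has a known destination.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for TaskMessage: a destination task id plus a payload.
final class Msg {
    final int task;
    final String payload;

    Msg(int task, String payload) {
        this.task = task;
        this.payload = payload;
    }
}

final class GroupByDestination {
    // Groups messages by the node+port that hosts their destination task.
    // The grouping builds a fresh map instead of updating shared state.
    // Assumes every task id is present in taskToNodePort (groupingBy rejects null keys).
    static Map<String, List<Msg>> group(List<Msg> batch, Map<Integer, String> taskToNodePort) {
        return batch.stream()
                .collect(Collectors.groupingBy(m -> taskToNodePort.get(m.task)));
    }

    public static void main(String[] args) {
        Map<Integer, String> taskToNodePort = new HashMap<>();
        taskToNodePort.put(1, "node-a:6700");
        taskToNodePort.put(2, "node-b:6700");
        taskToNodePort.put(3, "node-a:6700");

        List<Msg> batch = Arrays.asList(new Msg(1, "x"), new Msg(2, "y"), new Msg(3, "z"));
        Map<String, List<Msg>> grouped = group(batch, taskToNodePort);
        for (Map.Entry<String, List<Msg>> e : grouped.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().size() + " message(s)");
        }
    }
}
```

The trade-off is allocation: a grouping like this creates intermediate collections on every batch, which may matter on a hot path.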



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827274
  
    --- Diff: storm-core/src/clj/backtype/storm/disruptor.clj ---
    @@ -89,7 +90,7 @@
                     (consume-batch-when-available queue handler)
                     0 )
                   :kill-fn kill-fn
    -              :thread-name thread-name
    +              :thread-name (.getName queue)
    --- End diff --
    
    fixed with your suggestion, thanks



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827334
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java ---
    @@ -18,70 +18,24 @@
     package backtype.storm.messaging.netty;
     
     import java.net.ConnectException;
    -import java.util.concurrent.atomic.AtomicBoolean;
     
    -import org.jboss.netty.channel.Channel;
    -import org.jboss.netty.channel.ChannelHandlerContext;
    -import org.jboss.netty.channel.ChannelStateEvent;
    -import org.jboss.netty.channel.ExceptionEvent;
    -import org.jboss.netty.channel.MessageEvent;
    -import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    +import org.jboss.netty.channel.*;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.net.ConnectException;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -
     public class StormClientHandler extends SimpleChannelUpstreamHandler  {
         private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
         private Client client;
    -    long start_time;
         
         StormClientHandler(Client client) {
    --- End diff --
    
    fixed



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45053277
  
    Test case for message delivery order is added at 426d143



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45360004
  
    @clockfly and everyone else, I love the discussion that is happening here.  I think the code is now at a point where we can merge it in (2 binding +1s) and continue the discussion and development on separate JIRAs.   I tried to go through all of the comments, and most of what is left concerns possible improvements that remain, but nothing blocking.  If anyone disagrees or has seen issues they have not yet expressed, please speak up.  Otherwise I plan to merge this in later today.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45419642
  
    fixed



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by Gvain <gi...@git.apache.org>.
Github user Gvain commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-44642174
  
    Hi, clockfly
    
    You can reproduce my test by checking out the modified storm-perf-test here 
    https://github.com/Gvain/storm-perf-test/tree/spout-throughput
    and the storm here
    https://github.com/Gvain/incubator-storm, which is merely a copy from the original apache/incubator-storm several days ago.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827330
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
    @@ -133,4 +242,12 @@ public synchronized void close() {
         public void send(int task, byte[] message) {
             throw new RuntimeException("Server connection should not send any messages");
         }
    +    
    +    public void send(Iterator<TaskMessage> msgs) {
    +      throw new RuntimeException("Server connection should not send any messages");
    +    }
    +	
    +	 public String name() {
    --- End diff --
    
    fixed



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827323
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +181,105 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
                 throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
     
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
    -
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed, blocking);
    +                messageBatch = null;
    +            }
             }
     
    -        //we are busily delivering messages, and will check queue upon response.
    -        //When send() is called by senders, we should not thus call tryDeliverMessages().
    -        wait_for_requests = false;
    -
    -        //write request into socket channel
    -        ChannelFuture future = channel.write(requests);
    -        future.addListener(new ChannelFutureListener() {
    -            public void operationComplete(ChannelFuture future)
    -                    throws Exception {
    -                if (!future.isSuccess()) {
    -                    LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
    -                    reconnect();
    -                } else {
    -                    LOG.debug("{} request(s) sent", requests.size());
    -
    -                    //Now that our requests have been sent, channel could be closed if needed
    -                    if (being_closed.get())
    -                        close_n_release();
    -                }
    +        if (null != messageBatch && !messageBatch.isEmpty()) {
    +            if (channel.isWritable()) {
    +                flushCheckTimer.set(Long.MAX_VALUE);
    +                
    +                // Flush as fast as we can to reduce the latency
    +                MessageBatch toBeFlushed = messageBatch;
    +                messageBatch = null;
    +                flushRequest(channel, toBeFlushed, blocking);
    +                
    +            } else {
    +                // when channel is NOT writable, it means the internal netty buffer is full. 
    +                // In this case, we can try to buffer up more incoming messages.
    +                flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
                 }
    -        });
    +        }
    +
         }
     
    -    /**
    -     * Take all enqueued messages from queue
    -     * @return  batch of messages
    -     * @throws InterruptedException
    -     *
    -     * synchronized ... ensure that messages are delivered in the same order
    -     * as they are added into queue
    -     */
    -    private MessageBatch tryTakeMessages() throws InterruptedException {
    -        //1st message
    -        Object msg = message_queue.poll();
    -        if (msg == null) return null;
    -
    -        MessageBatch batch = new MessageBatch(buffer_size);
    -        //we will discard any message after CLOSE
    -        if (msg == ControlMessage.CLOSE_MESSAGE) {
    -            LOG.info("Connection to {} is being closed", remote_addr);
    -            being_closed.set(true);
    -            return batch;
    +    public String name() {
    +        if (null != remote_addr) {
    +            return PREFIX + remote_addr.toString();
             }
    +        return "";
    +    }
     
    -        batch.add((TaskMessage)msg);
    -        while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
    -            //Is it a CLOSE message?
    -            if (msg == ControlMessage.CLOSE_MESSAGE) {
    -                message_queue.take();
    -                LOG.info("Connection to {} is being closed", remote_addr);
    -                being_closed.set(true);
    -                break;
    +    private synchronized void flush() {
    +        if (!closing) {
    +            if (null != messageBatch && !messageBatch.isEmpty()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                Channel channel = channelRef.get();
    +                if (channel != null) {
    +                    flushCheckTimer.set(Long.MAX_VALUE);
    +                    flushRequest(channel, toBeFlushed, true);
    +                }
    +                messageBatch = null;
                 }
    -
    -            //try to add this msg into batch
    -            if (!batch.tryAdd((TaskMessage) msg))
    -                break;
    -
    -            //remove this message
    -            message_queue.take();
             }
    -
    -        return batch;
         }
    -
    +    
         /**
          * gracefully close this client.
    -     *
    -     * We will send all existing requests, and then invoke close_n_release() method
    +     * 
    +     * We will send all existing requests, and then invoke close_n_release()
    +     * method
          */
    -    public void close() {
    -        //enqueue a CLOSE message so that shutdown() will be invoked
    -        try {
    -            message_queue.put(ControlMessage.CLOSE_MESSAGE);
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            LOG.info("Interrupted Connection to {} is being closed", remote_addr);
    -            being_closed.set(true);
    +    public synchronized void close() {
    +        if (!closing) {
    +            closing = true;
    +            if (null != messageBatch && !messageBatch.isEmpty()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                Channel channel = channelRef.get();
    +                if (channel != null) {
    +                    flushRequest(channel, toBeFlushed, true);
    +                }
    +                messageBatch = null;
    +            }
    +        
    +            //wait for pendings to exit
    +            while(pendings.get() != 0) {
    --- End diff --
    
    fixed



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by nathanmarz <gi...@git.apache.org>.
Github user nathanmarz commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45419054
  
    Additionally, besides having the potential to fail randomly, inserting sleeps into test code also slows down the tests.
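
One common way to avoid sleeps in tests of asynchronous code is to block on a latch that the receiving side counts down, with a timeout as a safety net. The sketch below is an illustration only: `LatchInsteadOfSleep`, `deliverAsync`, and `sendAndWait` are hypothetical names and are not part of the Storm test suite.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class LatchInsteadOfSleep {
    // Simulates an asynchronous delivery: a background thread stores the
    // message and then signals the latch.
    static void deliverAsync(String msg, AtomicReference<String> sink, CountDownLatch done) {
        Thread t = new Thread(() -> {
            sink.set(msg);
            done.countDown();
        });
        t.setDaemon(true);
        t.start();
    }

    // Returns the received message, or null if nothing arrived within the
    // timeout or the waiting thread was interrupted.
    static String sendAndWait(String msg, long timeoutMs) {
        AtomicReference<String> sink = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        deliverAsync(msg, sink, done);
        try {
            // await returns as soon as the latch reaches zero; the timeout
            // only bounds how long a failing test can hang.
            boolean arrived = done.await(timeoutMs, TimeUnit.MILLISECONDS);
            return arrived ? sink.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("hello", 5000));
    }
}
```

With this pattern the test takes only as long as the delivery itself, and the timeout is paid only when something is actually broken.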



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43703379
  
    Thanks.
    
    @miguno, @ptgoetz, @nathanmarz, do you have further comments?



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-45414150
  
    OK I'll try to look into it on my side then.



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12812919
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +181,105 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
                 throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
     
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
    -
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed, blocking);
    +                messageBatch = null;
    +            }
             }
     
    -        //we are busily delivering messages, and will check queue upon response.
    -        //When send() is called by senders, we should not thus call tryDeliverMessages().
    -        wait_for_requests = false;
    -
    -        //write request into socket channel
    -        ChannelFuture future = channel.write(requests);
    -        future.addListener(new ChannelFutureListener() {
    -            public void operationComplete(ChannelFuture future)
    -                    throws Exception {
    -                if (!future.isSuccess()) {
    -                    LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
    -                    reconnect();
    -                } else {
    -                    LOG.debug("{} request(s) sent", requests.size());
    -
    -                    //Now that our requests have been sent, channel could be closed if needed
    -                    if (being_closed.get())
    -                        close_n_release();
    -                }
    +        if (null != messageBatch && !messageBatch.isEmpty()) {
    +            if (channel.isWritable()) {
    +                flushCheckTimer.set(Long.MAX_VALUE);
    +                
    +                // Flush as fast as we can to reduce the latency
    +                MessageBatch toBeFlushed = messageBatch;
    +                messageBatch = null;
    +                flushRequest(channel, toBeFlushed, blocking);
    +                
    +            } else {
    +                // when channel is NOT writable, it means the internal netty buffer is full. 
    +                // In this case, we can try to buffer up more incoming messages.
    +                flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
                 }
    -        });
    +        }
    +
         }
     
    -    /**
    -     * Take all enqueued messages from queue
    -     * @return  batch of messages
    -     * @throws InterruptedException
    -     *
    -     * synchronized ... ensure that messages are delivered in the same order
    -     * as they are added into queue
    -     */
    -    private MessageBatch tryTakeMessages() throws InterruptedException {
    -        //1st message
    -        Object msg = message_queue.poll();
    -        if (msg == null) return null;
    -
    -        MessageBatch batch = new MessageBatch(buffer_size);
    -        //we will discard any message after CLOSE
    -        if (msg == ControlMessage.CLOSE_MESSAGE) {
    -            LOG.info("Connection to {} is being closed", remote_addr);
    -            being_closed.set(true);
    -            return batch;
    +    public String name() {
    +        if (null != remote_addr) {
    +            return PREFIX + remote_addr.toString();
             }
    +        return "";
    +    }
     
    -        batch.add((TaskMessage)msg);
    -        while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
    -            //Is it a CLOSE message?
    -            if (msg == ControlMessage.CLOSE_MESSAGE) {
    -                message_queue.take();
    -                LOG.info("Connection to {} is being closed", remote_addr);
    -                being_closed.set(true);
    -                break;
    +    private synchronized void flush() {
    +        if (!closing) {
    +            if (null != messageBatch && !messageBatch.isEmpty()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                Channel channel = channelRef.get();
    +                if (channel != null) {
    +                    flushCheckTimer.set(Long.MAX_VALUE);
    +                    flushRequest(channel, toBeFlushed, true);
    +                }
    +                messageBatch = null;
    --- End diff --
    
    If Channel is null do we really want to drop the messageBatch?
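
One way to address this concern is to clear the pending batch only after it has actually been handed to a channel. The following is a simplified sketch, not the actual Client code: `BatchingSender`, its `Sink` interface (standing in for the Netty channel), and the `List<String>` batch are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchingSender {
    // Hypothetical stand-in for a connected channel.
    interface Sink {
        void write(List<String> batch);
    }

    private Sink sink;                                 // null while disconnected
    private List<String> pending = new ArrayList<>();  // messages not yet written

    synchronized void setSink(Sink sink) {
        this.sink = sink;
    }

    synchronized void add(String msg) {
        pending.add(msg);
    }

    // Writes the pending batch if a sink is available and returns true.
    // When the sink is null, the batch is retained so that a later flush
    // can still deliver it, and false is returned.
    synchronized boolean flush() {
        if (pending.isEmpty() || sink == null) {
            return false;
        }
        List<String> toBeFlushed = pending;
        pending = new ArrayList<>();
        sink.write(toBeFlushed);
        return true;
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BatchingSender sender = new BatchingSender();
        sender.add("a");
        sender.flush();                                  // no sink yet: batch is kept
        System.out.println("pending: " + sender.pendingCount());
        sender.setSink(batch -> System.out.println("sent " + batch));
        sender.flush();                                  // sink available: batch is written
        System.out.println("pending: " + sender.pendingCount());
    }
}
```

Retaining the batch has its own cost: if the connection never comes back, the buffer grows without bound, so a real implementation would likely need a cap or some form of backpressure.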



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43636831
  
    Running the unit tests, I ran into some errors in backtype.storm.messaging.netty-unit-test (there were 3 failures like this one).
    
    It looks like you need to update the test to set Config.TOPOLOGY_WORKERS before creating the context.
    
    ```
    expected: nil
      actual: java.lang.RuntimeException: Fail to construct messaging plugin from plugin backtype.storm.messaging.netty.Context
     at backtype.storm.messaging.TransportFactory.makeContext (TransportFactory.java:53)
        backtype.storm.messaging.netty_unit_test/fn (netty_unit_test.clj:109)
        clojure.test$test_var$fn__6926.invoke (test.clj:701)
        clojure.test$test_var.invoke (test.clj:701)
        clojure.test$test_all_vars$fn__6930$fn__6937.invoke (test.clj:717)
        clojure.test$default_fixture.invoke (test.clj:671)
        clojure.test$test_all_vars$fn__6930.invoke (test.clj:717)
    ...
    Caused by: java.lang.IllegalArgumentException: Don't know how to convert null + to int
     at backtype.storm.utils.Utils.getInt (Utils.java:308)
        backtype.storm.messaging.netty.Context.prepare (Context.java:66)
        backtype.storm.messaging.TransportFactory.makeContext (TransportFactory.java:45)
        backtype.storm.messaging.netty_unit_test/fn (netty_unit_test.clj:109)
        clojure.test$test_var$fn__6926.invoke (test.clj:701)
        clojure.test$test_var.invoke (test.clj:701)
        clojure.test$test_all_vars$fn__6930$fn__6937.invoke (test.clj:717)
        clojure.test$default_fixture.invoke (test.clj:671)
        clojure.test$test_all_vars$fn__6930.invoke (test.clj:717)
        clojure.test$default_fixture.invoke (test.clj:671)
    ```
    




[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12826580
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
    @@ -301,15 +303,39 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String
         }
         
         public static Integer getInt(Object o) {
    -        if(o instanceof Long) {
    -            return ((Long) o ).intValue();
    -        } else if (o instanceof Integer) {
    -            return (Integer) o;
    -        } else if (o instanceof Short) {
    -            return ((Short) o).intValue();
    -        } else {
    -            throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
    -        }
    +      Integer result = getInt(o, null);
    +      if (null == result) {
    +        throw new IllegalArgumentException("Don't know how to convert null + to int");
    +      }
    +      return result;
    +    }
    +    
    +    public static Integer getInt(Object o, Integer defaultValue) {
    +      if (null == o) {
    +        return defaultValue;
    +      }
    +      
    +      if(o instanceof Long) {
    --- End diff --
    
    We are consistent with the old behavior.
    If a config expects an integer, then short, int, and long values are valid; double and float values are invalid.
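
The behavior described here can be sketched as follows. This is a standalone illustration modeled on the diff above, not a copy of Utils.java; the class name `ConfigInts` is hypothetical, and the exception message is simplified.

```java
final class ConfigInts {
    // Returns defaultValue when o is null; accepts Short, Integer, and Long;
    // rejects every other type, including Double and Float.
    static Integer getInt(Object o, Integer defaultValue) {
        if (o == null) {
            return defaultValue;
        }
        if (o instanceof Long) {
            // Note: intValue() silently truncates values outside the int range.
            return ((Long) o).intValue();
        } else if (o instanceof Integer) {
            return (Integer) o;
        } else if (o instanceof Short) {
            return ((Short) o).intValue();
        }
        throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
    }

    public static void main(String[] args) {
        System.out.println(getInt(4L, null));      // Long is accepted
        System.out.println(getInt(null, 1));       // null falls back to the default
        try {
            getInt(2.0, null);                     // Double is rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```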



[GitHub] incubator-storm pull request: STORM-297 Storm Performance cannot b...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827649
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -109,25 +111,30 @@
     (defn mk-transfer-fn [worker]
       (let [local-tasks (-> worker :task-ids set)
             local-transfer (:transfer-local-fn worker)
    -        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
    +        ^DisruptorQueue transfer-queue (:transfer-queue worker)
    +        task->node+port (:cached-task->node+port worker)]
         (fn [^KryoTupleSerializer serializer tuple-batch]
           (let [local (ArrayList.)
    -            remote (ArrayList.)]
    +            remoteMap (HashMap.)]
             (fast-list-iter [[task tuple :as pair] tuple-batch]
               (if (local-tasks task)
                 (.add local pair)
    -            (.add remote pair)
    -            ))
    +            (let [node+port (get @task->node+port task)]
    +              (when (not (.get remoteMap node+port))
    +                (.put remoteMap node+port (ArrayList.)))
    +              (let [remote (.get remoteMap node+port)]
    +                (.add remote (TaskMessage. task (.serialize serializer tuple)))
    +                 ))))
    --- End diff --
    
    This is on purpose, for performance.
    The ArrayList and HashMap constructed here are used directly in Java. We designed a customized iterator inside the TransferDrainer class, so that we don't need to wrap/unwrap data or copy it from Clojure data structures to Java data structures.
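
The idea of walking the grouped lists in place, without copying them into a new collection, can be sketched as below. This is an illustration under stated assumptions, not the TransferDrainer implementation: `FlatteningIterator` is a hypothetical name, and the element type is left generic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Iterates over the elements of several lists in order without copying
// them into a new collection.
final class FlatteningIterator<T> implements Iterator<T> {
    private final Iterator<ArrayList<T>> outer;
    private Iterator<T> inner;

    FlatteningIterator(ArrayList<ArrayList<T>> bundles) {
        this.outer = bundles.iterator();
    }

    @Override
    public boolean hasNext() {
        // Advance past exhausted or empty inner lists.
        while ((inner == null || !inner.hasNext()) && outer.hasNext()) {
            inner = outer.next().iterator();
        }
        return inner != null && inner.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return inner.next();
    }

    public static void main(String[] args) {
        ArrayList<ArrayList<String>> bundles = new ArrayList<>();
        bundles.add(new ArrayList<>(Arrays.asList("m1", "m2")));
        bundles.add(new ArrayList<>());
        bundles.add(new ArrayList<>(Arrays.asList("m3")));

        Iterator<String> it = new FlatteningIterator<>(bundles);
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

Because the iterator only holds references to the existing lists, the consumer sees the same objects the producer built, and no per-message wrapping or copying is needed.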

