Posted to dev@storm.apache.org by srdo <gi...@git.apache.org> on 2016/05/15 17:29:52 UTC

[GitHub] storm pull request: STORM-1837: Fix complete-topology and prevent ...

GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/1417

    STORM-1837: Fix complete-topology and prevent message loss when running local clusters with no time simulation
    
    Link for convenience https://issues.apache.org/jira/browse/STORM-1837
    
    @kishorvpatil @revans2 Could you guys review this when you have time?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-1837-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1417.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1417
    
----
commit 2bf6db18da7431c4179c7f93efc7d8ab0ce45a6f
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Date:   2016-05-13T16:22:37Z

    STORM-1837: Fix complete-topology and prevent message loss when running local clusters with no time simulation

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-1837: Fix complete-topology and prevent ...

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on the pull request:

    https://github.com/apache/storm/pull/1417#issuecomment-219545865
  
    The code changes look good. 
    I ran the unit tests and `netty_unit_test` timed out once. I think we need to increase the timeout at https://github.com/apache/storm/blob/master/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj#L115
    
    ```
    <error message="Uncaught exception, not in assertion.">Uncaught exception, not in assertion.
    expected: nil
      actual: java.lang.AssertionError: Test timed out (5000ms) (empty? (.getLoad client [(int 1) (int 2)]))
     at org.apache.storm.messaging.netty_unit_test$test_load_fn.invoke (netty_unit_test.clj:115)
        org.apache.storm.messaging.netty_unit_test/fn (netty_unit_test.clj:143)
        clojure.test$test_var$fn__7670.invoke (test.clj:704)
        clojure.test$test_var.invoke (test.clj:704)
        clojure.test$test_vars$fn__7692$fn__7697.invoke (test.clj:722)
        clojure.test$default_fixture.invoke (test.clj:674)
        clojure.test$test_vars$fn__7692.invoke (test.clj:722)
        clojure.test$default_fixture.invoke (test.clj:674)
        clojure.test$test_vars.invoke (test.clj:718)
        clojure.test$test_all_vars.invoke (test.clj:728)
        clojure.test$test_ns.invoke (test.clj:747)
        clojure.core$map$fn__4553.invoke (core.clj:2624)
        clojure.lang.LazySeq.sval (LazySeq.java:40)
        clojure.lang.LazySeq.seq (LazySeq.java:49)
        clojure.lang.Cons.next (Cons.java:39)
        clojure.lang.RT.boundedLength (RT.java:1735)
        clojure.lang.RestFn.applyTo (RestFn.java:130)
        clojure.core$apply.invoke (core.clj:632)
        clojure.test$run_tests.doInvoke (test.clj:762)
        clojure.lang.RestFn.invoke (RestFn.java:408)
        org.apache.storm.testrunner$eval8790$iter__8791__8795$fn__8796$fn__8797$fn__8798.invoke (test_runner.clj:107)
        org.apache.storm.testrunner$eval8790$iter__8791__8795$fn__8796$fn__8797.invoke (test_runner.clj:53)
        org.apache.storm.testrunner$eval8790$iter__8791__8795$fn__8796.invoke (test_runner.clj:52)
        clojure.lang.LazySeq.sval (LazySeq.java:40)
        clojure.lang.LazySeq.seq (LazySeq.java:49)
        clojure.lang.RT.seq (RT.java:507)
        clojure.core/seq (core.clj:137)
        clojure.core$dorun.invoke (core.clj:3009)
        org.apache.storm.testrunner$eval8790.invoke (test_runner.clj:52)
        clojure.lang.Compiler.eval (Compiler.java:6782)
        clojure.lang.Compiler.load (Compiler.java:7227)
        clojure.lang.Compiler.loadFile (Compiler.java:7165)
        clojure.main$load_script.invoke (main.clj:275)
        clojure.main$script_opt.invoke (main.clj:337)
        clojure.main$main.doInvoke (main.clj:421)
        clojure.lang.RestFn.invoke (RestFn.java:421)
        clojure.lang.Var.invoke (Var.java:383)
        clojure.lang.AFn.applyToHelper (AFn.java:156)
        clojure.lang.Var.applyTo (Var.java:700)
        clojure.main.main (main.java:37)
    
          at: test_runner.clj:105</error>
            </testcase>
    ```




[GitHub] storm issue #1417: STORM-1837: Fix complete-topology and prevent message los...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1417
  
    @kishorvpatil Please take another look at this when you have time. There was a potential NPE in the flushPending function because I forgot to check server._cb for null before dereferencing. It should be fixed now. We run a large number of tests using local clusters, and this fix seems to have made the tests much more reliable.
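
For context on why a missed null check in flushPending matters: in the diff quoted elsewhere in this thread, flushPending is also called from a task scheduled with scheduleAtFixedRate, and the ScheduledExecutorService documentation states that once an execution of such a task throws, subsequent executions are suppressed. The following is a minimal sketch of that behaviour, not Storm code; the class name SuppressedFlusherSketch and the simulated NPE are assumptions made for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; not part of Storm.
class SuppressedFlusherSketch {

    // Schedules a periodic task that throws on its first run and
    // returns how many times the task body was entered.
    static int runsBeforeSuppression() {
        final AtomicInteger runs = new AtomicInteger();
        ScheduledExecutorService flusher = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<?> future = flusher.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    runs.incrementAndGet();
                    throw new NullPointerException("simulated NPE");
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            try {
                // Completes exceptionally as soon as one execution throws.
                future.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException expected) {
                // The cause is the simulated NPE.
            }
            // Leave room for roughly ten more runs, had the task kept going.
            Thread.sleep(100);
            return runs.get();
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            flusher.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runsBeforeSuppression()); // 1
    }
}
```

If the flusher in the pull request behaves like this sketch, a single uncaught exception would end the periodic flushing, which is presumably why the task logs the throwable before rethrowing it.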



[GitHub] storm issue #1417: STORM-1837: Fix complete-topology and prevent message los...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1417
  
    One of the failures is the usual problem of Maven failing to download dependencies.
    
    But there was also an NPE in one of the tests:
    ```
    java.lang.NullPointerException
    	at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:26) ~[clojure-1.7.0.jar:?]
    	at org.apache.storm.daemon.nimbus$mk_reified_nimbus$reify__5148$iter__5203__5207$fn__5208.invoke(nimbus.clj:1997) ~[classes/:?]
    	at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
    	at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
    	at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
    	at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
    	at clojure.core$dorun.invoke(core.clj:3009) ~[clojure-1.7.0.jar:?]
    	at clojure.core$doall.invoke(core.clj:3025) ~[clojure-1.7.0.jar:?]
    	at org.apache.storm.daemon.nimbus$mk_reified_nimbus$reify__5148.getTopologyInfoWithOpts(nimbus.clj:1995) ~[classes/:?]
    	at org.apache.storm.daemon.nimbus$mk_reified_nimbus$reify__5148.getTopologyInfo(nimbus.clj:2034) ~[classes/:?]
    	at org.apache.storm.generated.Nimbus$Processor$getTopologyInfo.getResult(Nimbus.java:3822) ~[classes/:?]
    	at org.apache.storm.generated.Nimbus$Processor$getTopologyInfo.getResult(Nimbus.java:3806) ~[classes/:?]
    	at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) ~[libthrift-0.9.3.jar:0.9.3]
    	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) ~[libthrift-0.9.3.jar:0.9.3]
    	at org.apache.storm.security.auth.SimpleTransportPlugin$SimpleWrapProcessor.process(SimpleTransportPlugin.java:158) ~[classes/:?]
    	at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:518) [libthrift-0.9.3.jar:0.9.3]
    	at org.apache.thrift.server.Invocation.run(Invocation.java:18) [libthrift-0.9.3.jar:0.9.3]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_31]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_31]
    	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_31]
    ```
    
    That failure is a known issue that I fixed in my new supervisor pull request.
    
    +1 for the changes



[GitHub] storm pull request: STORM-1837: Fix complete-topology and prevent ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the pull request:

    https://github.com/apache/storm/pull/1417#issuecomment-219674715
  
    Something like this? https://github.com/apache/storm/pull/1421



[GitHub] storm issue #1417: STORM-1837: Fix complete-topology and prevent message los...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1417
  
    +1 pending Travis. @srdo, like I said, I don't expect _cb to ever go back to null. I just thought it would be good to be defensive, because it is a possibility the way the API is currently written. Not a big deal though.



[GitHub] storm pull request #1417: STORM-1837: Fix complete-topology and prevent mess...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1417#discussion_r79412277
  
    --- Diff: storm-core/src/jvm/org/apache/storm/messaging/local/Context.java ---
    @@ -82,31 +86,73 @@ public void close() {
     
         private static class LocalClient implements IConnection {
             private final LocalServer _server;
    +        //Messages sent before the server registered a callback
    +        private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
    +        private final ScheduledExecutorService _pendingFlusher;
     
             public LocalClient(LocalServer server) {
                 _server = server;
    +            _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
    +            _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){
    +                @Override
    +                public Thread newThread(Runnable runnable) {
    +                    Thread thread = new Thread(runnable);
    +                    thread.setName("LocalClientFlusher-" + thread.getId());
    +                    thread.setDaemon(true);
    +                    return thread;
    +                }
    +            });
    +            _pendingFlusher.scheduleAtFixedRate(new Runnable(){
    +                @Override
    +                public void run(){
    +                    try {
    +                        //Ensure messages are flushed even if no more sends are performed
    +                        flushPending();
    +                    } catch (Throwable t) {
    +                        LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t);
    +                        throw t;
    +                    }
    +                }
    +            }, 5, 5, TimeUnit.SECONDS);
             }
     
             @Override
             public void registerRecv(IConnectionCallback cb) {
                 throw new IllegalArgumentException("SHOULD NOT HAPPEN");
             }
    -
    +        
    +        private void flushPending(){
    +            if (_server._cb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
    +                ArrayList<TaskMessage> ret = new ArrayList<>();
    +                _pendingDueToUnregisteredServer.drainTo(ret);
    +                _server._cb.recv(ret);
    --- End diff --
    
    If _cb is marked as volatile, does that mean we expect it to change at any point in time? If so, then the null check is not enough. We should cache it locally before doing the null check and the call (everywhere).
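
The suggestion above can be illustrated with a minimal sketch. Server, Callback and deliver are hypothetical stand-ins rather than Storm's LocalServer and IConnectionCallback, and messages are plain strings. The point is only that the volatile field is read once into a local variable, so the null check and the call see the same reference.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins; not Storm's actual classes.
class CachedCallbackSketch {

    interface Callback {
        void recv(List<String> batch);
    }

    static class Server {
        // volatile: a write by the registering thread is visible to senders.
        volatile Callback cb;
    }

    // Reads the volatile field exactly once, so the null check and the
    // call are guaranteed to use the same object.
    static boolean deliver(Server server, List<String> batch) {
        Callback cb = server.cb; // single volatile read
        if (cb == null) {
            return false;        // nothing registered yet
        }
        cb.recv(batch);          // safe even if server.cb is changed concurrently
        return true;
    }

    public static void main(String[] args) {
        Server server = new Server();
        List<String> received = new ArrayList<>();
        System.out.println(deliver(server, Arrays.asList("a"))); // false
        server.cb = batch -> received.addAll(batch);
        System.out.println(deliver(server, Arrays.asList("b"))); // true
        System.out.println(received);                            // [b]
    }
}
```

Checking `server.cb != null` and then calling `server.cb.recv(...)` performs two separate reads, and a concurrent reset to null between them would cause an NPE. Whether such a reset can actually happen in the code under review is a separate question.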



[GitHub] storm issue #1417: STORM-1837: Fix complete-topology and prevent message los...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1417
  
    @revans2 Sure, it doesn't hurt. Added the caching.



[GitHub] storm issue #1417: STORM-1837: Fix complete-topology and prevent message los...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1417
  
    I don't know why Travis is having such a hard time with master. The tests run fine on my machine.



[GitHub] storm pull request #1417: STORM-1837: Fix complete-topology and prevent mess...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1417



[GitHub] storm pull request #1417: STORM-1837: Fix complete-topology and prevent mess...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1417#discussion_r79430410
  
    --- Diff: storm-core/src/jvm/org/apache/storm/messaging/local/Context.java ---
    @@ -82,31 +86,73 @@ public void close() {
     
         private static class LocalClient implements IConnection {
             private final LocalServer _server;
    +        //Messages sent before the server registered a callback
    +        private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
    +        private final ScheduledExecutorService _pendingFlusher;
     
             public LocalClient(LocalServer server) {
                 _server = server;
    +            _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
    +            _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){
    +                @Override
    +                public Thread newThread(Runnable runnable) {
    +                    Thread thread = new Thread(runnable);
    +                    thread.setName("LocalClientFlusher-" + thread.getId());
    +                    thread.setDaemon(true);
    +                    return thread;
    +                }
    +            });
    +            _pendingFlusher.scheduleAtFixedRate(new Runnable(){
    +                @Override
    +                public void run(){
    +                    try {
    +                        //Ensure messages are flushed even if no more sends are performed
    +                        flushPending();
    +                    } catch (Throwable t) {
    +                        LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t);
    +                        throw t;
    --- End diff --
    
    My guess is that it is Java 8 being smart, and that it will fail when we try to pull it into 1.x.



[GitHub] storm pull request #1417: STORM-1837: Fix complete-topology and prevent mess...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1417#discussion_r79412428
  
    --- Diff: storm-core/src/jvm/org/apache/storm/messaging/local/Context.java ---
    @@ -82,31 +86,73 @@ public void close() {
     
         private static class LocalClient implements IConnection {
             private final LocalServer _server;
    +        //Messages sent before the server registered a callback
    +        private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
    +        private final ScheduledExecutorService _pendingFlusher;
     
             public LocalClient(LocalServer server) {
                 _server = server;
    +            _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
    +            _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){
    +                @Override
    +                public Thread newThread(Runnable runnable) {
    +                    Thread thread = new Thread(runnable);
    +                    thread.setName("LocalClientFlusher-" + thread.getId());
    +                    thread.setDaemon(true);
    +                    return thread;
    +                }
    +            });
    +            _pendingFlusher.scheduleAtFixedRate(new Runnable(){
    +                @Override
    +                public void run(){
    +                    try {
    +                        //Ensure messages are flushed even if no more sends are performed
    +                        flushPending();
    +                    } catch (Throwable t) {
    +                        LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t);
    +                        throw t;
    +                    }
    +                }
    +            }, 5, 5, TimeUnit.SECONDS);
             }
     
             @Override
             public void registerRecv(IConnectionCallback cb) {
                 throw new IllegalArgumentException("SHOULD NOT HAPPEN");
             }
    -
    +        
    +        private void flushPending(){
    +            if (_server._cb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
    +                ArrayList<TaskMessage> ret = new ArrayList<>();
    +                _pendingDueToUnregisteredServer.drainTo(ret);
    +                _server._cb.recv(ret);
    +            }
    +        }
    +        
             @Override
             public void send(int taskId,  byte[] payload) {
    +            TaskMessage message = new TaskMessage(taskId, payload);
                 if (_server._cb != null) {
    -                _server._cb.recv(Arrays.asList(new TaskMessage(taskId, payload)));
    +                flushPending();
    +                _server._cb.recv(Arrays.asList(message));
    --- End diff --
    
    cache _cb before using it.



[GitHub] storm pull request #1417: STORM-1837: Fix complete-topology and prevent mess...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1417#discussion_r79430642
  
    --- Diff: storm-core/src/jvm/org/apache/storm/messaging/local/Context.java ---
    @@ -82,31 +86,73 @@ public void close() {
     
         private static class LocalClient implements IConnection {
             private final LocalServer _server;
    +        //Messages sent before the server registered a callback
    +        private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
    +        private final ScheduledExecutorService _pendingFlusher;
     
             public LocalClient(LocalServer server) {
                 _server = server;
    +            _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
    +            _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){
    +                @Override
    +                public Thread newThread(Runnable runnable) {
    +                    Thread thread = new Thread(runnable);
    +                    thread.setName("LocalClientFlusher-" + thread.getId());
    +                    thread.setDaemon(true);
    +                    return thread;
    +                }
    +            });
    +            _pendingFlusher.scheduleAtFixedRate(new Runnable(){
    +                @Override
    +                public void run(){
    +                    try {
    +                        //Ensure messages are flushed even if no more sends are performed
    +                        flushPending();
    +                    } catch (Throwable t) {
    +                        LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t);
    +                        throw t;
    +                    }
    +                }
    +            }, 5, 5, TimeUnit.SECONDS);
             }
     
             @Override
             public void registerRecv(IConnectionCallback cb) {
                 throw new IllegalArgumentException("SHOULD NOT HAPPEN");
             }
    -
    +        
    +        private void flushPending(){
    +            if (_server._cb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
    +                ArrayList<TaskMessage> ret = new ArrayList<>();
    +                _pendingDueToUnregisteredServer.drainTo(ret);
    +                _server._cb.recv(ret);
    --- End diff --
    
    I expect the callback to be set to a non-null value when the receiving worker registers at some point. I don't expect it to be set to null again once it is non-null. The volatile is just to make sure that the value becomes visible to other threads once set.
    
    If it doesn't become null after being set non-null, is caching still needed for some reason?
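
A small sketch of the visibility guarantee described above, under the assumption that the callback is written once by a registering thread and read by another. The Server class and the readerObservesRegistration method are hypothetical and exist only for this illustration.

```java
// Hypothetical illustration of one-time publication through a volatile field.
class VolatilePublishSketch {

    static class Server {
        volatile Runnable cb; // set once, never reset in this sketch
    }

    // Starts a reader that spins until it observes a non-null callback,
    // then registers one from the calling thread.
    // Returns true if the reader saw the write and terminated.
    static boolean readerObservesRegistration() {
        final Server server = new Server();
        Thread reader = new Thread(new Runnable() {
            @Override
            public void run() {
                while (server.cb == null) {
                    Thread.yield(); // every iteration re-reads the volatile field
                }
            }
        });
        reader.setDaemon(true);
        reader.start();

        server.cb = new Runnable() {
            @Override
            public void run() {
                // no-op callback
            }
        };

        try {
            reader.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !reader.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(readerObservesRegistration()); // true
    }
}
```

Without volatile on cb, the Java memory model would not guarantee that the loop ever observes the write. Caching the field in a local variable addresses a different concern: it only matters if the field could be reset to null between the check and the call.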



[GitHub] storm pull request: STORM-1837: Fix complete-topology and prevent ...

Posted by srdo <gi...@git.apache.org>.
GitHub user srdo reopened a pull request:

    https://github.com/apache/storm/pull/1417

    STORM-1837: Fix complete-topology and prevent message loss when running local clusters with no time simulation
    
    Link for convenience https://issues.apache.org/jira/browse/STORM-1837
    
    @kishorvpatil @revans2 Could you guys review this when you have time?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-1837-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1417.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1417
    
----
commit 2bf6db18da7431c4179c7f93efc7d8ab0ce45a6f
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Date:   2016-05-13T16:22:37Z

    STORM-1837: Fix complete-topology and prevent message loss when running local clusters with no time simulation

----



[GitHub] storm pull request #1417: STORM-1837: Fix complete-topology and prevent mess...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1417#discussion_r79413571
  
    --- Diff: storm-core/src/jvm/org/apache/storm/messaging/local/Context.java ---
    @@ -82,31 +86,73 @@ public void close() {
     
         private static class LocalClient implements IConnection {
             private final LocalServer _server;
    +        //Messages sent before the server registered a callback
    +        private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
    +        private final ScheduledExecutorService _pendingFlusher;
     
             public LocalClient(LocalServer server) {
                 _server = server;
    +            _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
    +            _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){
    +                @Override
    +                public Thread newThread(Runnable runnable) {
    +                    Thread thread = new Thread(runnable);
    +                    thread.setName("LocalClientFlusher-" + thread.getId());
    +                    thread.setDaemon(true);
    +                    return thread;
    +                }
    +            });
    +            _pendingFlusher.scheduleAtFixedRate(new Runnable(){
    +                @Override
    +                public void run(){
    +                    try {
    +                        //Ensure messages are flushed even if no more sends are performed
    +                        flushPending();
    +                    } catch (Throwable t) {
    +                        LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t);
    +                        throw t;
    --- End diff --
    
    Does this compile?  I didn't think we could throw a throwable from a method that does not declare it.
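
For what it's worth, this pattern is accepted by compilers from Java SE 7 onward thanks to the more precise rethrow analysis: when the caught parameter is effectively final and the try block can only throw unchecked exceptions, `throw t` is treated as throwing only those unchecked types. Under Java 6 source rules the same code would be rejected. A minimal sketch, in which flushPending is a hypothetical stand-in and not the method from the diff:

```java
// Hypothetical example of rethrowing a caught Throwable from run().
class PreciseRethrowSketch {

    // Stand-in that can only throw unchecked exceptions.
    static void flushPending(boolean fail) {
        if (fail) {
            throw new IllegalStateException("flush failed");
        }
    }

    // Builds a Runnable whose run() declares no checked exceptions, yet
    // rethrows a caught Throwable. This compiles because t is never
    // reassigned and the try block cannot throw a checked exception.
    static Runnable flusher(final boolean fail) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    flushPending(fail);
                } catch (Throwable t) {
                    System.err.println("Uncaught throwable in flusher: " + t);
                    throw t; // analysed as RuntimeException or Error only
                }
            }
        };
    }

    // Runs the flusher and reports the simple class name of whatever
    // escaped, or "none" if nothing did.
    static String outcome(boolean fail) {
        try {
            flusher(fail).run();
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(false)); // none
        System.out.println(outcome(true));  // IllegalStateException
    }
}
```

If the try block called something that declares a checked exception, the rethrow would no longer compile without a matching throws clause.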



[GitHub] storm pull request: STORM-1837: Fix complete-topology and prevent ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the pull request:

    https://github.com/apache/storm/pull/1417#issuecomment-219549248
  
    I'd be happy to bump it. I don't think test-load-fn actually hits any of the changed code though, since it uses the netty.Context rather than local.Context. What do you think is a reasonable timeout? 15-20 seconds?



[GitHub] storm pull request: STORM-1837: Fix complete-topology and prevent ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo closed the pull request at:

    https://github.com/apache/storm/pull/1417



[GitHub] storm pull request #1417: STORM-1837: Fix complete-topology and prevent mess...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1417#discussion_r79412490
  
    --- Diff: storm-core/src/jvm/org/apache/storm/messaging/local/Context.java ---
    @@ -82,31 +86,73 @@ public void close() {
     
         private static class LocalClient implements IConnection {
             private final LocalServer _server;
    +        //Messages sent before the server registered a callback
    +        private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
    +        private final ScheduledExecutorService _pendingFlusher;
     
             public LocalClient(LocalServer server) {
                 _server = server;
    +            _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
    +            _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){
    +                @Override
    +                public Thread newThread(Runnable runnable) {
    +                    Thread thread = new Thread(runnable);
    +                    thread.setName("LocalClientFlusher-" + thread.getId());
    +                    thread.setDaemon(true);
    +                    return thread;
    +                }
    +            });
    +            _pendingFlusher.scheduleAtFixedRate(new Runnable(){
    +                @Override
    +                public void run(){
    +                    try {
    +                        //Ensure messages are flushed even if no more sends are performed
    +                        flushPending();
    +                    } catch (Throwable t) {
    +                        LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t);
    +                        throw t;
    +                    }
    +                }
    +            }, 5, 5, TimeUnit.SECONDS);
             }
     
             @Override
             public void registerRecv(IConnectionCallback cb) {
                 throw new IllegalArgumentException("SHOULD NOT HAPPEN");
             }
    -
    +        
    +        private void flushPending(){
    +            if (_server._cb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
    +                ArrayList<TaskMessage> ret = new ArrayList<>();
    +                _pendingDueToUnregisteredServer.drainTo(ret);
    +                _server._cb.recv(ret);
    +            }
    +        }
    +        
             @Override
             public void send(int taskId,  byte[] payload) {
    +            TaskMessage message = new TaskMessage(taskId, payload);
                 if (_server._cb != null) {
    -                _server._cb.recv(Arrays.asList(new TaskMessage(taskId, payload)));
    +                flushPending();
    +                _server._cb.recv(Arrays.asList(message));
    +            } else {
    +                _pendingDueToUnregisteredServer.add(message);
                 }
             }
      
             @Override
             public void send(Iterator<TaskMessage> msgs) {
                 if (_server._cb != null) {
    +                flushPending();
                     ArrayList<TaskMessage> ret = new ArrayList<>();
                     while (msgs.hasNext()) {
                         ret.add(msgs.next());
                     }
                     _server._cb.recv(ret);
    --- End diff --
    
    Cache `_cb` in a local variable before using it.
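
A minimal sketch of what this suggestion could look like, assuming the callback field can be set by another thread while `send` runs. The names `CachedCallbackSender` and `CallbackHolder` are hypothetical stand-ins for `LocalClient` and `LocalServer`, and a `Consumer` replaces `IConnectionCallback`; none of this is the code from the PR. The point is that the field is read once into a local, so the null check and the call see the same reference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for LocalClient; not the actual Storm class.
class CachedCallbackSender {

    // Hypothetical stand-in for LocalServer: another thread may set cb at any time.
    static class CallbackHolder {
        volatile Consumer<List<String>> cb; // plays the role of _server._cb
    }

    private final CallbackHolder server;
    private final List<String> pending = new ArrayList<>();

    CachedCallbackSender(CallbackHolder server) {
        this.server = server;
    }

    // Returns true if the message was delivered, false if it was buffered.
    synchronized boolean send(String message) {
        // Read the field exactly once; every later use goes through the local.
        Consumer<List<String>> cb = server.cb;
        if (cb == null) {
            pending.add(message);
            return false;
        }
        // Deliver anything buffered earlier first, then the new message.
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        batch.add(message);
        cb.accept(batch);
        return true;
    }

    public static void main(String[] args) {
        CallbackHolder holder = new CallbackHolder();
        CachedCallbackSender sender = new CachedCallbackSender(holder);
        sender.send("early");                 // buffered, no callback registered yet
        List<String> received = new ArrayList<>();
        holder.cb = received::addAll;         // the "server" registers its callback
        sender.send("late");                  // flushes "early", then delivers "late"
        System.out.println(received);
    }
}
```

Without the local variable, a field that changes between the `!= null` check and the call could lead to a `NullPointerException` or to messages taking the wrong branch.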



[GitHub] storm issue #1417: STORM-1837: Fix complete-topology and prevent message los...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1417
  
    @revans2 Any chance I could get you to take a look at this? I'd like to get this in, since local clusters are currently likely to drop messages when not using time simulation.



[GitHub] storm pull request: STORM-1837: Fix complete-topology and prevent ...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/1417#issuecomment-219600127
  
    @kishorvpatil 
    Nice find. I think it should respect STORM_TEST_TIMEOUT_MS, but that can be handled in a separate JIRA issue since it seems unrelated to this PR.
    
    Btw, another test failure was observed on Travis CI, which I think is also unrelated to this PR.
    
    ```
    Running org.apache.storm.utils.DisruptorQueueTest
    Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 4.352 sec <<< FAILURE! - in org.apache.storm.utils.DisruptorQueueTest
    testInOrderBatch(org.apache.storm.utils.DisruptorQueueTest)  Time elapsed: 0.405 sec  <<< FAILURE!
    junit.framework.AssertionFailedError: producer 0 is still alive
    	at junit.framework.Assert.fail(Assert.java:57)
    	at junit.framework.Assert.assertTrue(Assert.java:22)
    	at junit.framework.Assert.assertFalse(Assert.java:39)
    	at junit.framework.TestCase.assertFalse(TestCase.java:210)
    	at org.apache.storm.utils.DisruptorQueueTest.run(DisruptorQueueTest.java:135)
    	at org.apache.storm.utils.DisruptorQueueTest.testInOrderBatch(DisruptorQueueTest.java:106)
    ```
    
    @srdo 
    I guess what @kishorvpatil reported is an intermittent failure, since one of the storm-core builds passed on Travis CI.
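
On the STORM_TEST_TIMEOUT_MS point in this comment: a rough sketch of how a test could take its timeout from that setting instead of a hard-coded value. It assumes STORM_TEST_TIMEOUT_MS is read as an environment variable; the `TestTimeout` class, the `resolveTimeoutMs` helper and the 5000 ms fallback are hypothetical and are not taken from the Storm code base.

```java
// Hypothetical helper; not part of Storm.
class TestTimeout {

    // Parses a raw timeout value, falling back to defaultMs when the value
    // is missing, blank, non-numeric, or not positive.
    static long resolveTimeoutMs(String raw, long defaultMs) {
        if (raw == null || raw.trim().isEmpty()) {
            return defaultMs;
        }
        try {
            long parsed = Long.parseLong(raw.trim());
            return parsed > 0 ? parsed : defaultMs;
        } catch (NumberFormatException e) {
            return defaultMs;
        }
    }

    public static void main(String[] args) {
        // Assumption: the setting is exposed as an environment variable.
        // The 5000 ms default is an illustrative value only.
        long timeoutMs = resolveTimeoutMs(System.getenv("STORM_TEST_TIMEOUT_MS"), 5000L);
        System.out.println("Using test timeout of " + timeoutMs + " ms");
    }
}
```

A test that waits on a condition could then use the resolved value as its deadline, so slower CI machines can raise the limit without a code change.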



[GitHub] storm pull request: STORM-1837: Fix complete-topology and prevent ...

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on the pull request:

    https://github.com/apache/storm/pull/1417#issuecomment-219741531
  
    @HeartSaVioR, you are right, I did not notice the `DisruptorQueueTest` failure.
    @srdo, thank you for putting up #1421. It's ironic to see `LocalNimbusTest` fail for that PR. :) I am bumping this up.
    +1

