Posted to dev@storm.apache.org by HeartSaVioR <gi...@git.apache.org> on 2014/10/08 16:08:50 UTC

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

GitHub user HeartSaVioR opened a pull request:

    https://github.com/apache/storm/pull/286

    STORM-513 check heartbeat from multilang subprocess

    Related issue: https://issues.apache.org/jira/browse/STORM-513
    
    It seems that ShellSpout and ShellBolt don't check the subprocess; they set heartbeats based only on their own state.
    The subprocess could hang without affecting ShellSpout / ShellBolt; it just stops processing tuples.
    It's better to check heartbeats from the subprocess, and have the worker kill itself if the subprocess stops working.
    
    * Spout
      * ShellSpout sends "next" to the subprocess continuously
      * the subprocess sends "sync" to ShellSpout when "next" is received
      * so we can treat "sync" (or any message) as a heartbeat
    * Bolt
      * ShellBolt sends tuples to the subprocess only when tuples are available
      * so we need to send a "heartbeat" tuple
      * the subprocess sends "sync" to ShellBolt when the "heartbeat" tuple is received
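
A minimal sketch of the heartbeat bookkeeping described above, using a hypothetical `HeartbeatTracker` class (not the patch's code): the reader side calls `touch()` for any message coming back from the subprocess ("sync" for spouts, the reply to the "heartbeat" tuple for bolts), and a periodic checker calls `isTimedOut()`. The clock is injected as a `LongSupplier` (an assumption made for testability) so the timeout logic can be exercised without sleeping.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helper illustrating the heartbeat idea; not Storm's API.
class HeartbeatTracker {
    private final AtomicLong lastHeartbeatMillis = new AtomicLong();
    private final long timeoutMillis;
    private final LongSupplier clock;

    HeartbeatTracker(long timeoutMillis, LongSupplier clock) {
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
        // Start "fresh" so the first check does not fail immediately.
        touch();
    }

    // Called by the reader thread whenever ANY message arrives from the subprocess.
    void touch() {
        lastHeartbeatMillis.set(clock.getAsLong());
    }

    // Called periodically by a separate checker thread.
    boolean isTimedOut() {
        return clock.getAsLong() - lastHeartbeatMillis.get() > timeoutMillis;
    }
}
```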

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HeartSaVioR/storm STORM-513

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/286.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #286
    
----
commit ca5874cdf11af8d835335d228b643f28aeb3f9c3
Author: Jungtaek Lim <ka...@gmail.com>
Date:   2014-10-08T13:48:01Z

    STORM-513 check heartbeat from multilang subprocess
    
    * Spout
    ** ShellSpout sends "next" to the subprocess continuously
    ** the subprocess sends "sync" to ShellSpout when "next" is received
    ** so we can treat "sync" (or any message) as a heartbeat
    * Bolt
    ** ShellBolt sends tuples to the subprocess only when tuples are available
    ** so we need to send a "heartbeat" tuple
    ** the subprocess sends "sync" to ShellBolt when the "heartbeat" tuple is
    received

commit 1a0d4bdd735ba0ade42f6777a4c47affec931557
Author: Jungtaek Lim <ka...@gmail.com>
Date:   2014-10-08T14:06:24Z

    Fix mixed tab / space, remove FIXME

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-60438969
  
    Since there were additional commits added to the pull request, we need to give it more time for others to review before merging, but I am still +1 for the patch.



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r19326747
  
    --- Diff: storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---
    @@ -305,4 +283,95 @@ private void die(Throwable exception) {
                 System.exit(11);
             }
         }
    +
    +    private class BoltHeartbeatTimerTask extends TimerTask {
    +        private ShellBolt bolt;
    +
    +        public BoltHeartbeatTimerTask(ShellBolt bolt) {
    +            this.bolt = bolt;
    +        }
    +
    +        @Override
    +        public void run() {
    +            long currentTimeMillis = System.currentTimeMillis();
    +            long lastHeartbeat = getLastHeartbeat();
    +
    +            LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
    +                    currentTimeMillis, lastHeartbeat, workerTimeoutMills);
    +
    +            if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
    +                bolt.die(new RuntimeException("subprocess heartbeat timeout"));
    +            }
    +
    +            String genId = Long.toString(_rand.nextLong());
    +            try {
    +                _pendingWrites.put(createHeartbeatBoltMessage(genId));
    --- End diff --
    
    @itaifrenkel Oh, I see. I didn't know that option existed. Thanks for letting me know!
    Then we can flip a heartbeat flag meaning "it's time to send a heartbeat", as you suggest, and let the BoltWriter.run() loop handle it first.
    What do you think?



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18742262
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -189,9 +207,53 @@ private void handleLog(ShellMsg shellMsg) {
     
         @Override
         public void activate() {
    +        LOG.info("Start checking heartbeat...");
    +        // prevent timer to check heartbeat based on last thing before activate
    +        setHeartbeat();
    +        heartBeatExecutor.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
    --- End diff --
    
    I would recommend scheduleWithFixedDelay, since it is more explicit about how it uses a single thread. Again, ask the committers.
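
A sketch of the suggestion, with a hypothetical `FixedDelayDemo` class and a counter standing in for the real heartbeat check: `Executors.newSingleThreadScheduledExecutor()` runs everything on one thread, and `scheduleWithFixedDelay` starts the next run only after the previous run has finished plus the delay, so executions never pile up behind a slow check.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Storm.
class FixedDelayDemo {
    // Runs a placeholder check until it has executed at least `runs` times, waiting
    // `delayMillis` between the END of one run and the START of the next, on one thread.
    static int runChecks(int runs, long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger checks = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        executor.scheduleWithFixedDelay(() -> {
            // Real code would compare now - lastHeartbeat against the worker timeout here.
            checks.incrementAndGet();
            done.countDown();
        }, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return checks.get();
    }
}
```

Applied to the diff above, the call would become `heartBeatExecutor.scheduleWithFixedDelay(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS)`; this compiles because `TimerTask` implements `Runnable`.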



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-61657845
  
    +1



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18745322
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -189,9 +207,53 @@ private void handleLog(ShellMsg shellMsg) {
     
         @Override
         public void activate() {
    +        LOG.info("Start checking heartbeat...");
    +        // prevent timer to check heartbeat based on last thing before activate
    +        setHeartbeat();
    +        heartBeatExecutor.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
    --- End diff --
    
    @itaifrenkel I'm not very familiar with ScheduledThreadPoolExecutor, so I have a question.
    If our runnable task finishes within 1s, will it be executed at a fixed rate?
    If so, I think scheduleAtFixedRate makes sense, since I expect the heartbeat check to run exactly once per second.
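
On the question above: per the `ScheduledExecutorService.scheduleAtFixedRate` contract, executions commence at `initialDelay`, then `initialDelay + period`, `initialDelay + 2 * period`, and so on; if a run takes longer than the period, later runs start late, but executions of the same task never overlap. The hypothetical `FixedRateDemo` below records when a fast task starts, using a 30 ms period instead of 1 s and a two-thread pool to show that extra threads still do not produce overlapping runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Storm.
class FixedRateDemo {
    static final class Observation {
        final long[] startOffsetsMillis; // start of run i, in ms after scheduling
        final boolean overlapped;        // true if two runs ever executed concurrently

        Observation(long[] startOffsetsMillis, boolean overlapped) {
            this.startOffsetsMillis = startOffsetsMillis;
            this.overlapped = overlapped;
        }
    }

    // Schedules a fast task at a fixed rate and records when its first `runs` executions start.
    static Observation observe(int runs, long periodMillis) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        long[] offsets = new long[runs];
        AtomicInteger index = new AtomicInteger();
        AtomicBoolean running = new AtomicBoolean(false);
        AtomicBoolean overlapped = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(runs);
        long t0 = System.nanoTime();
        executor.scheduleAtFixedRate(() -> {
            if (!running.compareAndSet(false, true)) {
                overlapped.set(true); // only reachable if two runs overlapped
            }
            int i = index.getAndIncrement();
            if (i < runs) {
                offsets[i] = (System.nanoTime() - t0) / 1_000_000;
                done.countDown();
            }
            running.set(false);
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return new Observation(offsets, overlapped.get());
    }
}
```

Because the scheduler never fires early, run `i` starts no sooner than `(i + 1) * period` after scheduling (the initial delay equals the period here); with a task this fast, the starts typically stay close to those targets.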



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741401
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -189,9 +205,52 @@ private void handleLog(ShellMsg shellMsg) {
     
         @Override
         public void activate() {
    +        LOG.info("Start checking heartbeat...");
    +        // prevent timer to check heartbeat based on last thing before activate
    +        setHeartbeat();
    +        heartBeatTimer.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1000, 1 * 1000);
         }
     
         @Override
         public void deactivate() {
    +        heartBeatTimer.cancel();
    +    }
    +
    +    private void setHeartbeat() {
    +        lastHeartbeatTimestamp.set(System.currentTimeMillis());
    +    }
    +
    +    private long getLastHeartbeat() {
    +        return lastHeartbeatTimestamp.get();
    +    }
    +
    +    private void die(Throwable exception) {
    +        heartBeatTimer.cancel();
    +
    +        LOG.error("Halting process: ShellSpout died.", exception);
    +        _collector.reportError(exception);
    +        System.exit(11);
    --- End diff --
    
    @itaifrenkel I agree that we should call process.destroy() before terminating ourselves.
    (It is maintained by the JDK and implemented with JNI, so its implementation is OS-specific.)
    Since the Storm project also tries to support Windows, handling SIGTERM is probably not a solution.
    I'll change it to call process.destroy() first.
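
A hypothetical sketch of the agreed order in `die()`: stop the heartbeat executor, destroy the subprocess via `Process.destroy()`, and only then exit. `SubprocessKiller` and the injected `IntConsumer` exit hook are illustrative assumptions, so the sequence can be tested without actually calling `System.exit`.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.IntConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper showing the agreed shutdown order; not Storm's actual code.
class SubprocessKiller {
    private static final Logger LOG = Logger.getLogger(SubprocessKiller.class.getName());

    private final Process process;
    private final ScheduledExecutorService heartbeatExecutor;
    private final IntConsumer exit; // System::exit in production, a stub in tests

    SubprocessKiller(Process process, ScheduledExecutorService heartbeatExecutor, IntConsumer exit) {
        this.process = process;
        this.heartbeatExecutor = heartbeatExecutor;
        this.exit = exit;
    }

    void die(Throwable cause) {
        // 1. Stop the heartbeat checks so they don't fire again during shutdown.
        heartbeatExecutor.shutdownNow();
        LOG.log(Level.SEVERE, "Halting process: subprocess died.", cause);
        // 2. Kill the subprocess first. Process.destroy() is implemented per platform
        //    by the JDK, so this does not rely on SIGTERM handling (relevant on Windows).
        process.destroy();
        // 3. Only then terminate this JVM, with the same exit code (11) as the patch.
        exit.accept(11);
    }
}
```

In production the hook would simply be `System::exit`, e.g. `new SubprocessKiller(process, executor, System::exit)`.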



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-63094779
  
    @HeartSaVioR I am running some tests on this PR. I built Storm with your changes and I am getting the failures below. Can you please check whether you see any of these issues? Thanks.
    
    java.lang.Exception: Shell Process Exception: Exception in bolt: undefined method `+' for nil:NilClass - tester_bolt.rb:29:in `process'\n/private/var/folders/yb/67h7c1sx2d95r5c_x5cjdwmh0000gp/T/ddda5ca6-8167-4ed1-bfef-a1a2001f65a2/supervisor/stormdist/test-1-1415984043/resources/storm.rb:186:in `run'\ntester_bolt.rb:37:in `<main>'
    	at backtype.storm.task.ShellBolt.handleError(ShellBolt.java:188) [classes/:na]
    	at backtype.storm.task.ShellBolt.access$1100(ShellBolt.java:69) [classes/:na]
    	at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:331) [classes/:na]
    	at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
    90960 [Thread-1055] ERROR backtype.storm.task.ShellBolt - Halting process: ShellBolt died.
    java.lang.RuntimeException: backtype.storm.multilang.NoOutputException: Pipe to subprocess seems to be broken! No output read.
    Serializer Exception:
    
    
    	at backtype.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:101) ~[classes/:na]
    	at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:318) ~[classes/:na]
    	at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]




[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18745292
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -56,13 +66,18 @@ public void open(Map stormConf, TopologyContext context,
             _collector = collector;
             _context = context;
     
    +        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
    +
             _process = new ShellProcess(_command);
     
             Number subpid = _process.launch(stormConf, context);
             LOG.info("Launched subprocess with pid " + subpid);
    +
    +        heartBeatExecutor = new ScheduledThreadPoolExecutor(5);
    --- End diff --
    
    @itaifrenkel 
    1. AFAIK, ShellSpout's nextTuple() waits indefinitely if the subprocess hangs (or doesn't send anything), because it blocks until "sync" arrives.
    So we should check heartbeats from a separate thread.
    Please correct me if I'm wrong. :)
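
A small sketch of why the check needs its own thread, with a hypothetical `BlockedReaderDemo` and a `BlockingQueue` standing in for the subprocess's output: the reader blocks in `take()` (much as `nextTuple()` blocks waiting for "sync"), so it can never notice the hang itself, while a separate scheduled thread sees the stale timestamp.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo, not Storm code.
class BlockedReaderDemo {
    // Returns true if the watchdog thread notices the hang within waitMillis.
    static boolean detectsHang(long timeoutMillis, long waitMillis) {
        // Stands in for the subprocess's stdout; it stays empty, i.e. the subprocess is hung.
        BlockingQueue<String> fromSubprocess = new LinkedBlockingQueue<>();
        AtomicLong lastHeartbeat = new AtomicLong(System.currentTimeMillis());
        CountDownLatch timedOut = new CountDownLatch(1);

        // Like nextTuple() waiting for "sync": this thread blocks and cannot check anything itself.
        Thread reader = new Thread(() -> {
            try {
                while (true) {
                    fromSubprocess.take();
                    lastHeartbeat.set(System.currentTimeMillis());
                }
            } catch (InterruptedException e) {
                // Interrupted during cleanup; just exit.
            }
        });
        reader.setDaemon(true);
        reader.start();

        // A separate thread compares the last heartbeat against the timeout.
        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
        watchdog.scheduleWithFixedDelay(() -> {
            if (System.currentTimeMillis() - lastHeartbeat.get() > timeoutMillis) {
                timedOut.countDown(); // real code would call die(...) here
            }
        }, 10, 10, TimeUnit.MILLISECONDS);

        try {
            return timedOut.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            watchdog.shutdownNow();
            reader.interrupt();
        }
    }
}
```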



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-61715752
  
    @harshach Sure. I'll upmerge it.



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-61669206
  
    @HeartSaVioR Thanks for the patch. Can you upmerge the changes?



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r19332243
  
    --- Diff: storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---
    @@ -305,4 +283,95 @@ private void die(Throwable exception) {
                 System.exit(11);
             }
         }
    +
    +    private class BoltHeartbeatTimerTask extends TimerTask {
    +        private ShellBolt bolt;
    +
    +        public BoltHeartbeatTimerTask(ShellBolt bolt) {
    +            this.bolt = bolt;
    +        }
    +
    +        @Override
    +        public void run() {
    +            long currentTimeMillis = System.currentTimeMillis();
    +            long lastHeartbeat = getLastHeartbeat();
    +
    +            LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
    +                    currentTimeMillis, lastHeartbeat, workerTimeoutMills);
    +
    +            if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
    +                bolt.die(new RuntimeException("subprocess heartbeat timeout"));
    +            }
    +
    +            String genId = Long.toString(_rand.nextLong());
    +            try {
    +                _pendingWrites.put(createHeartbeatBoltMessage(genId));
    --- End diff --
    
    @itaifrenkel I've updated the PR to reflect it.



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741577
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -56,13 +64,18 @@ public void open(Map stormConf, TopologyContext context,
             _collector = collector;
             _context = context;
     
    +        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
    +
             _process = new ShellProcess(_command);
     
             Number subpid = _process.launch(stormConf, context);
             LOG.info("Launched subprocess with pid " + subpid);
    +
    +        heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true);
    --- End diff --
    
    @itaifrenkel Users can enable tick tuples and use them for their own purposes. Also, we cannot rely on a long tick interval (N minutes) for heartbeats.
    So we shouldn't use tick tuples.
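
To make the separation concrete: a Storm tick tuple comes from the `__system` component on the `__tick` stream at whatever `topology.tick.tuple.freq.secs` the user configured, while the heartbeat tuple in this patch is marked by task `-1` on the `__heartbeat` stream (see the storm.js change). The hypothetical classifier below, which reduces a message to those three fields, shows that the two kinds of tuple can be told apart and never need to share a schedule.

```java
// Hypothetical classifier; the string constants mirror Storm's tick-tuple IDs
// ("__system"/"__tick") and this patch's heartbeat convention (task -1, "__heartbeat").
class IncomingKind {
    enum Kind { TICK, HEARTBEAT, DATA }

    static Kind classify(String component, String stream, int task) {
        if ("__system".equals(component) && "__tick".equals(stream)) {
            return Kind.TICK;      // user-facing, at the user's chosen frequency
        }
        if (task == -1 && "__heartbeat".equals(stream)) {
            return Kind.HEARTBEAT; // internal liveness probe, on ShellBolt's own schedule
        }
        return Kind.DATA;
    }
}
```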



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18745301
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -56,13 +66,18 @@ public void open(Map stormConf, TopologyContext context,
             _collector = collector;
             _context = context;
     
    +        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
    +
             _process = new ShellProcess(_command);
     
             Number subpid = _process.launch(stormConf, context);
             LOG.info("Launched subprocess with pid " + subpid);
    +
    +        heartBeatExecutor = new ScheduledThreadPoolExecutor(5);
    --- End diff --
    
    @itaifrenkel I agree with 2 and 3. I'll update it.



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741389
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -189,9 +205,52 @@ private void handleLog(ShellMsg shellMsg) {
     
         @Override
         public void activate() {
    +        LOG.info("Start checking heartbeat...");
    +        // prevent timer to check heartbeat based on last thing before activate
    +        setHeartbeat();
    +        heartBeatTimer.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1000, 1 * 1000);
         }
     
         @Override
         public void deactivate() {
    +        heartBeatTimer.cancel();
    +    }
    +
    +    private void setHeartbeat() {
    +        lastHeartbeatTimestamp.set(System.currentTimeMillis());
    +    }
    +
    +    private long getLastHeartbeat() {
    +        return lastHeartbeatTimestamp.get();
    +    }
    +
    +    private void die(Throwable exception) {
    +        heartBeatTimer.cancel();
    +
    +        LOG.error("Halting process: ShellSpout died.", exception);
    +        _collector.reportError(exception);
    +        System.exit(11);
         }
    +
    +    private class SpoutHeartbeatTimerTask extends TimerTask {
    +        private ShellSpout spout;
    +
    +        public SpoutHeartbeatTimerTask(ShellSpout spout) {
    +            this.spout = spout;
    +        }
    +
    +        @Override
    +        public void run() {
    +            long currentTimeMillis = System.currentTimeMillis();
    +            long lastHeartbeat = getLastHeartbeat();
    +
    +            LOG.debug("current time : " + currentTimeMillis + ", last heartbeat : " + lastHeartbeat
    --- End diff --
    
    @itaifrenkel Oh, you're right! It's slf4j, so we should use {} placeholders to log efficiently.
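
Why `{}` matters: with concatenation the message string is built even when debug logging is off, whereas slf4j's parameterized form (`LOG.debug("... {}", lastHeartbeat)`) formats only if debug is enabled. slf4j is not in the JDK, so this self-contained sketch swaps in the equivalent `java.util.logging` call, `Logger.log(Level, String, Object[])` with `{0}` placeholders, and counts `toString()` calls on a hypothetical argument.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical demo using java.util.logging as a stand-in for slf4j.
class LazyLoggingDemo {
    // Counts how many times the logged value is turned into a String.
    static final class CountingValue {
        int toStringCalls = 0;

        @Override
        public String toString() {
            toStringCalls++;
            return "42";
        }
    }

    static int eagerCalls() {
        Logger log = Logger.getLogger("demo.eager");
        log.setLevel(Level.INFO);                // FINE (debug) is disabled
        CountingValue v = new CountingValue();
        log.fine("last heartbeat : " + v);       // the string is built anyway
        return v.toStringCalls;
    }

    static int parameterizedCalls() {
        Logger log = Logger.getLogger("demo.lazy");
        log.setLevel(Level.INFO);                // FINE (debug) is disabled
        CountingValue v = new CountingValue();
        // Returns early because FINE is not loggable, so v is never formatted.
        log.log(Level.FINE, "last heartbeat : {0}", new Object[] {v});
        return v.toStringCalls;
    }
}
```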



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-62295127
  
    Please comment on STORM-528 if you have resolved the divergence problem with the py files.



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741220
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -56,13 +64,18 @@ public void open(Map stormConf, TopologyContext context,
             _collector = collector;
             _context = context;
     
    +        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
    +
             _process = new ShellProcess(_command);
     
             Number subpid = _process.launch(stormConf, context);
             LOG.info("Launched subprocess with pid " + subpid);
    +
    +        heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true);
    --- End diff --
    
    see also http://stackoverflow.com/questions/17419386/scheduledexecutorservice-how-to-stop-action-without-stopping-executor 
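
The linked answer's approach, sketched with a hypothetical `ActivatableHeartbeat` class: keep the `ScheduledFuture` returned when scheduling in `activate()` and cancel only that task in `deactivate()`, leaving the executor reusable. This matters because the diff calls `heartBeatTimer.cancel()` in `deactivate()`, and a cancelled `java.util.Timer` rejects new tasks with `IllegalStateException`, so a later `activate()` would fail.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative only: one long-lived executor, one cancellable task per activation.
class ActivatableHeartbeat {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> heartbeatTask; // null while deactivated

    synchronized void activate(Runnable check) {
        if (heartbeatTask == null) {
            heartbeatTask = executor.scheduleWithFixedDelay(check, 1, 1, TimeUnit.SECONDS);
        }
    }

    synchronized void deactivate() {
        if (heartbeatTask != null) {
            heartbeatTask.cancel(false); // stops this task only; the executor stays usable
            heartbeatTask = null;
        }
    }

    synchronized boolean isActive() {
        return heartbeatTask != null && !heartbeatTask.isCancelled();
    }

    void close() {
        executor.shutdownNow(); // e.g. when the spout is closed for good
    }
}
```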



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-62295379
  
    @itaifrenkel @clockfly 
    I agree with @itaifrenkel, because there are more multilang implementations out there that don't live in the Storm project.
    I saw a PHP implementation of multilang (sorry, I can't remember where it is), and there could be more.
    If we make the subprocess take care of many things, every implementation has to apply and keep up with those changes.
    (Actually, we already force them to apply this change, because bolts have to handle the heartbeat tuple. ;( )
    So we should consider the trade-off, and maybe we should have documentation for the multilang specification.
    
    AND the multilang protocol changes introduced in this PR should be documented when we announce the 0.9.3 release.
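
For multilang implementations that live outside the Storm tree (such as the PHP one mentioned above), the bolt-side protocol change amounts to one rule, sketched here in Java with hypothetical `MultilangTuple` and `HeartbeatAwareBolt` types: a tuple with task `-1` on the `__heartbeat` stream must be answered with a `sync` command and must not reach user code. JSON parsing and the protocol's `end` line framing are left out.

```java
import java.util.function.Consumer;

// Simplified, hypothetical view of an incoming multilang tuple.
final class MultilangTuple {
    final String id;
    final String stream;
    final int task;

    MultilangTuple(String id, String stream, int task) {
        this.id = id;
        this.stream = stream;
        this.task = task;
    }
}

// Hypothetical bolt-side dispatcher; not part of any real multilang library.
class HeartbeatAwareBolt {
    static final String SYNC_REPLY = "{\"command\": \"sync\"}";

    private final Consumer<String> toShell;         // writes one JSON message back to ShellBolt
    private final Consumer<MultilangTuple> userCode; // the bolt's normal processing

    HeartbeatAwareBolt(Consumer<String> toShell, Consumer<MultilangTuple> userCode) {
        this.toShell = toShell;
        this.userCode = userCode;
    }

    void handle(MultilangTuple tuple) {
        if (tuple.task == -1 && "__heartbeat".equals(tuple.stream)) {
            toShell.accept(SYNC_REPLY); // answer the heartbeat; never reaches user code
            return;
        }
        userCode.accept(tuple);
    }
}
```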



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741231
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -56,13 +64,18 @@ public void open(Map stormConf, TopologyContext context,
             _collector = collector;
             _context = context;
     
    +        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
    +
             _process = new ShellProcess(_command);
     
             Number subpid = _process.launch(stormConf, context);
             LOG.info("Launched subprocess with pid " + subpid);
    +
    +        heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true);
    --- End diff --
    
    Is there a reason you have not used tick tuples instead? http://kitmenke.com/blog/2014/08/04/tick-tuples-within-storm/ 



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741189
  
    --- Diff: storm-core/src/dev/resources/storm.js ---
    @@ -243,6 +243,12 @@ BasicBolt.prototype.__emit = function(commandDetails) {
     BasicBolt.prototype.handleNewCommand = function(command) {
         var self = this;
         var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
    +
    +    if (tup.task == -1 && tup.stream == "__heartbeat") {
    +        self.sync();
    +        return;
    +    }
    +
         var callback = function(err) {
               if (err) {
    --- End diff --
    
    storm.py, storm.rb, and storm.js are each committed three times. Each change needs to be made in all three copies. (Yeah, I know....)



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741196
  
    --- Diff: storm-core/src/dev/resources/storm.js ---
    @@ -243,6 +243,12 @@ BasicBolt.prototype.__emit = function(commandDetails) {
     BasicBolt.prototype.handleNewCommand = function(command) {
         var self = this;
         var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
    +
    +    if (tup.task == -1 && tup.stream == "__heartbeat") {
    --- End diff --
    
    In Node.js, use ===.



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741514
  
    --- Diff: storm-core/src/dev/resources/storm.js ---
    @@ -243,6 +243,12 @@ BasicBolt.prototype.__emit = function(commandDetails) {
     BasicBolt.prototype.handleNewCommand = function(command) {
         var self = this;
         var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
    +
    +    if (tup.task == -1 && tup.stream == "__heartbeat") {
    --- End diff --
    
    @itaifrenkel Oh, thanks! Actually, I'm not a Node.js user. I'll change it.



[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r19326582
  
    --- Diff: storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---
    @@ -305,4 +283,95 @@ private void die(Throwable exception) {
                 System.exit(11);
             }
         }
    +
    +    private class BoltHeartbeatTimerTask extends TimerTask {
    +        private ShellBolt bolt;
    +
    +        public BoltHeartbeatTimerTask(ShellBolt bolt) {
    +            this.bolt = bolt;
    +        }
    +
    +        @Override
    +        public void run() {
    +            long currentTimeMillis = System.currentTimeMillis();
    +            long lastHeartbeat = getLastHeartbeat();
    +
    +            LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
    +                    currentTimeMillis, lastHeartbeat, workerTimeoutMills);
    +
    +            if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
    +                bolt.die(new RuntimeException("subprocess heartbeat timeout"));
    +            }
    +
    +            String genId = Long.toString(_rand.nextLong());
    +            try {
    +                _pendingWrites.put(createHeartbeatBoltMessage(genId));
    --- End diff --
    
    I reread the code, and I think all we need here is to flip an AtomicBoolean (effectively a size-1 priority queue for heartbeats). The reason is that the capacity of the _pendingWrites queue is Config.TOPOLOGY_SHELLBOLT_MAX_PENDING, which, as its name says, is the number of real tuples to retrieve from the disruptor queue. We set it to 1 to optimize for the shortest latency, which would cause this thread to block on put(). That means you cannot share this thread between bolts even if you wanted to, and we need to think about whether that is an issue. A stronger argument in favor of a priority queue for heartbeats is that the rate of heartbeat messages will not be skewed by the length of the queue.
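
A minimal sketch of the proposal, with hypothetical names (`WriterLoop`, `requestHeartbeat`, `String` messages): the timer thread only flips an `AtomicBoolean`, which never blocks, and the single writer thread checks that flag before each regular write, so a heartbeat jumps ahead of queued tuples. At most one heartbeat is pending at a time, which is the "priority queue of size 1"; the `poll` timeout keeps the loop responsive when no tuples arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; not the actual ShellBolt writer.
class WriterLoop {
    static final String HEARTBEAT = "__heartbeat";

    // Bounded like _pendingWrites; the capacity plays the role of TOPOLOGY_SHELLBOLT_MAX_PENDING.
    final BlockingQueue<String> pendingWrites;
    private final AtomicBoolean sendHeartbeat = new AtomicBoolean(false);
    // Stands in for the subprocess's stdin; only the writer thread touches it.
    final List<String> written = new ArrayList<>();

    WriterLoop(int maxPending) {
        pendingWrites = new ArrayBlockingQueue<>(maxPending);
    }

    // Called by the heartbeat timer: never blocks, even when pendingWrites is full.
    void requestHeartbeat() {
        sendHeartbeat.set(true);
    }

    // One iteration of the writer thread's loop.
    void step() {
        if (sendHeartbeat.compareAndSet(true, false)) {
            written.add(HEARTBEAT); // heartbeat goes first, regardless of queue length
            return;
        }
        try {
            String next = pendingWrites.poll(100, TimeUnit.MILLISECONDS);
            if (next != null) {
                written.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```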


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741213
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -56,13 +64,18 @@ public void open(Map stormConf, TopologyContext context,
             _collector = collector;
             _context = context;
     
    +        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
    +
             _process = new ShellProcess(_command);
     
             Number subpid = _process.launch(stormConf, context);
             LOG.info("Launched subprocess with pid " + subpid);
    +
    +        heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true);
    --- End diff --
    
    I recommend using a single-threaded ScheduledExecutorService (Executors.newSingleThreadScheduledExecutor()) instead of Timer; see http://stackoverflow.com/questions/409932/java-timer-vs-executorservice
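A minimal sketch of that swap using only `java.util.concurrent`: `Executors.newSingleThreadScheduledExecutor()` takes the place of `new Timer(...)`, and `scheduleAtFixedRate(task, period, period, unit)` takes the place of `timer.scheduleAtFixedRate(task, 1000, 1000)`. The `HeartbeatScheduler` class and its `start`/`stop` methods are illustrative assumptions. One documented difference: an uncaught exception in a `TimerTask` kills the Timer's only thread and with it every scheduled task, while the executor only suppresses further runs of the task that failed.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HeartbeatScheduler {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> heartbeatFuture;

    // activate(): start the periodic check (initial delay, period, unit).
    public synchronized void start(Runnable check, long periodMillis) {
        heartbeatFuture = executor.scheduleAtFixedRate(
                check, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // deactivate(): cancel only this task; the executor stays usable for re-activation.
    public synchronized void stop() {
        if (heartbeatFuture != null) {
            heartbeatFuture.cancel(false);
            heartbeatFuture = null;
        }
    }

    // Final cleanup (e.g. close()); the default thread is non-daemon, so this matters.
    public void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        HeartbeatScheduler scheduler = new HeartbeatScheduler();
        scheduler.start(runs::incrementAndGet, 10);
        Thread.sleep(100);
        scheduler.stop();
        scheduler.shutdown();
        System.out.println("checks run: " + runs.get());
    }
}
```

Cancelling the `ScheduledFuture` rather than the whole scheduler also lets a spout be activated again after `deactivate()`. `Timer.cancel()` discards the timer for good, and any later `schedule` call on it throws `IllegalStateException`.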


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/286


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741678
  
    --- Diff: storm-core/src/dev/resources/storm.js ---
    @@ -243,6 +243,12 @@ BasicBolt.prototype.__emit = function(commandDetails) {
     BasicBolt.prototype.handleNewCommand = function(command) {
         var self = this;
         var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
    +
    +    if (tup.task == -1 && tup.stream == "__heartbeat") {
    +        self.sync();
    +        return;
    +    }
    +
         var callback = function(err) {
               if (err) {
    --- End diff --
    
    @itaifrenkel Oh, I see... It exists in three places. I think there should be only one copy, but I'm not sure that's possible.
    I'll apply my changes to the other files.


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741104
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -189,9 +205,52 @@ private void handleLog(ShellMsg shellMsg) {
     
         @Override
         public void activate() {
    +        LOG.info("Start checking heartbeat...");
    +        // prevent timer to check heartbeat based on last thing before activate
    +        setHeartbeat();
    +        heartBeatTimer.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1000, 1 * 1000);
    --- End diff --
    
    Could you please add a bolt configuration to disable it? Some of our Python bolts are experimental and unanchored. I would not want them to crash the entire worker, since that would incur serious downtime.
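A minimal sketch of such an opt-out, assuming a hypothetical boolean key `topology.multilang.heartbeat.check.enabled` (not an existing Storm config) that defaults to enabled. The timeout follows the diff's seconds-to-milliseconds conversion of `Config.SUPERVISOR_WORKER_TIMEOUT_SECS`, whose string value is written out here so the example does not depend on Storm's classes. `HeartbeatSettings` is an illustrative name.

```java
import java.util.HashMap;
import java.util.Map;

public class HeartbeatSettings {
    // Hypothetical key for illustration only; not an existing Storm config.
    public static final String HEARTBEAT_CHECK_ENABLED = "topology.multilang.heartbeat.check.enabled";
    // The string behind Config.SUPERVISOR_WORKER_TIMEOUT_SECS.
    public static final String WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";

    public final boolean checkEnabled;
    public final long timeoutMillis;

    public HeartbeatSettings(Map<String, Object> stormConf) {
        Object enabled = stormConf.get(HEARTBEAT_CHECK_ENABLED);
        // Opt-out semantics: absent means enabled; accepts Boolean or "true"/"false" strings.
        this.checkEnabled = enabled == null || Boolean.parseBoolean(enabled.toString());
        Object secs = stormConf.get(WORKER_TIMEOUT_SECS);
        if (!(secs instanceof Number)) {
            throw new IllegalArgumentException(WORKER_TIMEOUT_SECS + " must be a number");
        }
        this.timeoutMillis = 1000L * ((Number) secs).longValue();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(WORKER_TIMEOUT_SECS, 30);
        conf.put(HEARTBEAT_CHECK_ENABLED, false); // e.g. for experimental, unanchored bolts
        HeartbeatSettings settings = new HeartbeatSettings(conf);
        // activate()/prepare() would only schedule the heartbeat task when checkEnabled is true.
        System.out.println(settings.checkEnabled + " " + settings.timeoutMillis); // false 30000
    }
}
```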


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18745857
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -189,9 +207,53 @@ private void handleLog(ShellMsg shellMsg) {
     
         @Override
         public void activate() {
    +        LOG.info("Start checking heartbeat...");
    +        // prevent timer to check heartbeat based on last thing before activate
    +        setHeartbeat();
    +        heartBeatExecutor.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
    --- End diff --
    
    With a single thread there is not much difference between the two. Moreover, your callback is non-blocking, so the difference is insignificant. And even if the timing were entirely inaccurate, the code logic would stay intact: nothing in that callback requires exactly one second.


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-62295398
  
    @itaifrenkel OK, I've commented on STORM-528.


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-62295920
  
    Maybe we can have a discussion about multilang on the mailing list or in another issue.
    There seems to be a lack of documentation / strategy / etc. for multilang. What do you all think?


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-60854884
  
    Can this PR be included in 0.9.3, or in the next version?


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-60438795
  
    @itaifrenkel No, not one that's available for use via the bolt API, but it's an interesting idea. You could effectively do the same by making the scheduler static (one per worker/JVM), but that feels kind of hacky.
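A minimal sketch of the "static scheduler, one per worker JVM" idea. `SharedHeartbeatScheduler` is a hypothetical class, not a Storm API. It uses the initialization-on-demand holder idiom so the single thread is created lazily and thread-safely on first use. The thread is a daemon because its lifetime belongs to the JVM, not to any one bolt (part of why ptgoetz calls it hacky). `setRemoveOnCancelPolicy(true)` (Java 7+) drops cancelled tasks from the queue right away, so components that deactivate repeatedly leave no stale entries behind.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class SharedHeartbeatScheduler {
    private SharedHeartbeatScheduler() {}

    // Loaded (and the executor created) only on first access to get()/schedule().
    private static final class Holder {
        static final ScheduledThreadPoolExecutor INSTANCE = create();

        private static ScheduledThreadPoolExecutor create() {
            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, runnable -> {
                Thread t = new Thread(runnable, "shared-heartbeat-scheduler");
                t.setDaemon(true); // never keeps the worker JVM alive
                return t;
            });
            executor.setRemoveOnCancelPolicy(true);
            return executor;
        }
    }

    public static ScheduledExecutorService get() {
        return Holder.INSTANCE;
    }

    // Each bolt/spout keeps its own future and cancels only that on deactivate;
    // nobody calls shutdown() on the shared executor.
    public static ScheduledFuture<?> schedule(Runnable check, long periodMillis) {
        return get().scheduleAtFixedRate(check, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```

Sharing one thread only works because each heartbeat check is non-blocking: a task that blocked, for example on a full `_pendingWrites` queue, would delay the checks of every other bolt in the worker.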


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-58752058
  
    @itaifrenkel Thanks for pointing out missed spots and points to improve!


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18803104
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -56,13 +67,18 @@ public void open(Map stormConf, TopologyContext context,
             _collector = collector;
             _context = context;
     
    +        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
    +
             _process = new ShellProcess(_command);
     
             Number subpid = _process.launch(stormConf, context);
             LOG.info("Launched subprocess with pid " + subpid);
    +
    +        heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    --- End diff --
    
    @itaifrenkel OK. I don't like explicit casts, so I'd rather not modify it.


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by dan-blanchard <gi...@git.apache.org>.
Github user dan-blanchard commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-58363290
  
    As the person who filed the issue: thanks for coming up with a solution so quickly!


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-59983717
  
    +1


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741463
  
    --- Diff: storm-core/src/dev/resources/storm.js ---
    @@ -243,6 +243,12 @@ BasicBolt.prototype.__emit = function(commandDetails) {
     BasicBolt.prototype.handleNewCommand = function(command) {
         var self = this;
         var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
    +
    +    if (tup.task == -1 && tup.stream == "__heartbeat") {
    +        self.sync();
    +        return;
    +    }
    +
         var callback = function(err) {
               if (err) {
    --- End diff --
    
    @itaifrenkel Sorry, I don't understand what you mean. Could you please describe it more clearly? I committed all changes in one commit.


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-62294914
  
    @harshach @ptgoez @clockfly 
    Can we merge the up-merged PR now, or does the PR need additional modifications before merging?


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-63134028
  
    +1 (again)
    
    @harshach The problem you saw was due to two of the `storm.rb` files being out of sync. I will correct that at merge time.


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by hustfxj <gi...@git.apache.org>.
Github user hustfxj commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-156884789
  
    +1 


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741096
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -189,9 +205,52 @@ private void handleLog(ShellMsg shellMsg) {
     
         @Override
         public void activate() {
    +        LOG.info("Start checking heartbeat...");
    +        // prevent timer to check heartbeat based on last thing before activate
    +        setHeartbeat();
    +        heartBeatTimer.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1000, 1 * 1000);
         }
     
         @Override
         public void deactivate() {
    +        heartBeatTimer.cancel();
    +    }
    +
    +    private void setHeartbeat() {
    +        lastHeartbeatTimestamp.set(System.currentTimeMillis());
    +    }
    +
    +    private long getLastHeartbeat() {
    +        return lastHeartbeatTimestamp.get();
    +    }
    +
    +    private void die(Throwable exception) {
    +        heartBeatTimer.cancel();
    +
    +        LOG.error("Halting process: ShellSpout died.", exception);
    +        _collector.reportError(exception);
    +        System.exit(11);
         }
    +
    +    private class SpoutHeartbeatTimerTask extends TimerTask {
    +        private ShellSpout spout;
    +
    +        public SpoutHeartbeatTimerTask(ShellSpout spout) {
    +            this.spout = spout;
    +        }
    +
    +        @Override
    +        public void run() {
    +            long currentTimeMillis = System.currentTimeMillis();
    +            long lastHeartbeat = getLastHeartbeat();
    +
    +            LOG.debug("current time : " + currentTimeMillis + ", last heartbeat : " + lastHeartbeat
    --- End diff --
    
    When doing debug logging, try to refrain from string concatenation with +. Either surround the call with LOG.isDebugEnabled(), or use placeholders: "current time : {}, last heartbeat : {} ....
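With SLF4J, which Storm's `LOG` uses, the placeholder form is `LOG.debug("current time : {}, last heartbeat : {}", currentTimeMillis, lastHeartbeat)`. SLF4J is not part of the JDK, so this self-contained sketch shows the same two ideas with `java.util.logging` instead: an explicit `isLoggable(Level.FINE)` guard, and a `Supplier<String>` that is only evaluated when the message will actually be logged. The class name and the `messagesBuilt` counter, which exists only to prove laziness, are illustrative.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LazyDebugLogging {
    private static final Logger LOG = Logger.getLogger(LazyDebugLogging.class.getName());
    // Counts how many times the message string was actually built.
    public static final AtomicInteger messagesBuilt = new AtomicInteger();

    public static void setLevel(Level level) {
        LOG.setLevel(level);
    }

    static String describe(long now, long lastHeartbeat) {
        messagesBuilt.incrementAndGet();
        return "current time : " + now + ", last heartbeat : " + lastHeartbeat;
    }

    // Style 1: explicit guard, the JUL counterpart of SLF4J's isDebugEnabled().
    public static void logGuarded(long now, long lastHeartbeat) {
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine(describe(now, lastHeartbeat));
        }
    }

    // Style 2: deferred construction; the supplier runs only if FINE is enabled.
    public static void logLazily(long now, long lastHeartbeat) {
        Supplier<String> message = () -> describe(now, lastHeartbeat);
        LOG.fine(message);
    }

    public static void main(String[] args) {
        setLevel(Level.INFO); // FINE (debug) disabled
        logGuarded(2000L, 1000L);
        logLazily(2000L, 1000L);
        System.out.println("messages built: " + messagesBuilt.get()); // messages built: 0
    }
}
```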


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-60365096
  
    @ptgoetz Is there a shared ScheduledExecutor in Storm? It seems wasteful for each bolt to dedicate a thread to the scheduler, especially now that the task itself would be non-blocking.


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741180
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -189,9 +205,52 @@ private void handleLog(ShellMsg shellMsg) {
     
         @Override
         public void activate() {
    +        LOG.info("Start checking heartbeat...");
    +        // prevent timer to check heartbeat based on last thing before activate
    +        setHeartbeat();
    +        heartBeatTimer.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1000, 1 * 1000);
         }
     
         @Override
         public void deactivate() {
    +        heartBeatTimer.cancel();
    +    }
    +
    +    private void setHeartbeat() {
    +        lastHeartbeatTimestamp.set(System.currentTimeMillis());
    +    }
    +
    +    private long getLastHeartbeat() {
    +        return lastHeartbeatTimestamp.get();
    +    }
    +
    +    private void die(Throwable exception) {
    +        heartBeatTimer.cancel();
    +
    +        LOG.error("Halting process: ShellSpout died.", exception);
    +        _collector.reportError(exception);
    +        System.exit(11);
    --- End diff --
    
    All of our Python and multilang bolts have special code that intercepts the SIGTERM signal and exits when the parent process dies. This has not been contributed back since it is very Linux-specific and logger-specific. Without it you might end up with zombie worker processes. This does not relate to your commit, since you didn't invent the System.exit(11) thing; however, it would make things worse when a process is not responding. Ideally you would at least want to call process.destroy() first. Since Process.destroy() is not implemented with kill -9, it is not guaranteed to work (Sigar implements this per OS quite nicely).
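A minimal sketch of "destroy the subprocess before `System.exit`", using only `java.lang.Process` (Java 8+, so newer than the JDK this PR targeted). It asks politely with `destroy()`, waits a bounded grace period, then escalates to `destroyForcibly()`. On Linux and macOS the JDK sends SIGTERM and SIGKILL respectively, though the Javadoc leaves this implementation-dependent. `terminateSubprocess`, the grace periods, and the `die` shape are assumptions for illustration. How ShellProcess exposes its underlying `Process` is not shown here.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class SubprocessTerminator {
    // Returns true if the subprocess is no longer alive when this method returns.
    public static boolean terminateSubprocess(Process process, long graceMillis)
            throws InterruptedException {
        if (!process.isAlive()) {
            return true;
        }
        process.destroy(); // typically SIGTERM on POSIX: the child may clean up
        if (process.waitFor(graceMillis, TimeUnit.MILLISECONDS)) {
            return true;
        }
        process.destroyForcibly(); // typically SIGKILL: cannot be caught or ignored
        return process.waitFor(graceMillis, TimeUnit.MILLISECONDS);
    }

    // A die() in the spirit of the diff, cleaning up the child before halting the worker.
    static void die(Process subprocess, Throwable cause) {
        System.err.println("Halting process: ShellSpout died. " + cause);
        try {
            terminateSubprocess(subprocess, 5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            System.exit(11);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Assumes a POSIX "sleep" binary on the PATH.
        Process child = new ProcessBuilder("sleep", "30").start();
        System.out.println("terminated: " + terminateSubprocess(child, 2000));
    }
}
```

The SIGTERM handlers the comment describes still matter: they let a well-behaved child exit cleanly, and `destroyForcibly()` only covers children that ignore the polite signal.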



---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-62298800
  
    AFAIK, there was a discussion about moving the documentation (including the website) to git, started by @ptgoetz and +1'd by many committers.
    But it hasn't actually been done yet, so we can't fix the docs ourselves; we can only report problems to the dev mailing list.
    
    Personally, I'm heavily inspired by the Redis documentation, which is treated the same as code.
    It's at http://github.com/antirez/redis-doc.


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18747035
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -56,13 +67,18 @@ public void open(Map stormConf, TopologyContext context,
             _collector = collector;
             _context = context;
     
    +        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
    +
             _process = new ShellProcess(_command);
     
             Number subpid = _process.launch(stormConf, context);
             LOG.info("Launched subprocess with pid " + subpid);
    +
    +        heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    --- End diff --
    
    You could cast, but it's OK as it is.


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18742240
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -56,13 +66,18 @@ public void open(Map stormConf, TopologyContext context,
             _collector = collector;
             _context = context;
     
    +        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
    +
             _process = new ShellProcess(_command);
     
             Number subpid = _process.launch(stormConf, context);
             LOG.info("Launched subprocess with pid " + subpid);
    +
    +        heartBeatExecutor = new ScheduledThreadPoolExecutor(5);
    --- End diff --
    
    1. You should consult the committers on whether they are happy with another thread, or whether they would rather you use tick tuples. One argument for a separate thread is that a multilang bolt might want to use tick tuples itself. But still...
    
    2. One thread should be enough.
    
    3. This thread must not keep the process alive when main exits (as in tests), so it should be daemonized. AFAIK, the way to do that is:
    heartBeatExecutor = MoreExecutors.getExitingScheduledExecutorService(Executors.newScheduledThreadPool(1))
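For comparison, a JDK-only sketch of a daemonized single-thread scheduler. A custom `ThreadFactory` marks the thread as a daemon, so it cannot keep the JVM alive after `main` returns. This is an alternative to the Guava call, not what the PR uses. Guava's `getExitingScheduledExecutorService` also registers a shutdown hook that waits a bounded time for the executor to finish, which this sketch does not do. `DaemonSchedulers` and the thread name are illustrative.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

public class DaemonSchedulers {
    // One-thread scheduler whose (single) worker thread is a named daemon.
    public static ScheduledExecutorService newDaemonScheduler(String name) {
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, name);
            t.setDaemon(true); // the JVM may exit while this thread is still running
            return t;
        };
        return Executors.newSingleThreadScheduledExecutor(factory);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = newDaemonScheduler("42-heartbeat");
        Callable<Boolean> isDaemon = () -> Thread.currentThread().isDaemon();
        System.out.println("daemon: " + executor.submit(isDaemon).get()); // daemon: true
        // No shutdown() here on purpose: the daemon thread does not block JVM exit.
    }
}
```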


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18745860
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -56,13 +67,18 @@ public void open(Map stormConf, TopologyContext context,
             _collector = collector;
             _context = context;
     
    +        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
    +
             _process = new ShellProcess(_command);
     
             Number subpid = _process.launch(stormConf, context);
             LOG.info("Launched subprocess with pid " + subpid);
    +
    +        heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    --- End diff --
    
    new ScheduledThreadPoolExecutor(1) --> Executors.newScheduledThreadPool(1)


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-62295232
  
    @HeartSaVioR @clockfly I think we need to keep the multilang protocol implementation as simple as possible. A full round trip of heartbeat messages is not that bad, as long as it does not add too much latency. If you would like to optimize away the round-trip messages, you could treat any emit as a heartbeat and trigger the heartbeat round trip only if there are not enough emits from the bolt. It makes the Java code more complicated :(, but achieves similar goals and keeps the multilang implementation simpler :). All in all, I think this commit is good, and we can discuss various optimizations later on.
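A minimal sketch of that optimization, assuming every message read from the subprocess (emit, ack, log, sync) calls `markActivity()`. On each timer tick the policy asks for an explicit heartbeat round trip only when the subprocess has been quiet longer than `heartbeatIntervalMillis`, and declares it dead once the silence exceeds `timeoutMillis`. `IdleHeartbeatPolicy`, its methods, and the `Action` enum are hypothetical. The clock is passed in so the decision logic can be tested without sleeping.

```java
import java.util.concurrent.atomic.AtomicLong;

public class IdleHeartbeatPolicy {
    public enum Action { NONE, SEND_HEARTBEAT, DIE }

    private final long heartbeatIntervalMillis;
    private final long timeoutMillis;
    private final AtomicLong lastActivityMillis;

    public IdleHeartbeatPolicy(long heartbeatIntervalMillis, long timeoutMillis, long nowMillis) {
        if (heartbeatIntervalMillis >= timeoutMillis) {
            throw new IllegalArgumentException("heartbeat interval must be shorter than timeout");
        }
        this.heartbeatIntervalMillis = heartbeatIntervalMillis;
        this.timeoutMillis = timeoutMillis;
        this.lastActivityMillis = new AtomicLong(nowMillis);
    }

    // Reader thread: any message from the subprocess proves it is alive.
    public void markActivity(long nowMillis) {
        lastActivityMillis.set(nowMillis);
    }

    // Timer thread: decide what this tick should do.
    public Action onTick(long nowMillis) {
        long silence = nowMillis - lastActivityMillis.get();
        if (silence > timeoutMillis) {
            return Action.DIE;
        }
        if (silence > heartbeatIntervalMillis) {
            return Action.SEND_HEARTBEAT; // only needed when the bolt has gone quiet
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        IdleHeartbeatPolicy policy = new IdleHeartbeatPolicy(1_000, 30_000, 0);
        policy.markActivity(500);                   // an emit arrived at t=500 ms
        System.out.println(policy.onTick(1_200));   // NONE: the recent emit counts
        System.out.println(policy.onTick(2_000));   // SEND_HEARTBEAT: quiet for 1.5 s
        System.out.println(policy.onTick(31_000));  // DIE: silent for 30.5 s
    }
}
```

A busy bolt that emits steadily never triggers a round trip. An idle one still gets probed, so the subprocess-side protocol stays the same.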


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18746971
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -56,13 +67,18 @@ public void open(Map stormConf, TopologyContext context,
             _collector = collector;
             _context = context;
     
    +        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
    +
             _process = new ShellProcess(_command);
     
             Number subpid = _process.launch(stormConf, context);
             LOG.info("Launched subprocess with pid " + subpid);
    +
    +        heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    --- End diff --
    
    @itaifrenkel 
    MoreExecutors.getExitingScheduledExecutorService() takes a ScheduledThreadPoolExecutor, not a ScheduledExecutorService. I tried to change it, but the compiler complained.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-62296198
  
    We should document this feature in http://storm.apache.org/documentation/Multilang-protocol.html
    Bolts should handle the heartbeat tuple and send a sync response ASAP.
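A minimal sketch of the subprocess side, mirroring the `storm.js` change in this PR. A command whose `task` is `-1` and whose `stream` is `__heartbeat` is answered with a sync message immediately and never reaches the user's `process()`. It assumes the incoming JSON has already been parsed into a `Map` (no JSON library is used), and that the reply is the multilang `{"command":"sync"}` message. Multilang framing (the `end` line after each message) is left to the caller. `HeartbeatAwareDispatcher` and its callbacks are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class HeartbeatAwareDispatcher {
    public static final String HEARTBEAT_STREAM = "__heartbeat";
    public static final String SYNC_REPLY = "{\"command\":\"sync\"}";

    private final Consumer<Map<String, Object>> userProcess;
    private final Consumer<String> sendToParent;

    public HeartbeatAwareDispatcher(Consumer<Map<String, Object>> userProcess,
                                    Consumer<String> sendToParent) {
        this.userProcess = userProcess;
        this.sendToParent = sendToParent;
    }

    // Same test as the storm.js diff: task == -1 and stream == "__heartbeat".
    public static boolean isHeartbeat(Map<String, Object> command) {
        Object task = command.get("task");
        return task instanceof Number
                && ((Number) task).longValue() == -1L
                && HEARTBEAT_STREAM.equals(command.get("stream"));
    }

    public void handle(Map<String, Object> command) {
        if (isHeartbeat(command)) {
            // Reply right away; a late reply is what trips the parent's timeout.
            sendToParent.accept(SYNC_REPLY);
            return;
        }
        userProcess.accept(command);
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        List<Map<String, Object>> processed = new ArrayList<>();
        HeartbeatAwareDispatcher dispatcher = new HeartbeatAwareDispatcher(processed::add, sent::add);

        Map<String, Object> heartbeat = new HashMap<>();
        heartbeat.put("task", -1);
        heartbeat.put("stream", HEARTBEAT_STREAM);
        dispatcher.handle(heartbeat);

        Map<String, Object> tuple = new HashMap<>();
        tuple.put("task", 3);
        tuple.put("stream", "default");
        dispatcher.handle(tuple);

        System.out.println(sent + " " + processed.size()); // [{"command":"sync"}] 1
    }
}
```

As HeartSaVioR notes elsewhere in the thread, a subprocess stuck inside a single long `process()` call still cannot answer in time; the sketch only guarantees that heartbeats are answered as soon as they are read.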


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r19330106
  
    --- Diff: storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---
    @@ -305,4 +283,95 @@ private void die(Throwable exception) {
                 System.exit(11);
             }
         }
    +
    +    private class BoltHeartbeatTimerTask extends TimerTask {
    +        private ShellBolt bolt;
    +
    +        public BoltHeartbeatTimerTask(ShellBolt bolt) {
    +            this.bolt = bolt;
    +        }
    +
    +        @Override
    +        public void run() {
    +            long currentTimeMillis = System.currentTimeMillis();
    +            long lastHeartbeat = getLastHeartbeat();
    +
    +            LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
    +                    currentTimeMillis, lastHeartbeat, workerTimeoutMills);
    +
    +            if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
    +                bolt.die(new RuntimeException("subprocess heartbeat timeout"));
    +            }
    +
    +            String genId = Long.toString(_rand.nextLong());
    +            try {
    +                _pendingWrites.put(createHeartbeatBoltMessage(genId));
    --- End diff --
    
    That sounds very good :+1: 


---

[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-58366793
  
    @dan-blanchard You're welcome!
    
    Actually, this approach doesn't let the subprocess send heartbeats periodically, so it can't cover some situations,
    e.g. the subprocess is alive but cannot process the "heartbeat" tuple in time.
    That can happen if the subprocess is too busy, or if it takes a long time to process a single tuple.
    The current approach is simpler than the alternatives, but if we need to cover this edge case, I will try to take care of it.
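The edge case described above can be illustrated with a rough, hypothetical model. `HeartbeatLatencyModel` is an illustrative name, and the model assumes that the subprocess reads its input strictly in order and writes nothing while it is busy. Under those assumptions, the "sync" reply to a heartbeat tuple cannot be sent until every tuple queued ahead of it has been processed.

```java
// Rough, hypothetical model of the limitation: the subprocess handles its input strictly in order,
// so it cannot answer a "heartbeat" tuple while it is still busy with the tuples queued before it.
class HeartbeatLatencyModel {
    /** Milliseconds before the subprocess can reply "sync" to a heartbeat queued behind these tuples. */
    static long syncDelayMillis(long... processingMillisAhead) {
        long delay = 0L;
        for (long p : processingMillisAhead) {
            delay += p;
        }
        return delay;
    }

    /** True when a live but busy subprocess would still be declared dead by the timer check. */
    static boolean falselyTimedOut(long timeoutMillis, long... processingMillisAhead) {
        return syncDelayMillis(processingMillisAhead) > timeoutMillis;
    }
}
```

For example, with an assumed 30-second timeout, `falselyTimedOut(30_000L, 45_000L)` is `true`: a single 45-second tuple is enough to kill a healthy subprocess. `falselyTimedOut(30_000L, 100L, 200L, 300L)` is `false`.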


[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-61720636
  
    OK, I've upmerged.
    By the way, I found that the .py files have diverged too, so I need to copy one file over the other.


[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/286#discussion_r18741698
  
    --- Diff: storm-core/src/dev/resources/storm.js ---
    @@ -243,6 +243,12 @@ BasicBolt.prototype.__emit = function(commandDetails) {
     BasicBolt.prototype.handleNewCommand = function(command) {
         var self = this;
         var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
    +
    +    if (tup.task == -1 && tup.stream == "__heartbeat") {
    +        self.sync();
    +        return;
    +    }
    +
         var callback = function(err) {
               if (err) {
    --- End diff --
    
    @itaifrenkel OK, I applied it to all files.
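The storm.js change above (reply with "sync" when `task == -1` and the stream is `"__heartbeat"`) can be mirrored in a subprocess written in another language. Below is a hypothetical Java sketch. `IncomingTuple` and `HeartbeatAwareBolt` are illustrative names, not Storm classes. The JSON is written by hand to keep the example dependency-free. The framing (a JSON object followed by a line containing `end`) follows the multilang protocol.

```java
import java.io.PrintStream;

// Hypothetical Java counterpart of the storm.js check above; these classes are not part of Storm.
final class IncomingTuple {
    final String id;
    final String stream;
    final long task;

    IncomingTuple(String id, String stream, long task) {
        this.id = id;
        this.stream = stream;
        this.task = task;
    }
}

class HeartbeatAwareBolt {
    private final PrintStream out; // stdout of the subprocess, i.e. the pipe to ShellBolt
    int processed = 0;             // counts tuples handed to "user" code

    HeartbeatAwareBolt(PrintStream out) {
        this.out = out;
    }

    // Same condition as storm.js: task -1 on the "__heartbeat" stream.
    static boolean isHeartbeat(IncomingTuple t) {
        return t.task == -1 && "__heartbeat".equals(t.stream);
    }

    void handle(IncomingTuple t) {
        if (isHeartbeat(t)) {
            sync(); // answer right away and never pass the heartbeat to user code
            return;
        }
        processed++; // stand-in for the user's process(tuple)
    }

    private void sync() {
        // A multilang message is a JSON object followed by a line containing "end".
        out.println("{\"command\":\"sync\"}");
        out.println("end");
        out.flush();
    }
}
```

In Java the stream name must be compared with `equals()`. The `==` used in the JavaScript version would compare references here and could silently miss heartbeats.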


[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-61719213
  
    I had a chance to discuss this PR with @clockfly, and he also pointed out that if the subprocess is too busy, it cannot send a heartbeat in time, which I mentioned at the beginning of this PR.
    
    Actually, it would be better to give the subprocess its own heartbeat thread that sends heartbeats periodically.
    But there are two things to consider.
    1. ShellSpout uses PING-PONG communication: nextTuple() must wait for "sync". So if we give ShellSpout a reader thread, nextTuple() has to wait for the reader thread to read "sync", which is a bit more complex than the current approach.
    2. We must ensure that the main thread and the heartbeat thread don't write to stdout (probably a pipe) at the same time. Python's GIL might let us worry less about this, but other languages support real (?) threads, so writes should be guarded by a lock.
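The second point, together with the proposed heartbeat thread, might look roughly like the hypothetical Java sketch below. It assumes a subprocess written in Java. The `{"command":"heartbeat"}` message is an assumption made for illustration: in this PR, ShellBolt sends a heartbeat tuple and the subprocess answers with "sync". The important part is that one lock covers both lines of each message (the JSON and the `end` terminator), so a heartbeat can never be inserted in the middle of a message written by the main thread.

```java
import java.io.PrintStream;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the proposal for a multilang subprocess; not part of Storm.
// Every message written to stdout (the pipe to the parent) goes through one lock, so a
// heartbeat from the background thread can never land between the JSON and "end" lines.
class LockedMultilangWriter {
    private final PrintStream out;
    private final Object writeLock = new Object();

    LockedMultilangWriter(PrintStream out) {
        this.out = out;
    }

    void send(String json) {
        synchronized (writeLock) {
            out.println(json);
            out.println("end");
            out.flush();
        }
    }
}

// Periodically sends a heartbeat from a daemon thread. The "heartbeat" command name is an
// illustrative assumption, not a command defined by the multilang protocol.
class HeartbeatSender {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "multilang-heartbeat");
                t.setDaemon(true);
                return t;
            });

    void start(LockedMultilangWriter writer, long periodMillis) {
        scheduler.scheduleAtFixedRate(
                () -> writer.send("{\"command\":\"heartbeat\"}"),
                0L, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Stops sending and waits briefly for an in-flight write to finish.
    void stop() throws InterruptedException {
        scheduler.shutdownNow();
        scheduler.awaitTermination(1L, TimeUnit.SECONDS);
    }
}
```

`PrintStream` synchronizes each individual `println` call, but not the pair. Without the explicit lock, a heartbeat line could still be written between a message's JSON line and its `end` line.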
    
    Since I'm not a JavaScript (Node.js) developer and I'm only a beginner in Ruby, I can't handle these two things in the .js implementation.
    So I'd like to implement this in a separate PR, once we decide we can't live with the current limitation, or once I have more time.
    
    By the way, it may take Nimbus / Supervisor up to roughly SUPERVISOR_WORKER_TIMEOUT_SECS * 2 plus some slack to detect a dead process caused by a hung subprocess, because there are two chained heartbeat checks: ShellProcess checks the subprocess (and kills itself if the subprocess cannot respond), and Nimbus / Supervisor check ShellProcess.
    (Just for @clockfly)
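The "two chained checks" bound above can be written as a small, hypothetical calculation. It assumes that both checks wait out the same SUPERVISOR_WORKER_TIMEOUT_SECS. The per-check slack (polling interval, process exit and similar delays) is an illustrative parameter, not a Storm setting.

```java
// Hypothetical worst-case model of the two chained checks; the slack term is an assumption.
class DetectionBound {
    /**
     * @param workerTimeoutSecs value of SUPERVISOR_WORKER_TIMEOUT_SECS, assumed to bound both checks
     * @param slackSecs         assumed extra delay per check (polling interval, process exit, etc.)
     * @return rough upper bound in seconds before a worker with a hung subprocess is detected
     */
    static long worstCaseSecs(long workerTimeoutSecs, long slackSecs) {
        // 1. The shell component stops hearing from its subprocess and kills itself.
        long shellCheck = workerTimeoutSecs + slackSecs;
        // 2. The worker's own heartbeats stop, and Supervisor waits out its timeout.
        long supervisorCheck = workerTimeoutSecs + slackSecs;
        return shellCheck + supervisorCheck;
    }
}
```

For example, a 30-second timeout with 1 second of slack per check gives `worstCaseSecs(30, 1) == 62` seconds. This is about twice as long as a failure that Supervisor could detect directly.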


[GitHub] storm pull request: STORM-513 check heartbeat from multilang subpr...

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on the pull request:

    https://github.com/apache/storm/pull/286#issuecomment-62296939
  
    @HeartSaVioR Some time ago I wanted to fix the multilang docs for Node.js support, but I couldn't find a way to submit a pull request for them. Is there one?

