Posted to user@storm.apache.org by William Oberman <ob...@civicscience.com> on 2015/02/06 15:48:37 UTC

ShellSpout hangs on reportError?

Hi,

For reference, I'm talking about 0.9.3 ShellSpout, line 234.

I'll try to cover the important facts that led to this issue:

-I was on 0.9.2 using multilang to bridge to PHP to get to some existing
business logic

-I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition to
the ShellBolt protocol)

-I have some odd topologies where I try to do some legacy background
processing.  This processing takes a highly variable amount of time in the
Bolts, from milliseconds to minutes.  But, eventually due to randomness the
spout's "pending" pool fills up, causing the spout to block on nextTuple,
which eventually causes a heartbeat timeout. (I believe my only fix is to
increase the heartbeat timeout at the topology level.  That's not the
purpose of this email, though confirmation of this as my only workaround
would be appreciated!  I feel like this case wasn't anticipated when the
heartbeat patch was designed, as I guess it was assumed the spout's
nextTuple wouldn't block?)

-The purpose of this email is the fact that the topology "jams up" when the
ShellSpout has a heartbeat timeout.  I can see my PHP spout/bolt still
running (I added logging to them), but Storm itself is doing nothing.

-I added logging to ShellSpout and recompiled, because I saw the log
message on line 233 (Halting process: ShellSpout died) but as noted the PHP
process was still running, so I was curious whether _process.destroy();
failed.  But, my logging didn't appear.  I assumed I was compiling/deploying
wrong.  Eventually I commented out line 234:
_collector.reportError(exception);
and everything started working!!!
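
To make the timeout mechanics in the list above concrete, here is a minimal Java sketch of a heartbeat watchdog. It is a simplified model, not Storm's actual ShellSpout code: the class name HeartbeatWatchdog, its methods, and the 30-second timeout are all assumptions chosen for illustration. Each reply from the subprocess refreshes a timestamp, and a periodic check declares the subprocess dead once that timestamp is too old. If nextTuple stalls as described above, no reply arrives to refresh the timestamp.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of a subprocess heartbeat check (hypothetical, not Storm code).
public class HeartbeatWatchdog {
    private final long timeoutMillis;
    private final AtomicLong lastHeartbeatMillis;

    public HeartbeatWatchdog(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastHeartbeatMillis = new AtomicLong(nowMillis);
    }

    // Called whenever the subprocess replies to the parent process.
    public void recordHeartbeat(long nowMillis) {
        lastHeartbeatMillis.set(nowMillis);
    }

    // Called periodically by a timer; true means the subprocess is treated as dead.
    public boolean isTimedOut(long nowMillis) {
        return nowMillis - lastHeartbeatMillis.get() > timeoutMillis;
    }

    public static void main(String[] args) {
        // Illustrative 30-second timeout, with the clock starting at 0 ms.
        HeartbeatWatchdog watchdog = new HeartbeatWatchdog(30_000L, 0L);
        watchdog.recordHeartbeat(10_000L);                 // last reply at t = 10 s
        System.out.println(watchdog.isTimedOut(35_000L));  // 25 s of silence
        System.out.println(watchdog.isTimedOut(45_000L));  // 35 s of silence
    }
}
```

Passing the clock value in explicitly keeps the sketch deterministic; a real watchdog would read the system clock from a timer thread instead.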

Does this make *any* sense?  Why would _collector.reportError(exception);
block and never return?  (I waited quite a long time, tens of minutes.)
When I comment out line 234, Storm immediately kills my bad tasks and
respawns them almost instantly.
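
One possible explanation for a call that seems to hang without logging anything, consistent with the stack trace quoted later in this thread (where die() runs inside a ScheduledThreadPoolExecutor task): an exception thrown from a periodic scheduled task is captured in the task's future rather than printed, and later runs are suppressed. The Java sketch below demonstrates that JDK behaviour in isolation. The class name, the failingReport() helper, and the counters are hypothetical stand-ins, not Storm code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SwallowedTimerException {
    // Hypothetical stand-in for a reportError call that throws.
    private static void failingReport() {
        throw new RuntimeException("simulated reportError failure");
    }

    // Runs a throwing periodic task and summarizes what a caller can observe.
    public static String runDemo() throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        AtomicInteger cleanups = new AtomicInteger();

        ScheduledFuture<?> future = timer.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            failingReport();             // throws, so the next line never runs
            cleanups.incrementAndGet();  // stands in for destroy()/exit
        }, 0, 10, TimeUnit.MILLISECONDS);

        String cause;
        try {
            future.get();                // the failure only surfaces here
            cause = "none";
        } catch (ExecutionException e) {
            cause = e.getCause().getMessage();
        }
        Thread.sleep(100);               // time for more runs, had they not been suppressed
        timer.shutdownNow();
        return "runs=" + runs.get() + " cleanups=" + cleanups.get() + " cause=" + cause;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

Nothing is written to the console by the executor itself. Unless some code calls get() on the future or catches the exception inside the task, the failure stays invisible and the cleanup step is skipped.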

I feel fairly confident that this will be reproducible.  My topology:
-1 spout (ShellSpout)
-1 bolt (ShellBolt)
-The ShellSpout has a heartbeat timeout due to slow tasks in ShellBolt +
the pending queue is full

Thanks for any feedback!

will

Re: ShellSpout hangs on reportError?

Posted by 임정택 <ka...@gmail.com>.
Hello.

I haven't seen this behavior.
But at a glance, the die() method should ensure that the subprocess is
destroyed (with try-finally), because the worker process is going to be
killed whether reportError() throws a RuntimeException or not.

You can apply my suggestion to verify that it works, and file a JIRA.
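
The try-finally suggestion can be sketched as follows. This is only an illustration under stated assumptions, not the actual ShellSpout source: the ErrorReporter interface and the Runnable cleanup are hypothetical stand-ins for _collector.reportError(...) and for the destroy/exit steps, so that the example runs without Storm on the classpath.

```java
public class DieWithFinally {
    // Hypothetical stand-in for the collector's reportError call.
    public interface ErrorReporter {
        void reportError(Throwable error);
    }

    // Reports the error, then always runs the cleanup, even if reporting throws.
    public static void die(Throwable error, ErrorReporter reporter, Runnable cleanup) {
        try {
            reporter.reportError(error);
        } finally {
            cleanup.run(); // stands in for destroying the subprocess and exiting
        }
    }

    public static void main(String[] args) {
        boolean[] cleaned = {false};
        try {
            die(new RuntimeException("subprocess heartbeat timeout"),
                // Simulates the wrapped InterruptedException seen in this thread.
                e -> { throw new RuntimeException(new InterruptedException()); },
                () -> cleaned[0] = true);
        } catch (RuntimeException e) {
            System.out.println("reportError threw: " + e.getCause());
        }
        System.out.println("cleanup ran: " + cleaned[0]);
    }
}
```

The exception still propagates to the caller after the finally block, so the failure is not hidden; the only change is that cleanup no longer depends on reportError() succeeding.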

Hope this helps.

Regards.
Jungtaek Lim (HeartSaVioR)

2015-02-13 5:08 GMT+09:00 William Oberman <ob...@civicscience.com>:

> Sorry for the cross post to dev, but I think this thread has veered into
> actual dev questions.  I still don't know if there is something
> fundamentally wrong about my use case, or if this is a bug.  For a dev
> reading this for the first time, the main correction I'd make is to my
> email subject.  reportError isn't hanging, it's throwing a runtime
> exception (wrapping an interrupted exception).  As for what is throwing the
> interrupted exception, I think it's Zookeeper itself.
>
> Both ShellSpout's and ShellBolt's die() have a
> "_collector.reportError(exception);" line.  I changed both to:
> ====
>         try {
>             _collector.reportError(exception);
>         } catch (RuntimeException e) {
>             if (e.getCause() instanceof InterruptedException) {
>                 // zookeeper.clj wraps the zk InterruptedException in a
>                 // runtime exception
>             } else {
>                 throw e;
>             }
>         }
> ======
> and now everything starts to work as I expected.
>
> Does this patch make any sense?  Or is it a bandaid over a deeper issue?
>
> will
>
> On Thu, Feb 12, 2015 at 2:15 PM, William Oberman <oberman@civicscience.com> wrote:
>
> > Ok, I realized that I did NOT check if ShellSpout.die() was throwing a
> > RuntimeException.   I added a try/catch block, and it is!   The
> > RuntimeException is preventing _process.destroy and System.exit() from
> > happening, both of which need to happen to make topology recovery happen.
> >
> > But, I'm not sure *why* this exception is happening yet, since it's an
> > interrupted exception and I don't think the exception tells me *who*
> > interrupted my thread...
> >
> > 2015-02-12T14:12:35.581-0500 b.s.s.ShellSpout [ERROR] die exception!
> > java.lang.RuntimeException: java.lang.InterruptedException
> >     at backtype.storm.util$wrap_in_runtime.invoke(util.clj:44) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:102) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.zookeeper$exists_node_QMARK_.invoke(zookeeper.clj:98) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.zookeeper$mkdirs.invoke(zookeeper.clj:114) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.cluster$mk_distributed_cluster_state$reify__3533.mkdirs(cluster.clj:119) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.cluster$mk_storm_cluster_state$reify__3990.report_error(cluster.clj:400) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.daemon.executor$throttled_report_error_fn$fn__5565.invoke(executor.clj:180) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.daemon.executor$fn__5717$fn$reify__5759.reportError(executor.clj:533) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.spout.SpoutOutputCollector.reportError(SpoutOutputCollector.java:132) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.spout.ShellSpout.die(ShellSpout.java:235) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.spout.ShellSpout.access$200(ShellSpout.java:42) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
> >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
> >     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> > Caused by: java.lang.InterruptedException: null
> >     at java.lang.Object.wait(Native Method) ~[na:1.7.0_71]
> >     at java.lang.Object.wait(Object.java:503) ~[na:1.7.0_71]
> >     at org.apache.storm.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.zookeeper.ZooKeeper.exists(ZooKeeper.java:1040) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:101) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     ... 17 common frames omitted
> >
> >
> > On Wed, Feb 11, 2015 at 4:27 PM, William Oberman <oberman@civicscience.com> wrote:
> >
> >> I'm not sure what I've learned adds up to yet....
> >>
> >> I tried setting up a local storm development environment.  By mistake, I
> >> forgot to switch to 0.9.3-branch (i.e. I was working on master at
> >> first).  In master, I was seeing the ShellBolt heartbeat fail first
> >> (which in retrospect makes sense!).  Then I remembered I was trying to
> >> debug 0.9.3, so I switched to 0.9.3-branch.  In the branch, I see the
> >> ShellSpout heartbeat fail first and jam up, so that's good (?).  In both
> >> cases, I was trying local mode rather than cluster mode, as I figured I
> >> had a better shot at debugging in local mode.
> >>
> >> At this point, I figured I had a good test case.  This was all using
> >> command line tools (git, mvn) so far.   Two thoughts at this point:
> >> 1.) I don't plan on running master, and it's not 100% clear to me if
> >> ShellBolt failing first will solve any problems...  2.) in
> >> 0.9.3-branch, after ShellSpout fails and everything jams up, if I
> >> "ctrl-c" the process I immediately see the ShellBolt heartbeat timeout
> >> message.  It's like it was waiting to write, but was blocked on
> >> something (hrmmmm....?)
> >>
> >> But, I hit the limits of my ability to add "System.out" debugging, so I
> >> tried setting up storm in IntelliJ.  That took a bit, but I finally
> >> figured out how to run a topology in local mode with a debugger.  Once
> >> again, I was seeing the ShellSpout heartbeat fail and then nothing
> >> happen.
> >>
> >> The next problem is that I don't know how to set up IntelliJ to
> >> understand clojure compiled code (at least, I think that's my
> >> problem....) so the "Step Into/Out of" information is really weird in
> >> the debugger.  The best/most complete stack trace I have is:
> >> invoke():102, zookeeper$exists_node_QMARK_$fn__3279 (backtype.storm)
> >> invoke():98, zookeeper$exists_node_QMARK_ (backtype.storm)
> >> invoke():114, zookeeper$mkdirs (backtype.storm)
> >> mkdirs():119, cluster$mk_distributed_cluster_state$reify__3526 (backtype.storm)
> >> report_error():397, cluster$mk_storm_cluster_state$reify__3983 (backtype.storm)
> >> invoke():180, executor$throttled_report_error_fn$fn__5548 (backtype.storm.daemon)
> >> reportError():533, executor$fn__5700$fn$reify__5742 (backtype.storm.daemon)
> >> reportError():132, SpoutOutputCollector (backtype.storm.spout)
> >>
> >> I had a better stack trace (that I lost) that led into:
> >> org.apache.storm.curator.RetryLoop.callWithRetry()
> >> which for me is my "prime suspect" (based on name alone) for something
> >> that is blocking things up....  :-)
> >>
> >> Though, once again, not understanding the big picture of storm, I still
> >> have no idea what all of the above adds up to in terms of what's wrong,
> >> and how to fix it....
> >>
> >> will
> >>
> >> On Wed, Feb 11, 2015 at 1:31 PM, William Oberman <
> >> oberman@civicscience.com> wrote:
> >>
> >>> I'm glad to hear I'm not the only one!
> >>>
> >>> (no new news yet)
> >>>
> >>>
> >>> On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <al...@v5tech.es> wrote:
> >>>
> >>>> Hi William,
> >>>>
> >>>> I'm having the same problem running a multilang topology (written in
> >>>> Python). If you find a solution, please post it here; it would surely
> >>>> help us.
> >>>>
> >>>> To upgrade from 0.9.2-incubating we updated storm.py
> >>>> (https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
> >>>> and pom.xml.
> >>>>
> >>>> After downgrading to 0.9.2-incubating (reverting storm.py and pom.xml),
> >>>> it works perfectly.
> >>>>
> >>>> Best regards,
> >>>>
> >>>> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <
> >>>> oberman@civicscience.com> wrote:
> >>>>
> >>>>> I'm not sure of the best way to share a test case.  I'll copy and
> >>>>> paste code below....  If you run the below code (and find the log
> >>>>> file of the worker that was running it), you should see in ~30
> >>>>> seconds:
> >>>>> ====
> >>>>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
> >>>>> java.lang.RuntimeException: subprocess heartbeat timeout
> >>>>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
> >>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
> >>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> >>>>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
> >>>>> java.lang.RuntimeException: subprocess heartbeat timeout
> >>>>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
> >>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
> >>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> >>>>> =======
> >>>>>
> >>>>> But, the topology will run in a kind of weird zombie state forever.
> >>>>> More specifically, I see the multilang bolt process all tuples in the
> >>>>> pending queue, and then an infinite loop of nextTuple()/fail() from
> >>>>> the multilang spout.  But, as noted in my original email, if I comment
> >>>>> out:
> >>>>>  _collector.reportError(exception);
> >>>>> in the Java ShellSpout then the worker will immediately die and
> >>>>> respawn.
> >>>>>
> >>>>> If no one can help, the next step for me is rough, as I'll have to
> >>>>> learn how to actually develop and debug storm itself, which is
> >>>>> usually at least 10x harder than just using something :-)
> >>>>>
> >>>>> In any case, my test code:
> >>>>>
> >>>>> Topology = 1 process with two tasks (multilang spout and bolt), and
> >>>>> small pool of pending messages (yes, using the word count example in
> >>>>> storm-starter as a starting point....)
> >>>>> =============
> >>>>> import java.util.Map;
> >>>>>
> >>>>> import backtype.storm.Config;
> >>>>> import backtype.storm.LocalCluster;
> >>>>> import backtype.storm.StormSubmitter;
> >>>>> import backtype.storm.spout.ShellSpout;
> >>>>> import backtype.storm.task.ShellBolt;
> >>>>> import backtype.storm.topology.IRichBolt;
> >>>>> import backtype.storm.topology.IRichSpout;
> >>>>> import backtype.storm.topology.OutputFieldsDeclarer;
> >>>>> import backtype.storm.topology.TopologyBuilder;
> >>>>> import backtype.storm.tuple.Fields;
> >>>>>
> >>>>> public class SlowTopology {
> >>>>>   // Multilang bolt backed by slowBolt.php; it emits nothing.
> >>>>>   public static class SlowPhpBolt extends ShellBolt implements IRichBolt {
> >>>>>
> >>>>>     public SlowPhpBolt() {
> >>>>>       super("php", "slowBolt.php");
> >>>>>     }
> >>>>>
> >>>>>     @Override
> >>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >>>>>       declarer.declare(new Fields());
> >>>>>     }
> >>>>>
> >>>>>     @Override
> >>>>>     public Map<String, Object> getComponentConfiguration() {
> >>>>>       return null;
> >>>>>     }
> >>>>>   }
> >>>>>
> >>>>>   // Multilang spout backed by slowSpout.php; it emits one field.
> >>>>>   public static class SlowPhpSpout extends ShellSpout implements IRichSpout {
> >>>>>
> >>>>>     public SlowPhpSpout() {
> >>>>>       super("php", "slowSpout.php");
> >>>>>     }
> >>>>>
> >>>>>     @Override
> >>>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
> >>>>>       ofd.declare(new Fields("output"));
> >>>>>     }
> >>>>>
> >>>>>     @Override
> >>>>>     public Map<String, Object> getComponentConfiguration() {
> >>>>>       return null;
> >>>>>     }
> >>>>>   }
> >>>>>
> >>>>>   public static void main(String[] args) throws Exception {
> >>>>>
> >>>>>     TopologyBuilder builder = new TopologyBuilder();
> >>>>>
> >>>>>     // One spout task with a small pending pool, feeding one bolt task.
> >>>>>     builder.setSpout("spout", new SlowPhpSpout(), 1).setNumTasks(1).setMaxSpoutPending(3);
> >>>>>     builder.setBolt("bolt", new SlowPhpBolt(), 1).setNumTasks(1).shuffleGrouping("spout");
> >>>>>
> >>>>>     Config conf = new Config();
> >>>>>     conf.setDebug(true);
> >>>>>
> >>>>>     if (args != null && args.length > 0) {
> >>>>>       // Cluster mode: the first argument is the topology name.
> >>>>>       conf.setNumWorkers(1);
> >>>>>       StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
> >>>>>     }
> >>>>>     else {
> >>>>>       // Local mode: run for 10 seconds, then shut down.
> >>>>>       LocalCluster cluster = new LocalCluster();
> >>>>>       cluster.submitTopology("slow", conf, builder.createTopology());
> >>>>>       Thread.sleep(10000);
> >>>>>       cluster.shutdown();
> >>>>>     }
> >>>>>   }
> >>>>> }
> >>>>> ===========
> >>>>>
> >>>>> slowSpout.php
> >>>>> ==========
> >>>>> <?php
> >>>>> require_once "storm.php";
> >>>>> class slowSpout extends \ShellSpout {
> >>>>>   protected function nextTuple() {
> >>>>>     $value = rand(0,100);
> >>>>>     $id = rand(0, 100);
> >>>>>     $this->emit(array($value), $id);
> >>>>>     file_put_contents("/tmp/storm_slow.log",
> >>>>> "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
> >>>>>     sleep(1);
> >>>>>   }
> >>>>>   protected function ack($id) {
> >>>>>     file_put_contents("/tmp/storm_slow.log", "ack($id)\n",
> >>>>> FILE_APPEND);
> >>>>>   }
> >>>>>
> >>>>>   protected function fail($id) {
> >>>>>     file_put_contents("/tmp/storm_slow.log", "fail($id)\n",
> >>>>> FILE_APPEND);
> >>>>>   }
> >>>>> }
> >>>>>
> >>>>> (new slowSpout())->run();
> >>>>> ===========
> >>>>>
> >>>>> slowBolt.php
> >>>>> ============
> >>>>> <?php
> >>>>> require_once "storm.php";
> >>>>> class slowBolt extends \BasicBolt {
> >>>>>   protected function process(\Tuple $t) {
> >>>>>     $sleep = rand(1, 180);
> >>>>>     file_put_contents("/tmp/storm_slow.log", "process(".print_r($t,
> >>>>> true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
> >>>>>     sleep($sleep);
> >>>>>   }
> >>>>> }
> >>>>> (new slowBolt())->run();
> >>>>> ============
> >>>>>
> >>>>> storm.php  (from  https://github.com/lazyshot/storm-php; I think I
> >>>>> added more error checking on reads/writes to standard in/out, added
> >>>>> sync() to the ShellSpout to make new classes easier to write, and
> >>>>> added the new heartbeat protocol)
> >>>>> =========
> >>>>> <?php
> >>>>> interface iShellBolt {
> >>>>> }
> >>>>>
> >>>>> interface iShellSpout {
> >>>>> }
> >>>>>
> >>>>> class Tuple {
> >>>>>     public $id, $component, $stream, $task, $values;
> >>>>>
> >>>>>     public function __construct($id, $component, $stream, $task,
> >>>>> $values) {
> >>>>>         $this->id = $id;
> >>>>>         $this->component = $component;
> >>>>>         $this->stream = $stream;
> >>>>>         $this->task = $task;
> >>>>>         $this->values = $values;
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> abstract class ShellComponent {
> >>>>>     protected $pid;
> >>>>>     protected $stormConf;
> >>>>>     protected $topologyContext;
> >>>>>
> >>>>>     protected $stormInc = null;
> >>>>>
> >>>>>     public function __construct() {
> >>>>>         $this->pid = getmypid();
> >>>>>         $this->sendCommand(array("pid" => $this->pid));
> >>>>>
> >>>>>         $handshake = $this->parseMessage($this->waitForMessage());
> >>>>>
> >>>>>         $this->stormConf = $handshake['conf'];
> >>>>>         $this->topologyContext = $handshake['context'];
> >>>>>         $pidDir = $handshake['pidDir'];
> >>>>>
> >>>>>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
> >>>>>     }
> >>>>>
> >>>>>     protected function readLine() {
> >>>>>         $raw = fgets(STDIN);
> >>>>>
> >>>>>         if ($raw === false) {
> >>>>>             throw new Exception("STDIN is broken");
> >>>>>         }
> >>>>>
> >>>>>         $line = trim($raw);
> >>>>>
> >>>>>         return $line;
> >>>>>     }
> >>>>>
> >>>>>     protected function waitForMessage() {
> >>>>>         $message = '';
> >>>>>         while (true) {
> >>>>>             $line = trim($this->readLine());
> >>>>>
> >>>>>             if (strlen($line) == 0) {
> >>>>>                 continue;
> >>>>>             } else if ($line == 'end') {
> >>>>>                 break;
> >>>>>             } else if ($line == 'sync') {
> >>>>>                 $message = '';
> >>>>>                 continue;
> >>>>>             }
> >>>>>
> >>>>>             $message .= $line . "\n";
> >>>>>         }
> >>>>>
> >>>>>         return trim($message);
> >>>>>     }
> >>>>>
> >>>>>     protected function sendCommand(array $command) {
> >>>>>         $this->sendMessage(json_encode($command));
> >>>>>     }
> >>>>>
> >>>>>     protected function sendLog($message) {
> >>>>>         return $this->sendCommand(array(
> >>>>>             'command' => 'log',
> >>>>>             'msg' => $message
> >>>>>         ));
> >>>>>     }
> >>>>>
> >>>>>     protected function parseMessage($message) {
> >>>>>         $msg = json_decode($message, true);
> >>>>>
> >>>>>         if ($msg) {
> >>>>>             return $msg;
> >>>>>         } else {
> >>>>>             return $message;
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     protected function sendMessage($message) {
> >>>>>         $message = "$message\nend\n";
> >>>>>         $bytesWritten = fwrite(STDOUT, $message);
> >>>>>         fflush(STDOUT);
> >>>>>         if ($bytesWritten === false) {
> >>>>>             throw new Exception("STDOUT is broken");
> >>>>>         }
> >>>>>         if ($bytesWritten != strlen($message)) {
> >>>>>             throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     final protected function sync() {
> >>>>>         $command = array(
> >>>>>             'command' => 'sync',
> >>>>>         );
> >>>>>
> >>>>>         $this->sendCommand($command);
> >>>>>     }
> >>>>>
> >>>>> }
> >>>>>
> >>>>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
> >>>>>
> >>>>>     public $anchor_tuple = null;
> >>>>>
> >>>>>     public function __construct() {
> >>>>>         parent::__construct();
> >>>>>
> >>>>>         $this->init($this->stormConf, $this->topologyContext);
> >>>>>     }
> >>>>>
> >>>>>     public function run() {
> >>>>>         try {
> >>>>>             while (true) {
> >>>>>                 $command =
> >>>>> $this->parseMessage($this->waitForMessage());
> >>>>>
> >>>>>                 if (is_array($command)) {
> >>>>>                     if (isset($command['tuple'])) {
> >>>>>                         $tupleMap = array_merge(array(
> >>>>>                                 'id' => null,
> >>>>>                                 'comp' => null,
> >>>>>                                 'stream' => null,
> >>>>>                                 'task' => null,
> >>>>>                                 'tuple' => null
> >>>>>                             ),
> >>>>>
> >>>>>                             $command);
> >>>>>
> >>>>>                         if($tupleMap['task'] == -1 &&
> >>>>> $tupleMap['stream'] == "__heartbeat") {
> >>>>>                             $this->sync();
> >>>>>                         } else {
> >>>>>                             $tuple = new Tuple($tupleMap['id'],
> >>>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
> >>>>> $tupleMap['tuple']);
> >>>>>                             $this->process($tuple);
> >>>>>                         }
> >>>>>                     }
> >>>>>                 }
> >>>>>             }
> >>>>>         } catch (Exception $e) {
> >>>>>             $this->sendLog((string)$e);
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     abstract protected function process(Tuple $tuple);
> >>>>>
> >>>>>     protected function init($conf, $topology) {
> >>>>>         return;
> >>>>>     }
> >>>>>
> >>>>>     protected function emitTuple(array $tuple, $stream = null,
> >>>>> $anchors = array(), $directTask = null) {
> >>>>>         if ($this->anchor_tuple !== null) {
> >>>>>             $anchors = array($this->anchor_tuple);
> >>>>>         }
> >>>>>
> >>>>>         $command = array(
> >>>>>             'command' => 'emit'
> >>>>>         );
> >>>>>
> >>>>>         if ($stream !== null) {
> >>>>>             $command['stream'] = $stream;
> >>>>>         }
> >>>>>
> >>>>>         $command['anchors'] = array_map(function ($a) {
> >>>>>             return $a->id;
> >>>>>         }, $anchors);
> >>>>>
> >>>>>         if ($directTask !== null) {
> >>>>>             $command['task'] = $directTask;
> >>>>>         }
> >>>>>
> >>>>>         $command['tuple'] = $tuple;
> >>>>>
> >>>>>         $this->sendCommand($command);
> >>>>>     }
> >>>>>
> >>>>>     protected function emit($tuple, $stream = null, $anchors =
> >>>>> array()) {
> >>>>>         $this->emitTuple($tuple, $stream, $anchors);
> >>>>>     }
> >>>>>
> >>>>>     protected function emitDirect($directTask, $tuple, $stream = null,
> >>>>> $anchors = array()) {
> >>>>>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
> >>>>>     }
> >>>>>
> >>>>>     protected function ack(Tuple $tuple) {
> >>>>>         $command = array(
> >>>>>             'command' => 'ack',
> >>>>>             'id' => $tuple->id
> >>>>>         );
> >>>>>
> >>>>>         $this->sendCommand($command);
> >>>>>     }
> >>>>>
> >>>>>     protected function fail(Tuple $tuple) {
> >>>>>         $command = array(
> >>>>>             'command' => 'fail',
> >>>>>             'id' => $tuple->id
> >>>>>         );
> >>>>>
> >>>>>         $this->sendCommand($command);
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> abstract class BasicBolt extends ShellBolt {
> >>>>>     public function run() {
> >>>>>         try {
> >>>>>             while (true) {
> >>>>>                 $command =
> >>>>> $this->parseMessage($this->waitForMessage());
> >>>>>
> >>>>>                 if (is_array($command)) {
> >>>>>                     if (isset($command['tuple'])) {
> >>>>>                         $tupleMap = array_merge(array(
> >>>>>                                 'id' => null,
> >>>>>                                 'comp' => null,
> >>>>>                                 'stream' => null,
> >>>>>                                 'task' => null,
> >>>>>                                 'tuple' => null
> >>>>>                             ),
> >>>>>
> >>>>>                             $command);
> >>>>>
> >>>>>                         if($tupleMap['task'] == -1 &&
> >>>>> $tupleMap['stream'] == "__heartbeat") {
> >>>>>                             $this->sync();
> >>>>>                         } else {
> >>>>>                             $tuple = new Tuple($tupleMap['id'],
> >>>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
> >>>>> $tupleMap['tuple']);
> >>>>>
> >>>>>                             $this->anchor_tuple = $tuple;
> >>>>>
> >>>>>                             try {
> >>>>>                                 $processed = $this->process($tuple);
> >>>>>
> >>>>>                                 $this->ack($tuple);
> >>>>>                             } catch (BoltProcessException $e) {
> >>>>>                                 $this->fail($tuple);
> >>>>>                             }
> >>>>>                         }
> >>>>>                     }
> >>>>>                 }
> >>>>>             }
> >>>>>         } catch (Exception $e) {
> >>>>>             $this->sendLog((string)$e);
> >>>>>         }
> >>>>>
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> abstract class ShellSpout extends ShellComponent implements
> >>>>> iShellSpout {
> >>>>>     protected $tuples = array();
> >>>>>
> >>>>>     public function __construct() {
> >>>>>         parent::__construct();
> >>>>>
> >>>>>         $this->init($this->stormConf, $this->topologyContext);
> >>>>>     }
> >>>>>
> >>>>>
> >>>>>     abstract protected function nextTuple();
> >>>>>
> >>>>>     abstract protected function ack($tuple_id);
> >>>>>
> >>>>>     abstract protected function fail($tuple_id);
> >>>>>
> >>>>>     public function run() {
> >>>>>         try {
> >>>>>             while (true) {
> >>>>>                 $command =
> >>>>> $this->parseMessage($this->waitForMessage());
> >>>>>
> >>>>>                 if (is_array($command)) {
> >>>>>                     if (isset($command['command'])) {
> >>>>>                         if ($command['command'] == 'ack') {
> >>>>>                             $this->ack($command['id']);
> >>>>>                             $this->sync();
> >>>>>                         } else if ($command['command'] == 'fail') {
> >>>>>                             $this->fail($command['id']);
> >>>>>                             $this->sync();
> >>>>>                         } else if ($command['command'] == 'next') {
> >>>>>                             $this->nextTuple();
> >>>>>                             $this->sync();
> >>>>>                         }
> >>>>>                     }
> >>>>>                 }
> >>>>>             }
> >>>>>         } catch (Exception $e) {
> >>>>>             $this->sendLog((string)$e);
> >>>>>             $this->sync();
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     protected function init($stormConf, $topologyContext) {
> >>>>>         return;
> >>>>>     }
> >>>>>
> >>>>>     final protected function emit(array $tuple, $messageId = null,
> >>>>> $streamId = null) {
> >>>>>         return $this->emitTuple($tuple, $messageId, $streamId, null);
> >>>>>     }
> >>>>>
> >>>>>     final protected function emitDirect($directTask, array $tuple,
> >>>>> $messageId = null, $streamId = null) {
> >>>>>         return $this->emitTuple($tuple, $messageId, $streamId,
> >>>>> $directTask);
> >>>>>     }
> >>>>>
> >>>>>     final private function emitTuple(array $tuple, $messageId = null,
> >>>>> $streamId = null, $directTask = null) {
> >>>>>         $command = array(
> >>>>>             'command' => 'emit'
> >>>>>         );
> >>>>>
> >>>>>         if ($messageId !== null) {
> >>>>>             $command['id'] = $messageId;
> >>>>>         }
> >>>>>
> >>>>>         if ($streamId !== null) {
> >>>>>             $command['stream'] = $streamId;
> >>>>>         }
> >>>>>
> >>>>>         if ($directTask !== null) {
> >>>>>             $command['task'] = $directTask;
> >>>>>         }
> >>>>>
> >>>>>         $command['tuple'] = $tuple;
> >>>>>
> >>>>>         return $this->sendCommand($command);
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> class BoltProcessException extends Exception {
> >>>>> }
> >>>>>
> >>>>> =========================
> >>>>>
> >>>>>
> >>>>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <
> >>>>> oberman@civicscience.com> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
> >>>>>>
> >>>>>> I'll try to cover the important facts that led to this issue:
> >>>>>>
> >>>>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some
> >>>>>> existing business logic
> >>>>>>
> >>>>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat
> >>>>>> addition to the ShellBolt protocol)
> >>>>>>
> >>>>>> -I have some odd topologies where I try to do some legacy background
> >>>>>> processing.  This processing takes a highly variable amount of time
> >>>>>> in the Bolts, from milliseconds to minutes.  But, eventually due to
> >>>>>> randomness the spout's "pending" pool fills up, causing the spout to
> >>>>>> block on nextTuple, which eventually causes a heartbeat timeout. (I
> >>>>>> believe my only fix is to increase the heartbeat timeout at the
> >>>>>> topology level. That's not the purpose of this email, though
> >>>>>> confirmation of this as my only workaround would be appreciated!  I
> >>>>>> feel like this wasn't anticipated when the heartbeat patch was
> >>>>>> designed, as it was assumed the spout's nextTuple wouldn't block I
> >>>>>> guess?)
> >>>>>>
> >>>>>> -The purpose of this email is the fact that the topology "jams up"
> >>>>>> when the ShellSpout has a heartbeat timeout.  I can see my PHP
> >>>>>> spout/bolt still running (I added logging to them), but Storm itself
> >>>>>> is doing nothing.
> >>>>>>
> >>>>>> -I added logging to ShellSpout and recompiled, because I saw the log
> >>>>>> message on line 233 (Halting process: ShellSpout died) but as noted
> >>>>>> the PHP process was still running, so I was curious if
> >>>>>> _process.destroy(); failed.  But, my logging didn't appear.  I
> >>>>>> assumed I was compiling/deploying wrong.  Eventually I commented out
> >>>>>> line 234: _collector.reportError(exception); and everything started
> >>>>>> working!!!
> >>>>>>
> >>>>>> Does this make *any* sense?  Why would
> >>>>>> _collector.reportError(exception); block and never return (I waited
> >>>>>> quite a long time, 10's of minutes).  When I comment out line 234,
> >>>>>> Storm immediately kills my bad tasks and respawns almost instantly.
> >>>>>>
> >>>>>> I feel fairly confident that this will be recreatable.  My topology:
> >>>>>> -1 spout (ShellSpout)
> >>>>>> -1 bolt (ShellBolt)
> >>>>>> -The ShellSpout has a heartbeat timeout due to slow tasks in
> >>>>>> ShellBolt + the pending queue is full
> >>>>>>
> >>>>>> Thanks for any feedback!
> >>>>>>
> >>>>>> will
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> Alex Sobrino Beltrán
> >>>> Registered Linux User #273657
> >>>>
> >>>> http://v5tech.es
> >>>>
> >>>
> >>>
> >>>
> >>>
> >>
> >>
> >
> >
> >
>



-- 
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Re: ShellSpout hangs on reportError?

Posted by William Oberman <ob...@civicscience.com>.
Sorry for the cross post to dev, but I think this thread has veered into
actual dev questions.  I still don't know if there is something
fundamentally wrong about my use case, or if this is a bug.  For a dev
reading this for the first time, the main correction I'd make is to my
email subject.  reportError isn't hanging; it's throwing a RuntimeException
(wrapping an InterruptedException).  As for what is throwing the
InterruptedException, I think it's ZooKeeper itself.

Both ShellSpout's and ShellBolt's die() have a
"_collector.reportError(exception);" line.  I changed both to:
====
        try {
            _collector.reportError(exception);
        } catch (RuntimeException e) {
            if(e.getCause() instanceof InterruptedException) {
                // zookeeper.clj wraps the ZK InterruptedException in a RuntimeException
            } else {
                throw e;
            }
        }
======
and now everything starts to work as I expected.

Does this patch make any sense?  Or is it a bandaid over a deeper issue?
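
An alternative shape for the same fix, as a minimal sketch rather than the
actual Storm code: put the cleanup in a finally block so it runs whatever
reportError does.  ErrorReporter and the cleanup Runnable are hypothetical
stand-ins for _collector and for the _process.destroy()/System.exit() pair;
the sketch assumes Java 8 for the lambdas.

```java
import java.util.ArrayList;
import java.util.List;

public class DieSketch {
    /** Hypothetical stand-in for the collector's reportError(Throwable). */
    interface ErrorReporter {
        void reportError(Throwable error);
    }

    /** Reports the error, then runs the cleanup even if reporting throws. */
    static void die(Throwable cause, ErrorReporter reporter, Runnable cleanup) {
        try {
            reporter.reportError(cause);
        } finally {
            // Runs whether or not reportError threw.  In ShellSpout this is
            // where _process.destroy() and System.exit() would go.
            cleanup.run();
        }
    }

    /** Records what happens when the reporter fails the way the log shows. */
    static List<String> simulate() {
        List<String> steps = new ArrayList<>();
        ErrorReporter failing = error -> {
            // Mimics the wrapped InterruptedException from the stack trace.
            throw new RuntimeException(new InterruptedException());
        };
        try {
            die(new RuntimeException("subprocess heartbeat timeout"),
                failing, () -> steps.add("cleanup"));
        } catch (RuntimeException e) {
            steps.add("reporter threw: " + e.getCause().getClass().getSimpleName());
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

In the simulation the cleanup step is recorded before the exception reaches
the caller.  With a real System.exit() inside the finally block the exception
would never propagate at all, which is the recovery behavior described above.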

will

On Thu, Feb 12, 2015 at 2:15 PM, William Oberman <ob...@civicscience.com>
wrote:

> Ok, I realized that I did NOT check if ShellSpout.die() was throwing a
> RuntimeException.   I added a try/catch block, and it is!   The
> RuntimeException is preventing _process.destroy and System.exit() from
> happening, both of which need to happen to make topology recovery happen.
>
> But, I'm not sure *why* this exception is happening yet, since it's an
> interrupted exception and I don't think the exception tells me *who*
> interrupted my thread...
>
> 2015-02-12T14:12:35.581-0500 b.s.s.ShellSpout [ERROR] die exception!
> java.lang.RuntimeException: java.lang.InterruptedException
> at backtype.storm.util$wrap_in_runtime.invoke(util.clj:44)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:102)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at backtype.storm.zookeeper$exists_node_QMARK_.invoke(zookeeper.clj:98)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at backtype.storm.zookeeper$mkdirs.invoke(zookeeper.clj:114)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.cluster$mk_distributed_cluster_state$reify__3533.mkdirs(cluster.clj:119)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.cluster$mk_storm_cluster_state$reify__3990.report_error(cluster.clj:400)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.daemon.executor$throttled_report_error_fn$fn__5565.invoke(executor.clj:180)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.daemon.executor$fn__5717$fn$reify__5759.reportError(executor.clj:533)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.spout.SpoutOutputCollector.reportError(SpoutOutputCollector.java:132)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at backtype.storm.spout.ShellSpout.die(ShellSpout.java:235)
> [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at backtype.storm.spout.ShellSpout.access$200(ShellSpout.java:42)
> [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261)
> [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> [na:1.7.0_71]
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
> [na:1.7.0_71]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> [na:1.7.0_71]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [na:1.7.0_71]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> [na:1.7.0_71]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> [na:1.7.0_71]
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> Caused by: java.lang.InterruptedException: null
> at java.lang.Object.wait(Native Method) ~[na:1.7.0_71]
> at java.lang.Object.wait(Object.java:503) ~[na:1.7.0_71]
> at
> org.apache.storm.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at org.apache.storm.zookeeper.ZooKeeper.exists(ZooKeeper.java:1040)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at org.apache.storm.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:101)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> ... 17 common frames omitted
>
>
> On Wed, Feb 11, 2015 at 4:27 PM, William Oberman <oberman@civicscience.com
> > wrote:
>
>> I'm not sure what I've learned adds up to yet....
>>
>> I tried setting up a local storm development environment.  By mistake, I
>> forgot to switch to 0.9.3-branch (e.g. I was working on master at first).
>> In master, I was seeing the ShellBolt heartbeat fail first (which in
>> retrospect makes sense!).  Then I remembered I was trying to debug 0.9.3,
>> so I switched to 0.9.3-branch.  In the branch, I see ShellSpout heartbeat
>> fail first and jam up, so that's good (?).  In both cases, I was trying
>> local mode rather than cluster mode, as I figured I had a better shot at
>> debugging in local mode.
>>
>> At this point, I figured I had a good test case.  This was all using
>> command line tools (git, mvn) so far.   Two thoughts at this point:  1.) I
>> don't plan on running master, and it's not 100% clear to me if ShellBolt
>> failing first will solve any problems...  2.) in 0.9.3-branch, after
>> ShellSpout fails and everything jams up, if I press Ctrl-C on the process I
>> immediately see the ShellBolt heartbeat timeout message.  It's like it was
>> waiting to write, but was blocked on something (hrmmmm....?)
>>
>> But, I hit the limits of my ability to add "System.out" debugging, so I
>> tried setting up storm in IntelliJ.  That took a bit, but I finally figured
>> out how to run a topology in local mode with a debugger.  Once again, I was
>> seeing ShellSpout heartbeat fail and then nothing happen.
>>
>> The next problem is that I don't know how to set up IntelliJ to understand
>> clojure compiled code (at least, I think that's my problem....) so the
>> "Step Into/Out of" information is really weird in the debugger.  The
>> best/most complete stack trace I have is:
>> invoke():102, zookeeper$exists_node_QMARK_$fn__3279 (backtype.storm)
>> invoke():98, zookeeper$exists_node_QMARK_ (backtype.storm)
>> invoke():114, zookeeper$mkdirs (backtype.storm)
>> mkdirs():119, cluster$mk_distributed_cluster_state$reify__3526
>> (backtype.storm)
>> report_error():397, cluster$mk_storm_cluster_state$reify__3983
>> (backtype.storm)
>> invoke():180, executor$throttled_report_error_fn$fn__5548
>> (backtype.storm.daemon)
>> reportError():533, executor$fn__5700$fn$reify__5742
>> (backtype.storm.daemon)
>> reportError():132, SpoutOutputCollector (backtype.storm.spout)
>>
>> I had a better stack trace (that I lost) that led into:
>> org.apache.storm.curator.RetryLoop.callWithRetry()
>> which for me is my "prime suspect" (based on name alone) for something
>> that is blocking things up....  :-)
>>
>> Though, once again, not understanding the big picture of storm, I have no
>> idea what all of the above adds up to in terms of what's wrong, and how to
>> fix it still....
>>
>> will
>>
>> On Wed, Feb 11, 2015 at 1:31 PM, William Oberman <
>> oberman@civicscience.com> wrote:
>>
>>> I'm glad to hear I'm not the only one!
>>>
>>> (no new news yet)
>>>
>>>
>>> On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <al...@v5tech.es> wrote:
>>>
>>>> Hi William,
>>>>
>>>> I'm having the same problem running a multilang topology (written in
>>>> python). If you find a solution, please post it here, it will sure help us.
>>>>
>>>> To upgrade from 0.9.2-incubating we updated storm.py (
>>>> https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
>>>> and pom.xml.
>>>>
>>>> Downgrading to 0.9.2-incubating (downgrading storm.py and pom.xml), it
>>>> works like hell.
>>>>
>>>> Best regards,
>>>>
>>>> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <
>>>> oberman@civicscience.com> wrote:
>>>>
>>>>> I'm not sure the best way to share a test case.  I'll copy and paste
>>>>> code below....  If you run the below code (and find the log file of the
>>>>> worker that was running it), you should see in ~30 seconds:
>>>>> ====
>>>>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process:
>>>>> ShellSpout died.
>>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>>         at
>>>>> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255)
>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> [na:1.7.0_71]
>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>>>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
>>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>>         at
>>>>> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255)
>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> [na:1.7.0_71]
>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>>>> =======
>>>>>
>>>>> But, the topology will run in a kind of weird zombie state forever.
>>>>> More specifically I see the multilang bolt process all tuples in the
>>>>> pending queue, and then an infinite loop of nextTuple()/fail() from the
>>>>> multilang spout.  But, as noted in my original email, if I comment out:
>>>>>  _collector.reportError(exception);
>>>>> in the Java ShellSpout then the worker will immediately die and
>>>>> respawn.
>>>>>
>>>>> If no one can help, the next step for me is rough, as I'll have to
>>>>> learn how to actually develop and debug storm itself, which is usually at
>>>>> least 10x harder than just using something :-)
>>>>>
>>>>> In any case, my test code:
>>>>>
>>>>> Topology = 1 process with two tasks (multilang spout and bolt), and
>>>>> small pool of pending messages (yes, using the word count example in
>>>>> storm-starter as a starting point....)
>>>>> =============
>>>>> public class SlowTopology {
>>>>>   public static class SlowPhpBolt extends ShellBolt implements
>>>>> IRichBolt {
>>>>>
>>>>>     public SlowPhpBolt() {
>>>>>       super("php", "slowBolt.php");
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>       declarer.declare(new Fields());
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public Map<String, Object> getComponentConfiguration() {
>>>>>       return null;
>>>>>     }
>>>>>   }
>>>>>
>>>>>   public static class SlowPhpSpout extends ShellSpout implements
>>>>> IRichSpout {
>>>>>
>>>>>       public SlowPhpSpout() {
>>>>>           super("php", "slowSpout.php");
>>>>>       }
>>>>>
>>>>>     @Override
>>>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>>>         ofd.declare(new Fields("output"));
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public Map<String, Object> getComponentConfiguration() {
>>>>>         return null;
>>>>>     }
>>>>>   }
>>>>>
>>>>>   public static void main(String[] args) throws Exception {
>>>>>
>>>>>     TopologyBuilder builder = new TopologyBuilder();
>>>>>
>>>>>     builder.setSpout("spout", new SlowPhpSpout(),
>>>>> 1).setNumTasks(1).setMaxSpoutPending(3);
>>>>>     builder.setBolt("bolt", new SlowPhpBolt(),
>>>>> 1).setNumTasks(1).shuffleGrouping("spout");
>>>>>
>>>>>     Config conf = new Config();
>>>>>     conf.setDebug(true);
>>>>>
>>>>>     if (args != null && args.length > 0) {
>>>>>       conf.setNumWorkers(1);
>>>>>       StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
>>>>> builder.createTopology());
>>>>>     }
>>>>>     else {
>>>>>       LocalCluster cluster = new LocalCluster();
>>>>>       cluster.submitTopology("slow", conf, builder.createTopology());
>>>>>       Thread.sleep(10000);
>>>>>       cluster.shutdown();
>>>>>     }
>>>>>   }
>>>>> }
>>>>> ===========
>>>>>
>>>>> slowSpout.php
>>>>> ==========
>>>>> <?php
>>>>> require_once "storm.php";
>>>>> class slowSpout extends \ShellSpout {
>>>>>   protected function nextTuple() {
>>>>>     $value = rand(0,100);
>>>>>     $id = rand(0, 100);
>>>>>     $this->emit(array($value), $id);
>>>>>     file_put_contents("/tmp/storm_slow.log",
>>>>> "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
>>>>>     sleep(1);
>>>>>   }
>>>>>   protected function ack($id) {
>>>>>     file_put_contents("/tmp/storm_slow.log", "ack($id)\n",
>>>>> FILE_APPEND);
>>>>>   }
>>>>>
>>>>>   protected function fail($id) {
>>>>>     file_put_contents("/tmp/storm_slow.log", "fail($id)\n",
>>>>> FILE_APPEND);
>>>>>   }
>>>>> }
>>>>>
>>>>> (new slowSpout())->run();
>>>>> ===========
>>>>>
>>>>> slowBolt.php
>>>>> ============
>>>>> <?php
>>>>> require_once "storm.php";
>>>>> class slowBolt extends \BasicBolt {
>>>>>   protected function process(\Tuple $t) {
>>>>>     $sleep = rand(1, 180);
>>>>>     file_put_contents("/tmp/storm_slow.log", "process(".print_r($t,
>>>>> true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
>>>>>     sleep($sleep);
>>>>>   }
>>>>> }
>>>>> (new slowBolt())->run();
>>>>> ============
>>>>>
>>>>> storm.php  (from  https://github.com/lazyshot/storm-php, and I think
>>>>> I added more error checking on reads/writes to standard in/out, added
>>>>> sync() to the ShellSpout to make new classes easier to write, and the new
>>>>> heartbeat protocol)
>>>>> =========
>>>>> <?php
>>>>> interface iShellBolt {
>>>>> }
>>>>>
>>>>> interface iShellSpout {
>>>>> }
>>>>>
>>>>> class Tuple {
>>>>>     public $id, $component, $stream, $task, $values;
>>>>>
>>>>>     public function __construct($id, $component, $stream, $task,
>>>>> $values) {
>>>>>         $this->id = $id;
>>>>>         $this->component = $component;
>>>>>         $this->stream = $stream;
>>>>>         $this->task = $task;
>>>>>         $this->values = $values;
>>>>>     }
>>>>> }
>>>>>
>>>>> abstract class ShellComponent {
>>>>>     protected $pid;
>>>>>     protected $stormConf;
>>>>>     protected $topologyContext;
>>>>>
>>>>>     protected $stormInc = null;
>>>>>
>>>>>     public function __construct() {
>>>>>         $this->pid = getmypid();
>>>>>         $this->sendCommand(array("pid" => $this->pid));
>>>>>
>>>>>         $handshake = $this->parseMessage($this->waitForMessage());
>>>>>
>>>>>         $this->stormConf = $handshake['conf'];
>>>>>         $this->topologyContext = $handshake['context'];
>>>>>         $pidDir = $handshake['pidDir'];
>>>>>
>>>>>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
>>>>>     }
>>>>>
>>>>>     protected function readLine() {
>>>>>         $raw = fgets(STDIN);
>>>>>
>>>>>         if ($raw === false) {
>>>>>             throw new Exception("STDIN is broken");
>>>>>         }
>>>>>
>>>>>         $line = trim($raw);
>>>>>
>>>>>         return $line;
>>>>>     }
>>>>>
>>>>>     protected function waitForMessage() {
>>>>>         $message = '';
>>>>>         while (true) {
>>>>>             $line = trim($this->readLine());
>>>>>
>>>>>             if (strlen($line) == 0) {
>>>>>                 continue;
>>>>>             } else if ($line == 'end') {
>>>>>                 break;
>>>>>             } else if ($line == 'sync') {
>>>>>                 $message = '';
>>>>>                 continue;
>>>>>             }
>>>>>
>>>>>             $message .= $line . "\n";
>>>>>         }
>>>>>
>>>>>         return trim($message);
>>>>>     }
>>>>>
>>>>>     protected function sendCommand(array $command) {
>>>>>         $this->sendMessage(json_encode($command));
>>>>>     }
>>>>>
>>>>>     protected function sendLog($message) {
>>>>>         return $this->sendCommand(array(
>>>>>             'command' => 'log',
>>>>>             'msg' => $message
>>>>>         ));
>>>>>     }
>>>>>
>>>>>     protected function parseMessage($message) {
>>>>>         $msg = json_decode($message, true);
>>>>>
>>>>>         if ($msg) {
>>>>>             return $msg;
>>>>>         } else {
>>>>>             return $message;
>>>>>         }
>>>>>     }
>>>>>
>>>>>     protected function sendMessage($message) {
>>>>>         $message = "$message\nend\n";
>>>>>         $bytesWritten = fwrite(STDOUT, $message);
>>>>>         fflush(STDOUT);
>>>>>         if ($bytesWritten === false) {
>>>>>             throw new Exception("STDOUT is broken");
>>>>>         }
>>>>>         if ($bytesWritten != strlen($message)) {
>>>>>             throw new Exception("Unable to write all bytes to STDOUT
>>>>> (message=$message, bytesWritten=$bytesWritten)");
>>>>>         }
>>>>>     }
>>>>>
>>>>>     final protected function sync() {
>>>>>         $command = array(
>>>>>             'command' => 'sync',
>>>>>         );
>>>>>
>>>>>         $this->sendCommand($command);
>>>>>     }
>>>>>
>>>>> }
>>>>>
>>>>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
>>>>>
>>>>>     public $anchor_tuple = null;
>>>>>
>>>>>     public function __construct() {
>>>>>         parent::__construct();
>>>>>
>>>>>         $this->init($this->stormConf, $this->topologyContext);
>>>>>     }
>>>>>
>>>>>     public function run() {
>>>>>         try {
>>>>>             while (true) {
>>>>>                 $command =
>>>>> $this->parseMessage($this->waitForMessage());
>>>>>
>>>>>                 if (is_array($command)) {
>>>>>                     if (isset($command['tuple'])) {
>>>>>                         $tupleMap = array_merge(array(
>>>>>                                 'id' => null,
>>>>>                                 'comp' => null,
>>>>>                                 'stream' => null,
>>>>>                                 'task' => null,
>>>>>                                 'tuple' => null
>>>>>                             ),
>>>>>
>>>>>                             $command);
>>>>>
>>>>>                         if($tupleMap['task'] == -1 &&
>>>>> $tupleMap['stream'] == "__heartbeat") {
>>>>>                             $this->sync();
>>>>>                         } else {
>>>>>                             $tuple = new Tuple($tupleMap['id'],
>>>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
>>>>> $tupleMap['tuple']);
>>>>>                             $this->process($tuple);
>>>>>                         }
>>>>>                     }
>>>>>                 }
>>>>>             }
>>>>>         } catch (Exception $e) {
>>>>>             $this->sendLog((string)$e);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     abstract protected function process(Tuple $tuple);
>>>>>
>>>>>     protected function init($conf, $topology) {
>>>>>         return;
>>>>>     }
>>>>>
>>>>>     protected function emitTuple(array $tuple, $stream = null,
>>>>> $anchors = array(), $directTask = null) {
>>>>>         if ($this->anchor_tuple !== null) {
>>>>>             $anchors = array($this->anchor_tuple);
>>>>>         }
>>>>>
>>>>>         $command = array(
>>>>>             'command' => 'emit'
>>>>>         );
>>>>>
>>>>>         if ($stream !== null) {
>>>>>             $command['stream'] = $stream;
>>>>>         }
>>>>>
>>>>>         $command['anchors'] = array_map(function ($a) {
>>>>>             return $a->id;
>>>>>         }, $anchors);
>>>>>
>>>>>         if ($directTask !== null) {
>>>>>             $command['task'] = $directTask;
>>>>>         }
>>>>>
>>>>>         $command['tuple'] = $tuple;
>>>>>
>>>>>         $this->sendCommand($command);
>>>>>     }
>>>>>
>>>>>     protected function emit($tuple, $stream = null, $anchors =
>>>>> array()) {
>>>>>         $this->emitTuple($tuple, $stream, $anchors);
>>>>>     }
>>>>>
>>>>>     protected function emitDirect($directTask, $tuple, $stream = null,
>>>>> $anchors = array()) {
>>>>>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
>>>>>     }
>>>>>
>>>>>     protected function ack(Tuple $tuple) {
>>>>>         $command = array(
>>>>>             'command' => 'ack',
>>>>>             'id' => $tuple->id
>>>>>         );
>>>>>
>>>>>         $this->sendCommand($command);
>>>>>     }
>>>>>
>>>>>     protected function fail(Tuple $tuple) {
>>>>>         $command = array(
>>>>>             'command' => 'fail',
>>>>>             'id' => $tuple->id
>>>>>         );
>>>>>
>>>>>         $this->sendCommand($command);
>>>>>     }
>>>>> }
>>>>>
>>>>> abstract class BasicBolt extends ShellBolt {
>>>>>     public function run() {
>>>>>         try {
>>>>>             while (true) {
>>>>>                 $command =
>>>>> $this->parseMessage($this->waitForMessage());
>>>>>
>>>>>                 if (is_array($command)) {
>>>>>                     if (isset($command['tuple'])) {
>>>>>                         $tupleMap = array_merge(array(
>>>>>                                 'id' => null,
>>>>>                                 'comp' => null,
>>>>>                                 'stream' => null,
>>>>>                                 'task' => null,
>>>>>                                 'tuple' => null
>>>>>                             ),
>>>>>
>>>>>                             $command);
>>>>>
>>>>>                         if($tupleMap['task'] == -1 &&
>>>>> $tupleMap['stream'] == "__heartbeat") {
>>>>>                             $this->sync();
>>>>>                         } else {
>>>>>                             $tuple = new Tuple($tupleMap['id'],
>>>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
>>>>> $tupleMap['tuple']);
>>>>>
>>>>>                             $this->anchor_tuple = $tuple;
>>>>>
>>>>>                             try {
>>>>>                                 $processed = $this->process($tuple);
>>>>>
>>>>>                                 $this->ack($tuple);
>>>>>                             } catch (BoltProcessException $e) {
>>>>>                                 $this->fail($tuple);
>>>>>                             }
>>>>>                         }
>>>>>                     }
>>>>>                 }
>>>>>             }
>>>>>         } catch (Exception $e) {
>>>>>             $this->sendLog((string)$e);
>>>>>         }
>>>>>
>>>>>     }
>>>>> }
>>>>>
>>>>> abstract class ShellSpout extends ShellComponent implements
>>>>> iShellSpout {
>>>>>     protected $tuples = array();
>>>>>
>>>>>     public function __construct() {
>>>>>         parent::__construct();
>>>>>
>>>>>         $this->init($this->stormConf, $this->topologyContext);
>>>>>     }
>>>>>
>>>>>
>>>>>     abstract protected function nextTuple();
>>>>>
>>>>>     abstract protected function ack($tuple_id);
>>>>>
>>>>>     abstract protected function fail($tuple_id);
>>>>>
>>>>>     public function run() {
>>>>>         try {
>>>>>             while (true) {
>>>>>                 $command =
>>>>> $this->parseMessage($this->waitForMessage());
>>>>>
>>>>>                 if (is_array($command)) {
>>>>>                     if (isset($command['command'])) {
>>>>>                         if ($command['command'] == 'ack') {
>>>>>                             $this->ack($command['id']);
>>>>>                             $this->sync();
>>>>>                         } else if ($command['command'] == 'fail') {
>>>>>                             $this->fail($command['id']);
>>>>>                             $this->sync();
>>>>>                         } else if ($command['command'] == 'next') {
>>>>>                             $this->nextTuple();
>>>>>                             $this->sync();
>>>>>                         }
>>>>>                     }
>>>>>                 }
>>>>>             }
>>>>>         } catch (Exception $e) {
>>>>>             $this->sendLog((string)$e);
>>>>>             $this->sync();
>>>>>         }
>>>>>     }
>>>>>
>>>>>     protected function init($stormConf, $topologyContext) {
>>>>>         return;
>>>>>     }
>>>>>
>>>>>     final protected function emit(array $tuple, $messageId = null,
>>>>> $streamId = null) {
>>>>>         return $this->emitTuple($tuple, $messageId, $streamId, null);
>>>>>     }
>>>>>
>>>>>     final protected function emitDirect($directTask, array $tuple,
>>>>> $messageId = null, $streamId = null) {
>>>>>         return $this->emitTuple($tuple, $messageId, $streamId,
>>>>> $directTask);
>>>>>     }
>>>>>
>>>>>     final private function emitTuple(array $tuple, $messageId = null,
>>>>> $streamId = null, $directTask = null) {
>>>>>         $command = array(
>>>>>             'command' => 'emit'
>>>>>         );
>>>>>
>>>>>         if ($messageId !== null) {
>>>>>             $command['id'] = $messageId;
>>>>>         }
>>>>>
>>>>>         if ($streamId !== null) {
>>>>>             $command['stream'] = $streamId;
>>>>>         }
>>>>>
>>>>>         if ($directTask !== null) {
>>>>>             $command['task'] = $directTask;
>>>>>         }
>>>>>
>>>>>         $command['tuple'] = $tuple;
>>>>>
>>>>>         return $this->sendCommand($command);
>>>>>     }
>>>>> }
>>>>>
>>>>> class BoltProcessException extends Exception {
>>>>> }
>>>>>
>>>>> =========================
>>>>>
>>>>>
>>>>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <
>>>>> oberman@civicscience.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
>>>>>>
>>>>>> I'll try to cover the important facts that led to this issue:
>>>>>>
>>>>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some
>>>>>> existing business logic
>>>>>>
>>>>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition
>>>>>> to the ShellBolt protocol)
>>>>>>
>>>>>> -I have some odd topologies where I try to do some legacy background
>>>>>> processing.  This processing takes a highly variable amount of time in the
>>>>>> Bolts, from milliseconds to minutes.  But, eventually due to randomness the
>>>>>> spout's "pending" pool fills up, causing the spout to block on nextTuple,
>>>>>> which eventually causes a heartbeat timeout. (I believe my only fix is to
>>>>>> increase the heartbeat timeout at the topology level. That's not the
>>>>>> purpose of this email, though confirmation of this as my only workaround
>>>>>> would be appreciated!  I feel like this wasn't anticipated when the
>>>>>> heartbeat patch was designed, as it was assumed the spout's nextTuple
>>>>>> wouldn't block I guess?)
>>>>>>
>>>>>> -The purpose of this email is the fact that the topology "jams up"
>>>>>> when the ShellSpout has a heartbeat timeout.  I can see my PHP spout/bolt
>>>>>> still running (I added logging to them), but Storm itself is doing nothing.
>>>>>>
>>>>>> -I added logging to ShellSpout and recompiled, because I saw the log
>>>>>> message on line 233 (Halting process: ShellSpout died) but as noted the PHP
>>>>>> process was still running, so I was curious if _process.destroy(); failed.
>>>>>> But, my logging didn't appear.  I assumed I was compiling/deploying wrong.
>>>>>> Eventually I commented out line 234: _collector.reportError(exception);
>>>>>>  and everything started working!!!
>>>>>>
>>>>>> Does this make *any* sense?  Why would
>>>>>> _collector.reportError(exception); block and never return (I waited quite a
>>>>>> long time, 10's of minutes).  When I comment out line 234, Storm
>>>>>> immediately kills my bad tasks and respawns almost instantly.
>>>>>>
>>>>>> I feel fairly confident that this will be recreatable.  My topology:
>>>>>> -1 spout (ShellSpout)
>>>>>> -1 bolt (ShellBolt)
>>>>>> -The ShellSpout has a heartbeat timeout due to slow tasks in
>>>>>> ShellBolt + the pending queue is full
>>>>>>
>>>>>> Thanks for any feedback!
>>>>>>
>>>>>> will
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Alex Sobrino Beltrán
>>>> Registered Linux User #273657
>>>>
>>>> http://v5tech.es
>>>>
>>>
>>>
>>>
>>>
>>
>>
>
>
>

Re: ShellSpout hangs on reportError?

Posted by William Oberman <ob...@civicscience.com>.
Sorry for the cross-post to dev, but I think this thread has veered into
actual dev questions.  I still don't know if there is something
fundamentally wrong with my use case, or if this is a bug.  For a dev
reading this for the first time, the main correction I'd make is to my
email subject: reportError isn't hanging, it's throwing a RuntimeException
(wrapping an InterruptedException).  As for what is throwing the
InterruptedException, I think it's the ZooKeeper client itself (see the
"Caused by" frames in the stack trace below).

The die() methods of both ShellSpout and ShellBolt contain a
"_collector.reportError(exception);" line.  I changed both to:
====
        try {
            _collector.reportError(exception);
        } catch (RuntimeException e) {
            if (e.getCause() instanceof InterruptedException) {
                // zookeeper.clj wraps the ZK InterruptedException in a
                // RuntimeException; swallow it so die() can continue
            } else {
                throw e;
            }
        }
======
and now everything starts to work as I expected.

Does this patch make any sense?  Or is it a bandaid over a deeper issue?
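
For anyone who wants to try the idea outside of Storm, here is a minimal,
self-contained sketch of the same pattern.  The names GuardedDie,
isWrappedInterrupt and dieSafely are made up for illustration and are not
Storm APIs; the two Runnable arguments stand in for
_collector.reportError(exception) and for the _process.destroy() /
System.exit() steps.  It assumes the cleanup should run no matter what the
reporting step throws, so it uses a finally block instead of relying only
on the catch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Storm.
class GuardedDie {

    // True if t is a RuntimeException whose direct cause is an
    // InterruptedException (the shape seen in the stack trace).
    static boolean isWrappedInterrupt(Throwable t) {
        return t instanceof RuntimeException
                && t.getCause() instanceof InterruptedException;
    }

    // Runs the reporting step, then always runs the cleanup step.
    // A wrapped InterruptedException is swallowed; anything else is
    // rethrown after cleanup has run.
    static void dieSafely(Runnable report, Runnable cleanup) {
        try {
            report.run();
        } catch (RuntimeException e) {
            if (!isWrappedInterrupt(e)) {
                throw e;
            }
            // Wrapped interrupt: ignore it and fall through to cleanup.
        } finally {
            cleanup.run();
        }
    }

    public static void main(String[] args) {
        final List<String> events = new ArrayList<String>();
        dieSafely(
            () -> { throw new RuntimeException(new InterruptedException()); },
            () -> events.add("cleanup"));
        System.out.println(events); // prints [cleanup]
    }
}
```

With the finally block, the cleanup step still runs even if the reporting
step fails with some other RuntimeException, which the catch-only version
would not guarantee.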

will

On Thu, Feb 12, 2015 at 2:15 PM, William Oberman <ob...@civicscience.com>
wrote:

> Ok, I realized that I did NOT check if ShellSpout.die() was throwing a
> RuntimeException.   I added a try/catch block, and it is!   The
> RuntimeException is preventing _process.destroy and System.exit() from
> happening, both of which need to happen to make topology recovery happen.
>
> But, I'm not sure *why* this exception is happening yet, since it's an
> interrupted exception and I don't think the exception tells me *who*
> interrupted my thread...
>
> 2015-02-12T14:12:35.581-0500 b.s.s.ShellSpout [ERROR] die exception!
> java.lang.RuntimeException: java.lang.InterruptedException
> at backtype.storm.util$wrap_in_runtime.invoke(util.clj:44)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:102)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at backtype.storm.zookeeper$exists_node_QMARK_.invoke(zookeeper.clj:98)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at backtype.storm.zookeeper$mkdirs.invoke(zookeeper.clj:114)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.cluster$mk_distributed_cluster_state$reify__3533.mkdirs(cluster.clj:119)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.cluster$mk_storm_cluster_state$reify__3990.report_error(cluster.clj:400)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.daemon.executor$throttled_report_error_fn$fn__5565.invoke(executor.clj:180)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.daemon.executor$fn__5717$fn$reify__5759.reportError(executor.clj:533)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.spout.SpoutOutputCollector.reportError(SpoutOutputCollector.java:132)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at backtype.storm.spout.ShellSpout.die(ShellSpout.java:235)
> [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at backtype.storm.spout.ShellSpout.access$200(ShellSpout.java:42)
> [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261)
> [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> [na:1.7.0_71]
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
> [na:1.7.0_71]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> [na:1.7.0_71]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [na:1.7.0_71]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> [na:1.7.0_71]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> [na:1.7.0_71]
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> Caused by: java.lang.InterruptedException: null
> at java.lang.Object.wait(Native Method) ~[na:1.7.0_71]
> at java.lang.Object.wait(Object.java:503) ~[na:1.7.0_71]
> at
> org.apache.storm.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at org.apache.storm.zookeeper.ZooKeeper.exists(ZooKeeper.java:1040)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at org.apache.storm.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:101)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> ... 17 common frames omitted
>
>
> On Wed, Feb 11, 2015 at 4:27 PM, William Oberman <oberman@civicscience.com
> > wrote:
>
>> I'm not sure what I've learned adds up to yet....
>>
>> I tried setting up a local storm development environment.  By mistake, I
>> forgot to switch to 0.9.3-branch (e.g. I was working on master at first).
>> In master, I was seeing the ShellBolt heartbeat fail first (which in
>> retrospect makes sense!).  Then I remembered I was trying to debug 0.9.3,
>> so I switched to 0.9.3-branch.  In the branch, I see ShellSpout heartbeat
>> fail first and jam up, so that's good (?).  In both cases, I was trying
>> local mode rather than cluster mode, as I figured I had a better shot at
>> debugging in local mode.
>>
>> At this point, I figured I had a good test case.  This was all using
>> command line tools (git, mvn) so far.   Two thoughts at this point:  1.) I
>> don't plan on running master, and it's not 100% clear to me if ShellBolt
>> failing first will solve any problems...  2.) in 0.9.3-branch, after
>> ShellSpout fails and everything jams up if I "ctl-c" the process I
>> immediately see the ShellBolt heartbeat timeout message.  It's like it was
>> waiting to write, but was blocked on something (hrmmmm....?)
>>
>> But, I hit the limits of my ability to add "System.out" debugging, so I
>> tried setting up storm in IntelliJ.  That took a bit, but I finally figured
>> out how to run a topology in local mode with a debugger.  Once again, I was
>> seeing ShellSpout heartbeat fail and then nothing happen.
>>
>> The next problem is that I don't know how to set up IntelliJ to understand
>> clojure compiled code (at least, I think that's my problem....) so the
>> "Step Into/Out of" information is really weird in the debugger.  The
>> best/most complete stack trace I have is:
>> invoke():102, zookeeper$exists_node_QMARK_$fn__3279 (backtype.storm)
>> invoke():98, zookeeper$exists_node_QMARK_ (backtype.storm)
>> invoke():114, zookeeper$mkdirs (backtype.storm)
>> mkdirs():119, cluster$mk_distributed_cluster_state$reify__3526
>> (backtype.storm)
>> report_error():397, cluster$mk_storm_cluster_state$reify__3983
>> (backtype.storm)
>> invoke():180, executor$throttled_report_error_fn$fn__5548
>> (backtype.storm.daemon)
>> reportError():533, executor$fn__5700$fn$reify__5742
>> (backtype.storm.daemon)
>> reportError():132, SpoutOutputCollector (backtype.storm.spout)
>>
>> I had a better stack trace (that I lost) that led into:
>> org.apache.storm.curator.RetryLoop.callWithRetry()
>> which for me is my "prime suspect" (based on name alone) for something
>> that is blocking things up....  :-)
>>
>> Though, once again, not understanding the big picture of storm, I have no
>> idea what all of the above adds up to in terms of what's wrong, and how to
>> fix it still....
>>
>> will
>>
>> On Wed, Feb 11, 2015 at 1:31 PM, William Oberman <
>> oberman@civicscience.com> wrote:
>>
>>> I'm glad to hear I'm not the only one!
>>>
>>> (no new news yet)
>>>
>>>
>>> On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <al...@v5tech.es> wrote:
>>>
>>>> Hi William,
>>>>
>>>> I'm having the same problem running a multilang topology (written in
>>>> Python). If you find a solution, please post it here; it will surely help us.
>>>>
>>>> To upgrade from 0.9.2-incubating we updated storm.py (
>>>> https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
>>>> and pom.xml.
>>>>
>>>> Downgrading to 0.9.2-incubating (downgrading storm.py and pom.xml), it
>>>> works like hell.
>>>>
>>>> Best regards,
>>>>
>>>> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <
>>>> oberman@civicscience.com> wrote:
>>>>
>>>>> I'm not sure of the best way to share a test case.  I'll copy and paste
>>>>> the code below.  If you run it (and find the log file of the worker that
>>>>> was running it), you should see this in ~30 seconds:
>>>>> ====
>>>>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process:
>>>>> ShellSpout died.
>>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>>         at
>>>>> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255)
>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> [na:1.7.0_71]
>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>>>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
>>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>>         at
>>>>> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255)
>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> [na:1.7.0_71]
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> [na:1.7.0_71]
>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>>>> =======
>>>>>
>>>>> But, the topology will run in a kind of weird zombie state forever.
>>>>> More specifically I see the multilang bolt process all tuples in the
>>>>> pending queue, and then an infinite loop of nextTuple()/fail() from the
>>>>> multilang spout.  But, as noted in my original email, if I comment out:
>>>>>  _collector.reportError(exception);
>>>>> in the Java ShellSpout then the worker will immediately die and
>>>>> respawn.
>>>>>
>>>>> If no one can help, the next step for me is rough, as I'll have to
>>>>> learn how to actually develop and debug storm itself, which is usually at
>>>>> least 10x harder than just using something :-)
>>>>>
>>>>> In any case, my test code:
>>>>>
>>>>> Topology = 1 process with two tasks (multilang spout and bolt), and
>>>>> small pool of pending messages (yes, using the word count example in
>>>>> storm-starter as a starting point....)
>>>>> =============
>>>>> public class SlowTopology {
>>>>>   public static class SlowPhpBolt extends ShellBolt implements
>>>>> IRichBolt {
>>>>>
>>>>>     public SlowPhpBolt() {
>>>>>       super("php", "slowBolt.php");
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>       declarer.declare(new Fields());
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public Map<String, Object> getComponentConfiguration() {
>>>>>       return null;
>>>>>     }
>>>>>   }
>>>>>
>>>>>   public static class SlowPhpSpout extends ShellSpout implements
>>>>> IRichSpout {
>>>>>
>>>>>       public SlowPhpSpout() {
>>>>>           super("php", "slowSpout.php");
>>>>>       }
>>>>>
>>>>>     @Override
>>>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>>>         ofd.declare(new Fields("output"));
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public Map<String, Object> getComponentConfiguration() {
>>>>>         return null;
>>>>>     }
>>>>>   }
>>>>>
>>>>>   public static void main(String[] args) throws Exception {
>>>>>
>>>>>     TopologyBuilder builder = new TopologyBuilder();
>>>>>
>>>>>     builder.setSpout("spout", new SlowPhpSpout(),
>>>>> 1).setNumTasks(1).setMaxSpoutPending(3);
>>>>>     builder.setBolt("bolt", new SlowPhpBolt(),
>>>>> 1).setNumTasks(1).shuffleGrouping("spout");
>>>>>
>>>>>     Config conf = new Config();
>>>>>     conf.setDebug(true);
>>>>>
>>>>>     if (args != null && args.length > 0) {
>>>>>       conf.setNumWorkers(1);
>>>>>       StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
>>>>> builder.createTopology());
>>>>>     }
>>>>>     else {
>>>>>       LocalCluster cluster = new LocalCluster();
>>>>>       cluster.submitTopology("slow", conf, builder.createTopology());
>>>>>       Thread.sleep(10000);
>>>>>       cluster.shutdown();
>>>>>     }
>>>>>   }
>>>>> }
>>>>> ===========
>>>>>
>>>>> slowSpout.php
>>>>> ==========
>>>>> <?php
>>>>> require_once "storm.php";
>>>>> class slowSpout extends \ShellSpout {
>>>>>   protected function nextTuple() {
>>>>>     $value = rand(0,100);
>>>>>     $id = rand(0, 100);
>>>>>     $this->emit(array($value), $id);
>>>>>     file_put_contents("/tmp/storm_slow.log",
>>>>> "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
>>>>>     sleep(1);
>>>>>   }
>>>>>   protected function ack($id) {
>>>>>     file_put_contents("/tmp/storm_slow.log", "ack($id)\n",
>>>>> FILE_APPEND);
>>>>>   }
>>>>>
>>>>>   protected function fail($id) {
>>>>>     file_put_contents("/tmp/storm_slow.log", "fail($id)\n",
>>>>> FILE_APPEND);
>>>>>   }
>>>>> }
>>>>>
>>>>> (new slowSpout())->run();
>>>>> ===========
>>>>>
>>>>> slowBolt.php
>>>>> ============
>>>>> <?php
>>>>> require_once "storm.php";
>>>>> class slowBolt extends \BasicBolt {
>>>>>   protected function process(\Tuple $t) {
>>>>>     $sleep = rand(1, 180);
>>>>>     file_put_contents("/tmp/storm_slow.log", "process(".print_r($t,
>>>>> true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
>>>>>     sleep($sleep);
>>>>>   }
>>>>> }
>>>>> (new slowBolt())->run();
>>>>> ============
>>>>>
>>>>> storm.php  (from  https://github.com/lazyshot/storm-php, and I think
>>>>> I added more error checking on reads/writes to standard in/out, added
>>>>> sync() to the ShellSpout to make new classes easier to write, and the new
>>>>> heartbeat protocol)
>>>>> =========
>>>>> <?php
>>>>> interface iShellBolt {
>>>>> }
>>>>>
>>>>> interface iShellSpout {
>>>>> }
>>>>>
>>>>> class Tuple {
>>>>>     public $id, $component, $stream, $task, $values;
>>>>>
>>>>>     public function __construct($id, $component, $stream, $task,
>>>>> $values) {
>>>>>         $this->id = $id;
>>>>>         $this->component = $component;
>>>>>         $this->stream = $stream;
>>>>>         $this->task = $task;
>>>>>         $this->values = $values;
>>>>>     }
>>>>> }
>>>>>
>>>>> abstract class ShellComponent {
>>>>>     protected $pid;
>>>>>     protected $stormConf;
>>>>>     protected $topologyContext;
>>>>>
>>>>>     protected $stormInc = null;
>>>>>
>>>>>     public function __construct() {
>>>>>         $this->pid = getmypid();
>>>>>         $this->sendCommand(array("pid" => $this->pid));
>>>>>
>>>>>         $handshake = $this->parseMessage($this->waitForMessage());
>>>>>
>>>>>         $this->stormConf = $handshake['conf'];
>>>>>         $this->topologyContext = $handshake['context'];
>>>>>         $pidDir = $handshake['pidDir'];
>>>>>
>>>>>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
>>>>>     }
>>>>>
>>>>>     protected function readLine() {
>>>>>         $raw = fgets(STDIN);
>>>>>
>>>>>         if ($raw === false) {
>>>>>             throw new Exception("STDIN is broken");
>>>>>         }
>>>>>
>>>>>         $line = trim($raw);
>>>>>
>>>>>         return $line;
>>>>>     }
>>>>>
>>>>>     protected function waitForMessage() {
>>>>>         $message = '';
>>>>>         while (true) {
>>>>>             $line = trim($this->readLine());
>>>>>
>>>>>             if (strlen($line) == 0) {
>>>>>                 continue;
>>>>>             } else if ($line == 'end') {
>>>>>                 break;
>>>>>             } else if ($line == 'sync') {
>>>>>                 $message = '';
>>>>>                 continue;
>>>>>             }
>>>>>
>>>>>             $message .= $line . "\n";
>>>>>         }
>>>>>
>>>>>         return trim($message);
>>>>>     }
>>>>>
>>>>>     protected function sendCommand(array $command) {
>>>>>         $this->sendMessage(json_encode($command));
>>>>>     }
>>>>>
>>>>>     protected function sendLog($message) {
>>>>>         return $this->sendCommand(array(
>>>>>             'command' => 'log',
>>>>>             'msg' => $message
>>>>>         ));
>>>>>     }
>>>>>
>>>>>     protected function parseMessage($message) {
>>>>>         $msg = json_decode($message, true);
>>>>>
>>>>>         if ($msg) {
>>>>>             return $msg;
>>>>>         } else {
>>>>>             return $message;
>>>>>         }
>>>>>     }
>>>>>
>>>>>     protected function sendMessage($message) {
>>>>>         $message = "$message\nend\n";
>>>>>         $bytesWritten = fwrite(STDOUT, $message);
>>>>>         fflush(STDOUT);
>>>>>         if ($bytesWritten === false) {
>>>>>             throw new Exception("STDOUT is broken");
>>>>>         }
>>>>>         if ($bytesWritten != strlen($message)) {
>>>>>             throw new Exception("Unable to write all bytes to STDOUT
>>>>> (message=$message, bytesWritten=$bytesWritten)");
>>>>>         }
>>>>>     }
>>>>>
>>>>>     final protected function sync() {
>>>>>         $command = array(
>>>>>             'command' => 'sync',
>>>>>         );
>>>>>
>>>>>         $this->sendCommand($command);
>>>>>     }
>>>>>
>>>>> }
>>>>>
>>>>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
>>>>>
>>>>>     public $anchor_tuple = null;
>>>>>
>>>>>     public function __construct() {
>>>>>         parent::__construct();
>>>>>
>>>>>         $this->init($this->stormConf, $this->topologyContext);
>>>>>     }
>>>>>
>>>>>     public function run() {
>>>>>         try {
>>>>>             while (true) {
>>>>>                 $command =
>>>>> $this->parseMessage($this->waitForMessage());
>>>>>
>>>>>                 if (is_array($command)) {
>>>>>                     if (isset($command['tuple'])) {
>>>>>                         $tupleMap = array_merge(array(
>>>>>                                 'id' => null,
>>>>>                                 'comp' => null,
>>>>>                                 'stream' => null,
>>>>>                                 'task' => null,
>>>>>                                 'tuple' => null
>>>>>                             ),
>>>>>
>>>>>                             $command);
>>>>>
>>>>>                         if($tupleMap['task'] == -1 &&
>>>>> $tupleMap['stream'] == "__heartbeat") {
>>>>>                             $this->sync();
>>>>>                         } else {
>>>>>                             $tuple = new Tuple($tupleMap['id'],
>>>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
>>>>> $tupleMap['tuple']);
>>>>>                             $this->process($tuple);
>>>>>                         }
>>>>>                     }
>>>>>                 }
>>>>>             }
>>>>>         } catch (Exception $e) {
>>>>>             $this->sendLog((string)$e);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     abstract protected function process(Tuple $tuple);
>>>>>
>>>>>     protected function init($conf, $topology) {
>>>>>         return;
>>>>>     }
>>>>>
>>>>>     protected function emitTuple(array $tuple, $stream = null,
>>>>> $anchors = array(), $directTask = null) {
>>>>>         if ($this->anchor_tuple !== null) {
>>>>>             $anchors = array($this->anchor_tuple);
>>>>>         }
>>>>>
>>>>>         $command = array(
>>>>>             'command' => 'emit'
>>>>>         );
>>>>>
>>>>>         if ($stream !== null) {
>>>>>             $command['stream'] = $stream;
>>>>>         }
>>>>>
>>>>>         $command['anchors'] = array_map(function ($a) {
>>>>>             return $a->id;
>>>>>         }, $anchors);
>>>>>
>>>>>         if ($directTask !== null) {
>>>>>             $command['task'] = $directTask;
>>>>>         }
>>>>>
>>>>>         $command['tuple'] = $tuple;
>>>>>
>>>>>         $this->sendCommand($command);
>>>>>     }
>>>>>
>>>>>     protected function emit($tuple, $stream = null, $anchors =
>>>>> array()) {
>>>>>         $this->emitTuple($tuple, $stream, $anchors);
>>>>>     }
>>>>>
>>>>>     protected function emitDirect($directTask, $tuple, $stream = null,
>>>>> $anchors = array()) {
>>>>>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
>>>>>     }
>>>>>
>>>>>     protected function ack(Tuple $tuple) {
>>>>>         $command = array(
>>>>>             'command' => 'ack',
>>>>>             'id' => $tuple->id
>>>>>         );
>>>>>
>>>>>         $this->sendCommand($command);
>>>>>     }
>>>>>
>>>>>     protected function fail(Tuple $tuple) {
>>>>>         $command = array(
>>>>>             'command' => 'fail',
>>>>>             'id' => $tuple->id
>>>>>         );
>>>>>
>>>>>         $this->sendCommand($command);
>>>>>     }
>>>>> }
>>>>>
>>>>> abstract class BasicBolt extends ShellBolt {
>>>>>     public function run() {
>>>>>         try {
>>>>>             while (true) {
>>>>>                 $command =
>>>>> $this->parseMessage($this->waitForMessage());
>>>>>
>>>>>                 if (is_array($command)) {
>>>>>                     if (isset($command['tuple'])) {
>>>>>                         $tupleMap = array_merge(array(
>>>>>                                 'id' => null,
>>>>>                                 'comp' => null,
>>>>>                                 'stream' => null,
>>>>>                                 'task' => null,
>>>>>                                 'tuple' => null
>>>>>                             ),
>>>>>
>>>>>                             $command);
>>>>>
>>>>>                         if($tupleMap['task'] == -1 &&
>>>>> $tupleMap['stream'] == "__heartbeat") {
>>>>>                             $this->sync();
>>>>>                         } else {
>>>>>                             $tuple = new Tuple($tupleMap['id'],
>>>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
>>>>> $tupleMap['tuple']);
>>>>>
>>>>>                             $this->anchor_tuple = $tuple;
>>>>>
>>>>>                             try {
>>>>>                                 $processed = $this->process($tuple);
>>>>>
>>>>>                                 $this->ack($tuple);
>>>>>                             } catch (BoltProcessException $e) {
>>>>>                                 $this->fail($tuple);
>>>>>                             }
>>>>>                         }
>>>>>                     }
>>>>>                 }
>>>>>             }
>>>>>         } catch (Exception $e) {
>>>>>             $this->sendLog((string)$e);
>>>>>         }
>>>>>
>>>>>     }
>>>>> }
>>>>>
>>>>> abstract class ShellSpout extends ShellComponent implements
>>>>> iShellSpout {
>>>>>     protected $tuples = array();
>>>>>
>>>>>     public function __construct() {
>>>>>         parent::__construct();
>>>>>
>>>>>         $this->init($this->stormConf, $this->topologyContext);
>>>>>     }
>>>>>
>>>>>
>>>>>     abstract protected function nextTuple();
>>>>>
>>>>>     abstract protected function ack($tuple_id);
>>>>>
>>>>>     abstract protected function fail($tuple_id);
>>>>>
>>>>>     public function run() {
>>>>>         try {
>>>>>             while (true) {
>>>>>                 $command =
>>>>> $this->parseMessage($this->waitForMessage());
>>>>>
>>>>>                 if (is_array($command)) {
>>>>>                     if (isset($command['command'])) {
>>>>>                         if ($command['command'] == 'ack') {
>>>>>                             $this->ack($command['id']);
>>>>>                             $this->sync();
>>>>>                         } else if ($command['command'] == 'fail') {
>>>>>                             $this->fail($command['id']);
>>>>>                             $this->sync();
>>>>>                         } else if ($command['command'] == 'next') {
>>>>>                             $this->nextTuple();
>>>>>                             $this->sync();
>>>>>                         }
>>>>>                     }
>>>>>                 }
>>>>>             }
>>>>>         } catch (Exception $e) {
>>>>>             $this->sendLog((string)$e);
>>>>>             $this->sync();
>>>>>         }
>>>>>     }
>>>>>
>>>>>     protected function init($stormConf, $topologyContext) {
>>>>>         return;
>>>>>     }
>>>>>
>>>>>     final protected function emit(array $tuple, $messageId = null,
>>>>> $streamId = null) {
>>>>>         return $this->emitTuple($tuple, $messageId, $streamId, null);
>>>>>     }
>>>>>
>>>>>     final protected function emitDirect($directTask, array $tuple,
>>>>> $messageId = null, $streamId = null) {
>>>>>         return $this->emitTuple($tuple, $messageId, $streamId,
>>>>> $directTask);
>>>>>     }
>>>>>
>>>>>     final private function emitTuple(array $tuple, $messageId = null,
>>>>> $streamId = null, $directTask = null) {
>>>>>         $command = array(
>>>>>             'command' => 'emit'
>>>>>         );
>>>>>
>>>>>         if ($messageId !== null) {
>>>>>             $command['id'] = $messageId;
>>>>>         }
>>>>>
>>>>>         if ($streamId !== null) {
>>>>>             $command['stream'] = $streamId;
>>>>>         }
>>>>>
>>>>>         if ($directTask !== null) {
>>>>>             $command['task'] = $directTask;
>>>>>         }
>>>>>
>>>>>         $command['tuple'] = $tuple;
>>>>>
>>>>>         return $this->sendCommand($command);
>>>>>     }
>>>>> }
>>>>>
>>>>> class BoltProcessException extends Exception {
>>>>> }
>>>>>
>>>>> =========================
>>>>>
>>>>>
>>>>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <
>>>>> oberman@civicscience.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
>>>>>>
>>>>>> I'll try to cover the important facts that led to this issue:
>>>>>>
>>>>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some
>>>>>> existing business logic
>>>>>>
>>>>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition
>>>>>> to the ShellBolt protocol)
>>>>>>
>>>>>> -I have some odd topologies where I try to do some legacy background
>>>>>> processing.  This processing takes a highly variable amount of time in the
>>>>>> Bolts, from milliseconds to minutes.  But, eventually due to randomness the
>>>>>> spout's "pending" pool fills up, causing the spout to block on nextTuple,
>>>>>> which eventually causes a heartbeat timeout. (I believe my only fix is to
>>>>>> increase the heartbeat timeout at the topology level. that's not the
>>>>>> purpose of this email, though confirmation of this as my only workaround
>>>>>> would be appreciated!  I feel like this wasn't anticipated when the
>>>>>> heartbeat patch was designed, as it was assumed the spout's nextTuple
>>>>>> wouldn't block I guess?)
>>>>>>
>>>>>> -The purpose of this email is the fact that the topology "jams up"
>>>>>> when the ShellSpout has a heartbeat timeout.  I can see my PHP spout/bolt
>>>>>> still running (I added logging to them), but Storm itself is doing nothing.
>>>>>>
>>>>>> -I added logging to ShellSpout and recompiled, because I saw the log
>>>>>> message on line 233 (Halting process: ShellSpout died) but as noted the PHP
>>>>>> process was still running, so I was curious if _process.destroy(); failed.
>>>>>> But, my logging didn't appear.  I assumed I was compiling/deploying wrong.
>>>>>> Eventually I commented out line 234: _collector.reportError(exception);
>>>>>>  and everything started working!!!
>>>>>>
>>>>>> Does this make *any* sense?  Why would
>>>>>> _collector.reportError(exception); block and never return (I waited quite a
>>>>>> long time, 10's of minutes).  When I comment out line 234, Storm
>>>>>> immediately kills my bad tasks and respawns almost instantly.
>>>>>>
>>>>>> I feel fairly confident that this will be recreatable.  My topology:
>>>>>> -1 spout (ShellSpout)
>>>>>> -1 bolt (ShellBolt)
>>>>>> -The ShellSpout has a heartbeat timeout due to slow tasks in
>>>>>> ShellBolt + the pending queue is full
>>>>>>
>>>>>> Thanks for any feedback!
>>>>>>
>>>>>> will
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Alex Sobrino Beltrán
>>>> Registered Linux User #273657
>>>>
>>>> http://v5tech.es
>>>>
>>>
>>>
>>>
>>>
>>
>>
>
>
>

Re: ShellSpout hangs on reportError?

Posted by William Oberman <ob...@civicscience.com>.
Ok, I realized that I did NOT check whether ShellSpout.die() was throwing a
RuntimeException.  I added a try/catch block, and it is!  The
RuntimeException keeps _process.destroy() and System.exit() from running,
and both need to run for the topology to recover.

But, I'm not sure *why* this exception is happening yet, since it's an
interrupted exception and I don't think the exception tells me *who*
interrupted my thread...
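
One plain-JDK fact that may help narrow this down: Object.wait() (the
innermost frame of the "Caused by" trace below) throws
InterruptedException right away if the calling thread's interrupt flag is
already set, so the interrupt may have been delivered well before the
ZooKeeper call.  The sketch below only demonstrates that JDK behaviour.  It
is not Storm code, the class name InterruptFlagDemo is made up, and it says
nothing about who sets the flag in the real worker.

```java
// Illustration of standard JDK behaviour only; not Storm code.
class InterruptFlagDemo {

    // Sets the current thread's interrupt flag, then calls wait() with a
    // timeout.  Returns true if wait() threw InterruptedException.
    static boolean waitThrowsWhenFlagSet() {
        Object lock = new Object();
        Thread.currentThread().interrupt(); // flag set before the wait
        synchronized (lock) {
            try {
                lock.wait(1000);
                return false; // only reached if no interrupt was seen
            } catch (InterruptedException e) {
                return true;  // thrown because the flag was already set
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("wait() interrupted: " + waitThrowsWhenFlagSet());
        // Throwing InterruptedException clears the flag again.
        System.out.println("flag still set: "
                + Thread.currentThread().isInterrupted());
    }
}
```

So one thing worth checking is whether anything interrupts the heartbeat
timer thread before die() reaches reportError.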

2015-02-12T14:12:35.581-0500 b.s.s.ShellSpout [ERROR] die exception!
java.lang.RuntimeException: java.lang.InterruptedException
        at backtype.storm.util$wrap_in_runtime.invoke(util.clj:44) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:102) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.zookeeper$exists_node_QMARK_.invoke(zookeeper.clj:98) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.zookeeper$mkdirs.invoke(zookeeper.clj:114) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.cluster$mk_distributed_cluster_state$reify__3533.mkdirs(cluster.clj:119) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.cluster$mk_storm_cluster_state$reify__3990.report_error(cluster.clj:400) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.daemon.executor$throttled_report_error_fn$fn__5565.invoke(executor.clj:180) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.daemon.executor$fn__5717$fn$reify__5759.reportError(executor.clj:533) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.spout.SpoutOutputCollector.reportError(SpoutOutputCollector.java:132) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.spout.ShellSpout.die(ShellSpout.java:235) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.spout.ShellSpout.access$200(ShellSpout.java:42) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.lang.InterruptedException: null
        at java.lang.Object.wait(Native Method) ~[na:1.7.0_71]
        at java.lang.Object.wait(Object.java:503) ~[na:1.7.0_71]
        at org.apache.storm.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.zookeeper.ZooKeeper.exists(ZooKeeper.java:1040) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:101) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        ... 17 common frames omitted
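
The FutureTask.runAndReset and ScheduledThreadPoolExecutor frames in the
trace above indicate that the heartbeat task runs as a periodic task on a
scheduled executor. With the standard java.util.concurrent scheduler, an
exception thrown by a periodic task is stored in its future rather than
logged, and later runs are suppressed, which would be consistent with the
failure staying invisible until a try/catch was added. A minimal Java sketch
of that behavior (the class name, task body and message are hypothetical):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class SwallowedExceptionSketch {
    /** Schedules a periodic task that always throws and reports what happened. */
    static String run() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();

        ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            throw new RuntimeException("die() failed"); // nothing logs this
        }, 0, 20, TimeUnit.MILLISECONDS);

        String surfaced = "none";
        try {
            // The exception only becomes visible when someone asks the future.
            future.get(2, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            surfaced = e.getCause().getMessage();
        } catch (InterruptedException | TimeoutException e) {
            surfaced = "unexpected: " + e;
        }

        try {
            Thread.sleep(100); // leave time for further runs, if there were any
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdownNow();
        return runs.get() + " run(s), cause=" + surfaced;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The task runs once and is never rescheduled, so a die() that throws part way
through leaves nothing behind to retry the cleanup.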


On Wed, Feb 11, 2015 at 4:27 PM, William Oberman <ob...@civicscience.com>
wrote:

> I'm not sure what I've learned adds up to yet....
>
> I tried setting up a local Storm development environment.  By mistake, I
> forgot to switch to 0.9.3-branch (i.e. I was working on master at first).
> On master, I saw the ShellBolt heartbeat fail first (which in
> retrospect makes sense!).  Then I remembered I was trying to debug 0.9.3,
> so I switched to 0.9.3-branch.  On the branch, I see the ShellSpout
> heartbeat fail first and jam up, so that's good (?).  In both cases I used
> local mode rather than cluster mode, as I figured I had a better shot at
> debugging in local mode.
>
> At this point, I figured I had a good test case.  This was all using
> command-line tools (git, mvn) so far.  Two thoughts at this point:  1.) I
> don't plan on running master, and it's not 100% clear to me whether
> ShellBolt failing first will solve any problems...  2.) On 0.9.3-branch,
> after ShellSpout fails and everything jams up, if I Ctrl-C the process I
> immediately see the ShellBolt heartbeat timeout message.  It's as if it was
> waiting to write, but was blocked on something (hrmmmm....?)
>
> But I hit the limits of my ability to add "System.out" debugging, so I
> tried setting up Storm in IntelliJ.  That took a bit, but I finally figured
> out how to run a topology in local mode with a debugger.  Once again, I saw
> the ShellSpout heartbeat fail and then nothing happened.
>
> The next problem is that I don't know how to set up IntelliJ to understand
> compiled Clojure code (at least, I think that's my problem....) so the
> "Step Into/Out of" information is really weird in the debugger.  The
> best/most complete stack trace I have is:
> invoke():102, zookeeper$exists_node_QMARK_$fn__3279 (backtype.storm)
> invoke():98, zookeeper$exists_node_QMARK_ (backtype.storm)
> invoke():114, zookeeper$mkdirs (backtype.storm)
> mkdirs():119, cluster$mk_distributed_cluster_state$reify__3526 (backtype.storm)
> report_error():397, cluster$mk_storm_cluster_state$reify__3983 (backtype.storm)
> invoke():180, executor$throttled_report_error_fn$fn__5548 (backtype.storm.daemon)
> reportError():533, executor$fn__5700$fn$reify__5742 (backtype.storm.daemon)
> reportError():132, SpoutOutputCollector (backtype.storm.spout)
>
> I had a better stack trace (that I lost) that led into:
> org.apache.storm.curator.RetryLoop.callWithRetry()
> which is my "prime suspect" (based on name alone) for something
> that is blocking things up....  :-)
>
> Though, once again, since I don't understand the big picture of Storm, I
> still have no idea what all of the above adds up to in terms of what's
> wrong and how to fix it....
>
> will
>
> On Wed, Feb 11, 2015 at 1:31 PM, William Oberman <oberman@civicscience.com
> > wrote:
>
>> I'm glad to hear I'm not the only one!
>>
>> (no new news yet)
>>
>>
>> On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <al...@v5tech.es> wrote:
>>
>>> Hi William,
>>>
>>> I'm having the same problem running a multilang topology (written in
>>> Python). If you find a solution, please post it here; it will surely help us.
>>>
>>> To upgrade from 0.9.2-incubating we updated storm.py (
>>> https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
>>> and pom.xml.
>>>
>>> Downgrading to 0.9.2-incubating (downgrading storm.py and pom.xml), it
>>> works like hell.
>>>
>>> Best regards,
>>>
>>> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <
>>> oberman@civicscience.com> wrote:
>>>
>>>> I'm not sure of the best way to share a test case.  I'll copy and paste
>>>> code below....  If you run the code below (and find the log file of the
>>>> worker that was running it), you should see in ~30 seconds:
>>>> ====
>>>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process:
>>>> ShellSpout died.
>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>         at
>>>> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255)
>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>         at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>> [na:1.7.0_71]
>>>>         at
>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>> [na:1.7.0_71]
>>>>         at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>> [na:1.7.0_71]
>>>>         at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>> [na:1.7.0_71]
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> [na:1.7.0_71]
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> [na:1.7.0_71]
>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>         at
>>>> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255)
>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>         at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>> [na:1.7.0_71]
>>>>         at
>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>> [na:1.7.0_71]
>>>>         at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>> [na:1.7.0_71]
>>>>         at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>> [na:1.7.0_71]
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> [na:1.7.0_71]
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> [na:1.7.0_71]
>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>>> =======
>>>>
>>>> But, the topology will run in a kind of weird zombie state forever.
>>>> More specifically I see the multilang bolt process all tuples in the
>>>> pending queue, and then an infinite loop of nextTuple()/fail() from the
>>>> multilang spout.  But, as noted in my original email, if I comment out:
>>>>  _collector.reportError(exception);
>>>> in the Java ShellSpout then the worker will immediately die and respawn.
>>>>
>>>> If no one can help, the next step for me is rough, as I'll have to
>>>> learn how to actually develop and debug storm itself, which is usually at
>>>> least 10x harder than just using something :-)
>>>>
>>>> In any case, my test code:
>>>>
>>>> Topology = 1 process with two tasks (multilang spout and bolt), and
>>>> small pool of pending messages (yes, using the word count example in
>>>> storm-starter as a starting point....)
>>>> =============
>>>> public class SlowTopology {
>>>>   public static class SlowPhpBolt extends ShellBolt implements
>>>> IRichBolt {
>>>>
>>>>     public SlowPhpBolt() {
>>>>       super("php", "slowBolt.php");
>>>>     }
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>       declarer.declare(new Fields());
>>>>     }
>>>>
>>>>     @Override
>>>>     public Map<String, Object> getComponentConfiguration() {
>>>>       return null;
>>>>     }
>>>>   }
>>>>
>>>>   public static class SlowPhpSpout extends ShellSpout implements
>>>> IRichSpout {
>>>>
>>>>       public SlowPhpSpout() {
>>>>           super("php", "slowSpout.php");
>>>>       }
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>>         ofd.declare(new Fields("output"));
>>>>     }
>>>>
>>>>     @Override
>>>>     public Map<String, Object> getComponentConfiguration() {
>>>>         return null;
>>>>     }
>>>>   }
>>>>
>>>>   public static void main(String[] args) throws Exception {
>>>>
>>>>     TopologyBuilder builder = new TopologyBuilder();
>>>>
>>>>     builder.setSpout("spout", new SlowPhpSpout(),
>>>> 1).setNumTasks(1).setMaxSpoutPending(3);
>>>>     builder.setBolt("bolt", new SlowPhpBolt(),
>>>> 1).setNumTasks(1).shuffleGrouping("spout");
>>>>
>>>>     Config conf = new Config();
>>>>     conf.setDebug(true);
>>>>
>>>>     if (args != null && args.length > 0) {
>>>>       conf.setNumWorkers(1);
>>>>       StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
>>>> builder.createTopology());
>>>>     }
>>>>     else {
>>>>       LocalCluster cluster = new LocalCluster();
>>>>       cluster.submitTopology("slow", conf, builder.createTopology());
>>>>       Thread.sleep(10000);
>>>>       cluster.shutdown();
>>>>     }
>>>>   }
>>>> }
>>>> ===========
>>>>
>>>> slowSpout.php
>>>> ==========
>>>> <?php
>>>> require_once "storm.php";
>>>> class slowSpout extends \ShellSpout {
>>>>   protected function nextTuple() {
>>>>     $value = rand(0,100);
>>>>     $id = rand(0, 100);
>>>>     $this->emit(array($value), $id);
>>>>     file_put_contents("/tmp/storm_slow.log",
>>>> "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
>>>>     sleep(1);
>>>>   }
>>>>   protected function ack($id) {
>>>>     file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
>>>>   }
>>>>
>>>>   protected function fail($id) {
>>>>     file_put_contents("/tmp/storm_slow.log", "fail($id)\n",
>>>> FILE_APPEND);
>>>>   }
>>>> }
>>>>
>>>> (new slowSpout())->run();
>>>> ===========
>>>>
>>>> slowBolt.php
>>>> ============
>>>> <?php
>>>> require_once "storm.php";
>>>> class slowBolt extends \BasicBolt {
>>>>   protected function process(\Tuple $t) {
>>>>     $sleep = rand(1, 180);
>>>>     file_put_contents("/tmp/storm_slow.log", "process(".print_r($t,
>>>> true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
>>>>     sleep($sleep);
>>>>   }
>>>> }
>>>> (new slowBolt())->run();
>>>> ============
>>>>
>>>> storm.php  (from  https://github.com/lazyshot/storm-php, and I think I
>>>> added more error checking on reads/writes to standard in/out, added sync()
>>>> to the ShellSpout to make new classes easier to write, and the new
>>>> heartbeat protocol)
>>>> =========
>>>> <?php
>>>> interface iShellBolt {
>>>> }
>>>>
>>>> interface iShellSpout {
>>>> }
>>>>
>>>> class Tuple {
>>>>     public $id, $component, $stream, $task, $values;
>>>>
>>>>     public function __construct($id, $component, $stream, $task,
>>>> $values) {
>>>>         $this->id = $id;
>>>>         $this->component = $component;
>>>>         $this->stream = $stream;
>>>>         $this->task = $task;
>>>>         $this->values = $values;
>>>>     }
>>>> }
>>>>
>>>> abstract class ShellComponent {
>>>>     protected $pid;
>>>>     protected $stormConf;
>>>>     protected $topologyContext;
>>>>
>>>>     protected $stormInc = null;
>>>>
>>>>     public function __construct() {
>>>>         $this->pid = getmypid();
>>>>         $this->sendCommand(array("pid" => $this->pid));
>>>>
>>>>         $handshake = $this->parseMessage($this->waitForMessage());
>>>>
>>>>         $this->stormConf = $handshake['conf'];
>>>>         $this->topologyContext = $handshake['context'];
>>>>         $pidDir = $handshake['pidDir'];
>>>>
>>>>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
>>>>     }
>>>>
>>>>     protected function readLine() {
>>>>         $raw = fgets(STDIN);
>>>>
>>>>         if ($raw === false) {
>>>>             throw new Exception("STDIN is broken");
>>>>         }
>>>>
>>>>         $line = trim($raw);
>>>>
>>>>         return $line;
>>>>     }
>>>>
>>>>     protected function waitForMessage() {
>>>>         $message = '';
>>>>         while (true) {
>>>>             $line = trim($this->readLine());
>>>>
>>>>             if (strlen($line) == 0) {
>>>>                 continue;
>>>>             } else if ($line == 'end') {
>>>>                 break;
>>>>             } else if ($line == 'sync') {
>>>>                 $message = '';
>>>>                 continue;
>>>>             }
>>>>
>>>>             $message .= $line . "\n";
>>>>         }
>>>>
>>>>         return trim($message);
>>>>     }
>>>>
>>>>     protected function sendCommand(array $command) {
>>>>         $this->sendMessage(json_encode($command));
>>>>     }
>>>>
>>>>     protected function sendLog($message) {
>>>>         return $this->sendCommand(array(
>>>>             'command' => 'log',
>>>>             'msg' => $message
>>>>         ));
>>>>     }
>>>>
>>>>     protected function parseMessage($message) {
>>>>         $msg = json_decode($message, true);
>>>>
>>>>         if ($msg) {
>>>>             return $msg;
>>>>         } else {
>>>>             return $message;
>>>>         }
>>>>     }
>>>>
>>>>     protected function sendMessage($message) {
>>>>         $message = "$message\nend\n";
>>>>         $bytesWritten = fwrite(STDOUT, $message);
>>>>         fflush(STDOUT);
>>>>         if ($bytesWritten === false) {
>>>>             throw new Exception("STDOUT is broken");
>>>>         }
>>>>         if ($bytesWritten != strlen($message)) {
>>>>             throw new Exception("Unable to write all bytes to STDOUT
>>>> (message=$message, bytesWritten=$bytesWritten)");
>>>>         }
>>>>     }
>>>>
>>>>     final protected function sync() {
>>>>         $command = array(
>>>>             'command' => 'sync',
>>>>         );
>>>>
>>>>         $this->sendCommand($command);
>>>>     }
>>>>
>>>> }
>>>>
>>>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
>>>>
>>>>     public $anchor_tuple = null;
>>>>
>>>>     public function __construct() {
>>>>         parent::__construct();
>>>>
>>>>         $this->init($this->stormConf, $this->topologyContext);
>>>>     }
>>>>
>>>>     public function run() {
>>>>         try {
>>>>             while (true) {
>>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>>
>>>>                 if (is_array($command)) {
>>>>                     if (isset($command['tuple'])) {
>>>>                         $tupleMap = array_merge(array(
>>>>                                 'id' => null,
>>>>                                 'comp' => null,
>>>>                                 'stream' => null,
>>>>                                 'task' => null,
>>>>                                 'tuple' => null
>>>>                             ),
>>>>
>>>>                             $command);
>>>>
>>>>                         if($tupleMap['task'] == -1 &&
>>>> $tupleMap['stream'] == "__heartbeat") {
>>>>                             $this->sync();
>>>>                         } else {
>>>>                             $tuple = new Tuple($tupleMap['id'],
>>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
>>>> $tupleMap['tuple']);
>>>>                             $this->process($tuple);
>>>>                         }
>>>>                     }
>>>>                 }
>>>>             }
>>>>         } catch (Exception $e) {
>>>>             $this->sendLog((string)$e);
>>>>         }
>>>>     }
>>>>
>>>>     abstract protected function process(Tuple $tuple);
>>>>
>>>>     protected function init($conf, $topology) {
>>>>         return;
>>>>     }
>>>>
>>>>     protected function emitTuple(array $tuple, $stream = null, $anchors
>>>> = array(), $directTask = null) {
>>>>         if ($this->anchor_tuple !== null) {
>>>>             $anchors = array($this->anchor_tuple);
>>>>         }
>>>>
>>>>         $command = array(
>>>>             'command' => 'emit'
>>>>         );
>>>>
>>>>         if ($stream !== null) {
>>>>             $command['stream'] = $stream;
>>>>         }
>>>>
>>>>         $command['anchors'] = array_map(function ($a) {
>>>>             return $a->id;
>>>>         }, $anchors);
>>>>
>>>>         if ($directTask !== null) {
>>>>             $command['task'] = $directTask;
>>>>         }
>>>>
>>>>         $command['tuple'] = $tuple;
>>>>
>>>>         $this->sendCommand($command);
>>>>     }
>>>>
>>>>     protected function emit($tuple, $stream = null, $anchors = array())
>>>> {
>>>>         $this->emitTuple($tuple, $stream, $anchors);
>>>>     }
>>>>
>>>>     protected function emitDirect($directTask, $tuple, $stream = null,
>>>> $anchors = array()) {
>>>>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
>>>>     }
>>>>
>>>>     protected function ack(Tuple $tuple) {
>>>>         $command = array(
>>>>             'command' => 'ack',
>>>>             'id' => $tuple->id
>>>>         );
>>>>
>>>>         $this->sendCommand($command);
>>>>     }
>>>>
>>>>     protected function fail(Tuple $tuple) {
>>>>         $command = array(
>>>>             'command' => 'fail',
>>>>             'id' => $tuple->id
>>>>         );
>>>>
>>>>         $this->sendCommand($command);
>>>>     }
>>>> }
>>>>
>>>> abstract class BasicBolt extends ShellBolt {
>>>>     public function run() {
>>>>         try {
>>>>             while (true) {
>>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>>
>>>>                 if (is_array($command)) {
>>>>                     if (isset($command['tuple'])) {
>>>>                         $tupleMap = array_merge(array(
>>>>                                 'id' => null,
>>>>                                 'comp' => null,
>>>>                                 'stream' => null,
>>>>                                 'task' => null,
>>>>                                 'tuple' => null
>>>>                             ),
>>>>
>>>>                             $command);
>>>>
>>>>                         if($tupleMap['task'] == -1 &&
>>>> $tupleMap['stream'] == "__heartbeat") {
>>>>                             $this->sync();
>>>>                         } else {
>>>>                             $tuple = new Tuple($tupleMap['id'],
>>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
>>>> $tupleMap['tuple']);
>>>>
>>>>                             $this->anchor_tuple = $tuple;
>>>>
>>>>                             try {
>>>>                                 $processed = $this->process($tuple);
>>>>
>>>>                                 $this->ack($tuple);
>>>>                             } catch (BoltProcessException $e) {
>>>>                                 $this->fail($tuple);
>>>>                             }
>>>>                         }
>>>>                     }
>>>>                 }
>>>>             }
>>>>         } catch (Exception $e) {
>>>>             $this->sendLog((string)$e);
>>>>         }
>>>>
>>>>     }
>>>> }
>>>>
>>>> abstract class ShellSpout extends ShellComponent implements iShellSpout
>>>> {
>>>>     protected $tuples = array();
>>>>
>>>>     public function __construct() {
>>>>         parent::__construct();
>>>>
>>>>         $this->init($this->stormConf, $this->topologyContext);
>>>>     }
>>>>
>>>>
>>>>     abstract protected function nextTuple();
>>>>
>>>>     abstract protected function ack($tuple_id);
>>>>
>>>>     abstract protected function fail($tuple_id);
>>>>
>>>>     public function run() {
>>>>         try {
>>>>             while (true) {
>>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>>
>>>>                 if (is_array($command)) {
>>>>                     if (isset($command['command'])) {
>>>>                         if ($command['command'] == 'ack') {
>>>>                             $this->ack($command['id']);
>>>>                             $this->sync();
>>>>                         } else if ($command['command'] == 'fail') {
>>>>                             $this->fail($command['id']);
>>>>                             $this->sync();
>>>>                         } else if ($command['command'] == 'next') {
>>>>                             $this->nextTuple();
>>>>                             $this->sync();
>>>>                         }
>>>>                     }
>>>>                 }
>>>>             }
>>>>         } catch (Exception $e) {
>>>>             $this->sendLog((string)$e);
>>>>             $this->sync();
>>>>         }
>>>>     }
>>>>
>>>>     protected function init($stormConf, $topologyContext) {
>>>>         return;
>>>>     }
>>>>
>>>>     final protected function emit(array $tuple, $messageId = null,
>>>> $streamId = null) {
>>>>         return $this->emitTuple($tuple, $messageId, $streamId, null);
>>>>     }
>>>>
>>>>     final protected function emitDirect($directTask, array $tuple,
>>>> $messageId = null, $streamId = null) {
>>>>         return $this->emitTuple($tuple, $messageId, $streamId,
>>>> $directTask);
>>>>     }
>>>>
>>>>     final private function emitTuple(array $tuple, $messageId = null,
>>>> $streamId = null, $directTask = null) {
>>>>         $command = array(
>>>>             'command' => 'emit'
>>>>         );
>>>>
>>>>         if ($messageId !== null) {
>>>>             $command['id'] = $messageId;
>>>>         }
>>>>
>>>>         if ($streamId !== null) {
>>>>             $command['stream'] = $streamId;
>>>>         }
>>>>
>>>>         if ($directTask !== null) {
>>>>             $command['task'] = $directTask;
>>>>         }
>>>>
>>>>         $command['tuple'] = $tuple;
>>>>
>>>>         return $this->sendCommand($command);
>>>>     }
>>>> }
>>>>
>>>> class BoltProcessException extends Exception {
>>>> }
>>>>
>>>> =========================
>>>>
>>>>
>>>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <
>>>> oberman@civicscience.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
>>>>>
>>>>> I'll try to cover the important facts that led to this issue:
>>>>>
>>>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some
>>>>> existing business logic
>>>>>
>>>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition
>>>>> to the ShellBolt protocol)
>>>>>
>>>>> -I have some odd topologies where I try to do some legacy background
>>>>> processing.  This processing takes a highly variable amount of time in the
>>>>> Bolts, from milliseconds to minutes.  But, eventually due to randomness the
>>>>> spout's "pending" pool fills up, causing the spout to block on nextTuple,
>>>>> which eventually causes a heartbeat timeout. (I believe my only fix is to
>>>>> increase the heartbeat timeout at the topology level. that's not the
>>>>> purpose of this email, though confirmation of this as my only workaround
>>>>> would be appreciated!  I feel like this wasn't anticipated when the
>>>>> heartbeat patch was designed, as it was assumed the spout's nextTuple
>>>>> wouldn't block I guess?)
>>>>>
>>>>> -The purpose of this email is the fact that the topology "jams up"
>>>>> when the ShellSpout has a heartbeat timeout.  I can see my PHP spout/bolt
>>>>> still running (I added logging to them), but Storm itself is doing nothing.
>>>>>
>>>>> -I added logging to ShellSpout and recompiled, because I saw the log
>>>>> message on line 233 (Halting process: ShellSpout died) but as noted the PHP
>>>>> process was still running, so I was curious if _process.destroy(); failed.
>>>>> But, my logging didn't appear.  I assumed I was compiling/deploying wrong.
>>>>> Eventually I commented out line 234: _collector.reportError(exception);
>>>>>  and everything started working!!!
>>>>>
>>>>> Does this make *any* sense?  Why would
>>>>> _collector.reportError(exception); block and never return (I waited quite a
>>>>> long time, tens of minutes)?  When I comment out line 234, Storm
>>>>> immediately kills my bad tasks and respawns almost instantly.
>>>>>
>>>>> I feel fairly confident that this will be recreatable.  My topology:
>>>>> -1 spout (ShellSpout)
>>>>> -1 bolt (ShellBolt)
>>>>> -The ShellSpout has a heartbeat timeout due to slow tasks in ShellBolt
>>>>> + the pending queue is full
>>>>>
>>>>> Thanks for any feedback!
>>>>>
>>>>> will
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Alex Sobrino Beltrán
>>> Registered Linux User #273657
>>>
>>> http://v5tech.es
>>>
>>
>>
>>
>>
>
>

Re: ShellSpout hangs on reportError?

Posted by William Oberman <ob...@civicscience.com>.
I'm not sure what I've learned adds up to yet....

I tried setting up a local Storm development environment.  By mistake, I
forgot to switch to 0.9.3-branch (i.e. I was working on master at first).
On master, I saw the ShellBolt heartbeat fail first (which in
retrospect makes sense!).  Then I remembered I was trying to debug 0.9.3,
so I switched to 0.9.3-branch.  On the branch, I see the ShellSpout
heartbeat fail first and jam up, so that's good (?).  In both cases I used
local mode rather than cluster mode, as I figured I had a better shot at
debugging in local mode.

At this point, I figured I had a good test case.  This was all using
command-line tools (git, mvn) so far.  Two thoughts at this point:  1.) I
don't plan on running master, and it's not 100% clear to me whether
ShellBolt failing first will solve any problems...  2.) On 0.9.3-branch,
after ShellSpout fails and everything jams up, if I Ctrl-C the process I
immediately see the ShellBolt heartbeat timeout message.  It's as if it was
waiting to write, but was blocked on something (hrmmmm....?)

But I hit the limits of my ability to add "System.out" debugging, so I
tried setting up Storm in IntelliJ.  That took a bit, but I finally figured
out how to run a topology in local mode with a debugger.  Once again, I saw
the ShellSpout heartbeat fail and then nothing happened.

The next problem is that I don't know how to set up IntelliJ to understand
compiled Clojure code (at least, I think that's my problem....) so the
"Step Into/Out of" information is really weird in the debugger.  The
best/most complete stack trace I have is:
invoke():102, zookeeper$exists_node_QMARK_$fn__3279 (backtype.storm)
invoke():98, zookeeper$exists_node_QMARK_ (backtype.storm)
invoke():114, zookeeper$mkdirs (backtype.storm)
mkdirs():119, cluster$mk_distributed_cluster_state$reify__3526 (backtype.storm)
report_error():397, cluster$mk_storm_cluster_state$reify__3983 (backtype.storm)
invoke():180, executor$throttled_report_error_fn$fn__5548 (backtype.storm.daemon)
reportError():533, executor$fn__5700$fn$reify__5742 (backtype.storm.daemon)
reportError():132, SpoutOutputCollector (backtype.storm.spout)

I had a better stack trace (that I lost) that led into:
org.apache.storm.curator.RetryLoop.callWithRetry()
which is my "prime suspect" (based on name alone) for something that
is blocking things up....  :-)

Though, once again, since I don't understand the big picture of Storm, I
still have no idea what all of the above adds up to in terms of what's
wrong and how to fix it....

will

On Wed, Feb 11, 2015 at 1:31 PM, William Oberman <ob...@civicscience.com>
wrote:

> I'm glad to hear I'm not the only one!
>
> (no new news yet)
>
>
> On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <al...@v5tech.es> wrote:
>
>> Hi William,
>>
>> I'm having the same problem running a multilang topology (written in
>> Python). If you find a solution, please post it here; it will surely help us.
>>
>> To upgrade from 0.9.2-incubating we updated storm.py (
>> https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
>> and pom.xml.
>>
>> Downgrading to 0.9.2-incubating (downgrading storm.py and pom.xml), it
>> works like hell.
>>
>> Best regards,
>>
>> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <
>> oberman@civicscience.com> wrote:
>>
>>> I'm not sure of the best way to share a test case.  I'll copy and paste
>>> code below....  If you run the code below (and find the log file of the
>>> worker that was running it), you should see in ~30 seconds:
>>> ====
>>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>> =======
>>>
>>> But, the topology will run in a kind of weird zombie state forever.
>>> More specifically I see the multilang bolt process all tuples in the
>>> pending queue, and then an infinite loop of nextTuple()/fail() from the
>>> multilang spout.  But, as noted in my original email, if I comment out:
>>>  _collector.reportError(exception);
>>> in the Java ShellSpout then the worker will immediately die and respawn.
>>>
>>> If no one can help, the next step for me is rough, as I'll have to learn
>>> how to actually develop and debug storm itself, which is usually at least
>>> 10x harder than just using something :-)
>>>
>>> In any case, my test code:
>>>
>>> Topology = 1 process with two tasks (multilang spout and bolt), and a
>>> small pool of pending messages (yes, using the word count example in
>>> storm-starter as a starting point....)
>>> =============
>>> public class SlowTopology {
>>>   public static class SlowPhpBolt extends ShellBolt implements IRichBolt
>>> {
>>>
>>>     public SlowPhpBolt() {
>>>       super("php", "slowBolt.php");
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>       declarer.declare(new Fields());
>>>     }
>>>
>>>     @Override
>>>     public Map<String, Object> getComponentConfiguration() {
>>>       return null;
>>>     }
>>>   }
>>>
>>>   public static class SlowPhpSpout extends ShellSpout implements
>>> IRichSpout {
>>>
>>>       public SlowPhpSpout() {
>>>           super("php", "slowSpout.php");
>>>       }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>         ofd.declare(new Fields("output"));
>>>     }
>>>
>>>     @Override
>>>     public Map<String, Object> getComponentConfiguration() {
>>>         return null;
>>>     }
>>>   }
>>>
>>>   public static void main(String[] args) throws Exception {
>>>
>>>     TopologyBuilder builder = new TopologyBuilder();
>>>
>>>     builder.setSpout("spout", new SlowPhpSpout(),
>>> 1).setNumTasks(1).setMaxSpoutPending(3);
>>>     builder.setBolt("bolt", new SlowPhpBolt(),
>>> 1).setNumTasks(1).shuffleGrouping("spout");
>>>
>>>     Config conf = new Config();
>>>     conf.setDebug(true);
>>>
>>>     if (args != null && args.length > 0) {
>>>       conf.setNumWorkers(1);
>>>       StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
>>> builder.createTopology());
>>>     }
>>>     else {
>>>       LocalCluster cluster = new LocalCluster();
>>>       cluster.submitTopology("slow", conf, builder.createTopology());
>>>       Thread.sleep(10000);
>>>       cluster.shutdown();
>>>     }
>>>   }
>>> }
>>> ===========
>>>
>>> slowSpout.php
>>> ==========
>>> <?php
>>> require_once "storm.php";
>>> class slowSpout extends \ShellSpout {
>>>   protected function nextTuple() {
>>>     $value = rand(0,100);
>>>     $id = rand(0, 100);
>>>     $this->emit(array($value), $id);
>>>     file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
>>>     sleep(1);
>>>   }
>>>   protected function ack($id) {
>>>     file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
>>>   }
>>>
>>>   protected function fail($id) {
>>>     file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
>>>   }
>>> }
>>>
>>> (new slowSpout())->run();
>>> ===========
>>>
>>> slowBolt.php
>>> ============
>>> <?php
>>> require_once "storm.php";
>>> class slowBolt extends \BasicBolt {
>>>   protected function process(\Tuple $t) {
>>>     $sleep = rand(1, 180);
>>>     file_put_contents("/tmp/storm_slow.log", "process(".print_r($t,
>>> true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
>>>     sleep($sleep);
>>>   }
>>> }
>>> (new slowBolt())->run();
>>> ============
>>>
>>> storm.php  (from  https://github.com/lazyshot/storm-php, and I think I
>>> added more error checking on reads/writes to standard in/out, added sync()
>>> to the ShellSpout to make new classes easier to write, and the new
>>> heartbeat protocol)
>>> =========
>>> <?php
>>> interface iShellBolt {
>>> }
>>>
>>> interface iShellSpout {
>>> }
>>>
>>> class Tuple {
>>>     public $id, $component, $stream, $task, $values;
>>>
>>>     public function __construct($id, $component, $stream, $task,
>>> $values) {
>>>         $this->id = $id;
>>>         $this->component = $component;
>>>         $this->stream = $stream;
>>>         $this->task = $task;
>>>         $this->values = $values;
>>>     }
>>> }
>>>
>>> abstract class ShellComponent {
>>>     protected $pid;
>>>     protected $stormConf;
>>>     protected $topologyContext;
>>>
>>>     protected $stormInc = null;
>>>
>>>     public function __construct() {
>>>         $this->pid = getmypid();
>>>         $this->sendCommand(array("pid" => $this->pid));
>>>
>>>         $handshake = $this->parseMessage($this->waitForMessage());
>>>
>>>         $this->stormConf = $handshake['conf'];
>>>         $this->topologyContext = $handshake['context'];
>>>         $pidDir = $handshake['pidDir'];
>>>
>>>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
>>>     }
>>>
>>>     protected function readLine() {
>>>         $raw = fgets(STDIN);
>>>
>>>         if ($raw === false) {
>>>             throw new Exception("STDIN is broken");
>>>         }
>>>
>>>         $line = trim($raw);
>>>
>>>         return $line;
>>>     }
>>>
>>>     protected function waitForMessage() {
>>>         $message = '';
>>>         while (true) {
>>>             $line = trim($this->readLine());
>>>
>>>             if (strlen($line) == 0) {
>>>                 continue;
>>>             } else if ($line == 'end') {
>>>                 break;
>>>             } else if ($line == 'sync') {
>>>                 $message = '';
>>>                 continue;
>>>             }
>>>
>>>             $message .= $line . "\n";
>>>         }
>>>
>>>         return trim($message);
>>>     }
>>>
>>>     protected function sendCommand(array $command) {
>>>         $this->sendMessage(json_encode($command));
>>>     }
>>>
>>>     protected function sendLog($message) {
>>>         return $this->sendCommand(array(
>>>             'command' => 'log',
>>>             'msg' => $message
>>>         ));
>>>     }
>>>
>>>     protected function parseMessage($message) {
>>>         $msg = json_decode($message, true);
>>>
>>>         if ($msg) {
>>>             return $msg;
>>>         } else {
>>>             return $message;
>>>         }
>>>     }
>>>
>>>     protected function sendMessage($message) {
>>>         $message = "$message\nend\n";
>>>         $bytesWritten = fwrite(STDOUT, $message);
>>>         fflush(STDOUT);
>>>         if ($bytesWritten === false) {
>>>             throw new Exception("STDOUT is broken");
>>>         }
>>>         if ($bytesWritten != strlen($message)) {
>>>             throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
>>>         }
>>>     }
>>>
>>>     final protected function sync() {
>>>         $command = array(
>>>             'command' => 'sync',
>>>         );
>>>
>>>         $this->sendCommand($command);
>>>     }
>>>
>>> }
>>>
>>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
>>>
>>>     public $anchor_tuple = null;
>>>
>>>     public function __construct() {
>>>         parent::__construct();
>>>
>>>         $this->init($this->stormConf, $this->topologyContext);
>>>     }
>>>
>>>     public function run() {
>>>         try {
>>>             while (true) {
>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>
>>>                 if (is_array($command)) {
>>>                     if (isset($command['tuple'])) {
>>>                         $tupleMap = array_merge(array(
>>>                                 'id' => null,
>>>                                 'comp' => null,
>>>                                 'stream' => null,
>>>                                 'task' => null,
>>>                                 'tuple' => null
>>>                             ),
>>>
>>>                             $command);
>>>
>>>                         if($tupleMap['task'] == -1 &&
>>> $tupleMap['stream'] == "__heartbeat") {
>>>                             $this->sync();
>>>                         } else {
>>>                             $tuple = new Tuple($tupleMap['id'],
>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
>>> $tupleMap['tuple']);
>>>                             $this->process($tuple);
>>>                         }
>>>                     }
>>>                 }
>>>             }
>>>         } catch (Exception $e) {
>>>             $this->sendLog((string)$e);
>>>         }
>>>     }
>>>
>>>     abstract protected function process(Tuple $tuple);
>>>
>>>     protected function init($conf, $topology) {
>>>         return;
>>>     }
>>>
>>>     protected function emitTuple(array $tuple, $stream = null, $anchors
>>> = array(), $directTask = null) {
>>>         if ($this->anchor_tuple !== null) {
>>>             $anchors = array($this->anchor_tuple);
>>>         }
>>>
>>>         $command = array(
>>>             'command' => 'emit'
>>>         );
>>>
>>>         if ($stream !== null) {
>>>             $command['stream'] = $stream;
>>>         }
>>>
>>>         $command['anchors'] = array_map(function ($a) {
>>>             return $a->id;
>>>         }, $anchors);
>>>
>>>         if ($directTask !== null) {
>>>             $command['task'] = $directTask;
>>>         }
>>>
>>>         $command['tuple'] = $tuple;
>>>
>>>         $this->sendCommand($command);
>>>     }
>>>
>>>     protected function emit($tuple, $stream = null, $anchors = array()) {
>>>         $this->emitTuple($tuple, $stream, $anchors);
>>>     }
>>>
>>>     protected function emitDirect($directTask, $tuple, $stream = null,
>>> $anchors = array()) {
>>>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
>>>     }
>>>
>>>     protected function ack(Tuple $tuple) {
>>>         $command = array(
>>>             'command' => 'ack',
>>>             'id' => $tuple->id
>>>         );
>>>
>>>         $this->sendCommand($command);
>>>     }
>>>
>>>     protected function fail(Tuple $tuple) {
>>>         $command = array(
>>>             'command' => 'fail',
>>>             'id' => $tuple->id
>>>         );
>>>
>>>         $this->sendCommand($command);
>>>     }
>>> }
>>>
>>> abstract class BasicBolt extends ShellBolt {
>>>     public function run() {
>>>         try {
>>>             while (true) {
>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>
>>>                 if (is_array($command)) {
>>>                     if (isset($command['tuple'])) {
>>>                         $tupleMap = array_merge(array(
>>>                                 'id' => null,
>>>                                 'comp' => null,
>>>                                 'stream' => null,
>>>                                 'task' => null,
>>>                                 'tuple' => null
>>>                             ),
>>>
>>>                             $command);
>>>
>>>                         if($tupleMap['task'] == -1 &&
>>> $tupleMap['stream'] == "__heartbeat") {
>>>                             $this->sync();
>>>                         } else {
>>>                             $tuple = new Tuple($tupleMap['id'],
>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
>>> $tupleMap['tuple']);
>>>
>>>                             $this->anchor_tuple = $tuple;
>>>
>>>                             try {
>>>                                 $processed = $this->process($tuple);
>>>
>>>                                 $this->ack($tuple);
>>>                             } catch (BoltProcessException $e) {
>>>                                 $this->fail($tuple);
>>>                             }
>>>                         }
>>>                     }
>>>                 }
>>>             }
>>>         } catch (Exception $e) {
>>>             $this->sendLog((string)$e);
>>>         }
>>>
>>>     }
>>> }
>>>
>>> abstract class ShellSpout extends ShellComponent implements iShellSpout {
>>>     protected $tuples = array();
>>>
>>>     public function __construct() {
>>>         parent::__construct();
>>>
>>>         $this->init($this->stormConf, $this->topologyContext);
>>>     }
>>>
>>>
>>>     abstract protected function nextTuple();
>>>
>>>     abstract protected function ack($tuple_id);
>>>
>>>     abstract protected function fail($tuple_id);
>>>
>>>     public function run() {
>>>         try {
>>>             while (true) {
>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>
>>>                 if (is_array($command)) {
>>>                     if (isset($command['command'])) {
>>>                         if ($command['command'] == 'ack') {
>>>                             $this->ack($command['id']);
>>>                             $this->sync();
>>>                         } else if ($command['command'] == 'fail') {
>>>                             $this->fail($command['id']);
>>>                             $this->sync();
>>>                         } else if ($command['command'] == 'next') {
>>>                             $this->nextTuple();
>>>                             $this->sync();
>>>                         }
>>>                     }
>>>                 }
>>>             }
>>>         } catch (Exception $e) {
>>>             $this->sendLog((string)$e);
>>>             $this->sync();
>>>         }
>>>     }
>>>
>>>     protected function init($stormConf, $topologyContext) {
>>>         return;
>>>     }
>>>
>>>     final protected function emit(array $tuple, $messageId = null,
>>> $streamId = null) {
>>>         return $this->emitTuple($tuple, $messageId, $streamId, null);
>>>     }
>>>
>>>     final protected function emitDirect($directTask, array $tuple,
>>> $messageId = null, $streamId = null) {
>>>         return $this->emitTuple($tuple, $messageId, $streamId,
>>> $directTask);
>>>     }
>>>
>>>     final private function emitTuple(array $tuple, $messageId = null,
>>> $streamId = null, $directTask = null) {
>>>         $command = array(
>>>             'command' => 'emit'
>>>         );
>>>
>>>         if ($messageId !== null) {
>>>             $command['id'] = $messageId;
>>>         }
>>>
>>>         if ($streamId !== null) {
>>>             $command['stream'] = $streamId;
>>>         }
>>>
>>>         if ($directTask !== null) {
>>>             $command['task'] = $directTask;
>>>         }
>>>
>>>         $command['tuple'] = $tuple;
>>>
>>>         return $this->sendCommand($command);
>>>     }
>>> }
>>>
>>> class BoltProcessException extends Exception {
>>> }
>>>
>>> =========================
>>>
>>>
>>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <
>>> oberman@civicscience.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
>>>>
>>>> I'll try to cover the important facts that led to this issue:
>>>>
>>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some
>>>> existing business logic
>>>>
>>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition
>>>> to the ShellBolt protocol)
>>>>
>>>> -I have some odd topologies where I try to do some legacy background
>>>> processing.  This processing takes a highly variable amount of time in the
>>>> Bolts, from milliseconds to minutes.  But, eventually due to randomness the
>>>> spout's "pending" pool fills up, causing the spout to block on nextTuple,
>>>> which eventually causes a heartbeat timeout. (I believe my only fix is to
>>>> increase the heartbeat timeout at the topology level. That's not the
>>>> purpose of this email, though confirmation of this as my only workaround
>>>> would be appreciated!  I feel like this wasn't anticipated when the
>>>> heartbeat patch was designed, as it was assumed the spout's nextTuple
>>>> wouldn't block I guess?)
>>>>
>>>> -The purpose of this email is the fact that the topology "jams up" when
>>>> the ShellSpout has a heartbeat timeout.  I can see my PHP spout/bolt still
>>>> running (I added logging to them), but Storm itself is doing nothing.
>>>>
>>>> -I added logging to ShellSpout and recompiled, because I saw the log
>>>> message on line 233 (Halting process: ShellSpout died) but as noted the PHP
>>>> process was still running, so I was curious if _process.destroy(); failed.
>>>> But, my logging didn't appear.  I assumed I was compiling/deploying wrong.
>>>> Eventually I commented out line 234: _collector.reportError(exception);
>>>>  and everything started working!!!
>>>>
>>>> Does this make *any* sense?  Why would
>>>> _collector.reportError(exception); block and never return (I waited quite a
>>>> long time, tens of minutes)?  When I comment out line 234, Storm
>>>> immediately kills my bad tasks and respawns almost instantly.
>>>>
>>>> I feel fairly confident that this will be recreatable.  My topology:
>>>> -1 spout (ShellSpout)
>>>> -1 bolt (ShellBolt)
>>>> -The ShellSpout has a heartbeat timeout due to slow tasks in ShellBolt
>>>> + the pending queue is full
>>>>
>>>> Thanks for any feedback!
>>>>
>>>> will
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>> --
>> Alex Sobrino Beltrán
>> Registered Linux User #273657
>>
>> http://v5tech.es
>>
>
>
>
>
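
The quoted stack trace shows a scheduled task (SpoutHeartbeatTimerTask, run by a
ScheduledThreadPoolExecutor) raising "subprocess heartbeat timeout" once the
subprocess has been silent for too long. A minimal sketch of that kind of check
follows, assuming the watchdog simply compares the time since the last
heartbeat against a fixed limit. HeartbeatWatchdog and all of its members are
hypothetical names used for illustration; this is not Storm's actual code, and
the 30-second limit only mirrors the "~30 seconds" mentioned in the thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a subprocess heartbeat check; not Storm's code. */
public class HeartbeatWatchdog {
    private final long timeoutMillis;
    private final Runnable onTimeout;
    private final AtomicLong lastHeartbeatMillis;
    private final AtomicBoolean fired = new AtomicBoolean(false);

    public HeartbeatWatchdog(long timeoutMillis, long startMillis, Runnable onTimeout) {
        this.timeoutMillis = timeoutMillis;
        this.onTimeout = onTimeout;
        this.lastHeartbeatMillis = new AtomicLong(startMillis);
    }

    /** Record that the subprocess answered (for example with a "sync"). */
    public void heartbeat(long nowMillis) {
        lastHeartbeatMillis.set(nowMillis);
    }

    /**
     * Periodic check. Returns true if the subprocess has been silent for
     * longer than the limit; the timeout handler runs at most once.
     */
    public boolean check(long nowMillis) {
        boolean timedOut = nowMillis - lastHeartbeatMillis.get() > timeoutMillis;
        if (timedOut && fired.compareAndSet(false, true)) {
            // If this handler blocks, whatever cleanup is meant to follow
            // the timeout (outside this sketch) never gets a chance to run.
            onTimeout.run();
        }
        return timedOut;
    }

    public static void main(String[] args) {
        HeartbeatWatchdog watchdog = new HeartbeatWatchdog(
                30_000L, 0L, () -> System.out.println("subprocess heartbeat timeout"));
        System.out.println(watchdog.check(10_000L)); // 10 s of silence
        watchdog.heartbeat(20_000L);                 // subprocess answers
        System.out.println(watchdog.check(45_000L)); // 25 s of silence
        System.out.println(watchdog.check(51_000L)); // 31 s of silence
    }
}
```

In a real component the check would be driven by a timer rather than called by
hand. The sketch takes the current time as a parameter only so the behaviour is
easy to reason about: a spout whose nextTuple blocks for longer than the limit
stops producing heartbeats, and the handler fires even though the subprocess
itself is still alive.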

Re: ShellSpout hangs on reportError?

Posted by William Oberman <ob...@civicscience.com>.
I'm glad to hear I'm not the only one!

(No news yet.)

On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <al...@v5tech.es> wrote:

> Hi William,
>
> I'm having the same problem running a multilang topology (written in
> Python). If you find a solution, please post it here; it would surely help us.
>
> To upgrade from 0.9.2-incubating we updated storm.py (
> https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
> and pom.xml.
>
> After downgrading to 0.9.2-incubating (reverting storm.py and pom.xml),
> everything works fine.
>
> Best regards,
>
> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <oberman@civicscience.com
> > wrote:
>
>> I'm not sure of the best way to share a test case, so I'll copy and paste the
>> code below.  If you run it (and find the log file of the worker that was
>> running it), you should see the following in ~30 seconds:
>> ====
>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
>> java.lang.RuntimeException: subprocess heartbeat timeout
>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
>> java.lang.RuntimeException: subprocess heartbeat timeout
>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>> =======
>>
>> But, the topology will run in a kind of weird zombie state forever.  More
>> specifically I see the multilang bolt process all tuples in the pending
>> queue, and then an infinite loop of nextTuple()/fail() from the multilang
>> spout.  But, as noted in my original email, if I comment out:
>>  _collector.reportError(exception);
>> in the Java ShellSpout then the worker will immediately die and respawn.
>>
>> If no one can help, the next step for me is rough, as I'll have to learn
>> how to actually develop and debug storm itself, which is usually at least
>> 10x harder than just using something :-)
>>
>> In any case, my test code:
>>
>> Topology = 1 process with two tasks (multilang spout and bolt), and a small
>> pool of pending messages (yes, using the word count example in
>> storm-starter as a starting point....)
>> =============
>> public class SlowTopology {
>>   public static class SlowPhpBolt extends ShellBolt implements IRichBolt {
>>
>>     public SlowPhpBolt() {
>>       super("php", "slowBolt.php");
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>       declarer.declare(new Fields());
>>     }
>>
>>     @Override
>>     public Map<String, Object> getComponentConfiguration() {
>>       return null;
>>     }
>>   }
>>
>>   public static class SlowPhpSpout extends ShellSpout implements
>> IRichSpout {
>>
>>       public SlowPhpSpout() {
>>           super("php", "slowSpout.php");
>>       }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>         ofd.declare(new Fields("output"));
>>     }
>>
>>     @Override
>>     public Map<String, Object> getComponentConfiguration() {
>>         return null;
>>     }
>>   }
>>
>>   public static void main(String[] args) throws Exception {
>>
>>     TopologyBuilder builder = new TopologyBuilder();
>>
>>     builder.setSpout("spout", new SlowPhpSpout(),
>> 1).setNumTasks(1).setMaxSpoutPending(3);
>>     builder.setBolt("bolt", new SlowPhpBolt(),
>> 1).setNumTasks(1).shuffleGrouping("spout");
>>
>>     Config conf = new Config();
>>     conf.setDebug(true);
>>
>>     if (args != null && args.length > 0) {
>>       conf.setNumWorkers(1);
>>       StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
>> builder.createTopology());
>>     }
>>     else {
>>       LocalCluster cluster = new LocalCluster();
>>       cluster.submitTopology("slow", conf, builder.createTopology());
>>       Thread.sleep(10000);
>>       cluster.shutdown();
>>     }
>>   }
>> }
>> ===========
>>
>> slowSpout.php
>> ==========
>> <?php
>> require_once "storm.php";
>> class slowSpout extends \ShellSpout {
>>   protected function nextTuple() {
>>     $value = rand(0,100);
>>     $id = rand(0, 100);
>>     $this->emit(array($value), $id);
>>     file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
>>     sleep(1);
>>   }
>>   protected function ack($id) {
>>     file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
>>   }
>>
>>   protected function fail($id) {
>>     file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
>>   }
>> }
>>
>> (new slowSpout())->run();
>> ===========
>>
>> slowBolt.php
>> ============
>> <?php
>> require_once "storm.php";
>> class slowBolt extends \BasicBolt {
>>   protected function process(\Tuple $t) {
>>     $sleep = rand(1, 180);
>>     file_put_contents("/tmp/storm_slow.log", "process(".print_r($t,
>> true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
>>     sleep($sleep);
>>   }
>> }
>> (new slowBolt())->run();
>> ============
>>
>> storm.php  (from  https://github.com/lazyshot/storm-php, and I think I
>> added more error checking on reads/writes to standard in/out, added sync()
>> to the ShellSpout to make new classes easier to write, and the new
>> heartbeat protocol)
>> =========
>> <?php
>> interface iShellBolt {
>> }
>>
>> interface iShellSpout {
>> }
>>
>> class Tuple {
>>     public $id, $component, $stream, $task, $values;
>>
>>     public function __construct($id, $component, $stream, $task, $values)
>> {
>>         $this->id = $id;
>>         $this->component = $component;
>>         $this->stream = $stream;
>>         $this->task = $task;
>>         $this->values = $values;
>>     }
>> }
>>
>> abstract class ShellComponent {
>>     protected $pid;
>>     protected $stormConf;
>>     protected $topologyContext;
>>
>>     protected $stormInc = null;
>>
>>     public function __construct() {
>>         $this->pid = getmypid();
>>         $this->sendCommand(array("pid" => $this->pid));
>>
>>         $handshake = $this->parseMessage($this->waitForMessage());
>>
>>         $this->stormConf = $handshake['conf'];
>>         $this->topologyContext = $handshake['context'];
>>         $pidDir = $handshake['pidDir'];
>>
>>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
>>     }
>>
>>     protected function readLine() {
>>         $raw = fgets(STDIN);
>>
>>         if ($raw === false) {
>>             throw new Exception("STDIN is broken");
>>         }
>>
>>         $line = trim($raw);
>>
>>         return $line;
>>     }
>>
>>     protected function waitForMessage() {
>>         $message = '';
>>         while (true) {
>>             $line = trim($this->readLine());
>>
>>             if (strlen($line) == 0) {
>>                 continue;
>>             } else if ($line == 'end') {
>>                 break;
>>             } else if ($line == 'sync') {
>>                 $message = '';
>>                 continue;
>>             }
>>
>>             $message .= $line . "\n";
>>         }
>>
>>         return trim($message);
>>     }
>>
>>     protected function sendCommand(array $command) {
>>         $this->sendMessage(json_encode($command));
>>     }
>>
>>     protected function sendLog($message) {
>>         return $this->sendCommand(array(
>>             'command' => 'log',
>>             'msg' => $message
>>         ));
>>     }
>>
>>     protected function parseMessage($message) {
>>         $msg = json_decode($message, true);
>>
>>         if ($msg) {
>>             return $msg;
>>         } else {
>>             return $message;
>>         }
>>     }
>>
>>     protected function sendMessage($message) {
>>         $message = "$message\nend\n";
>>         $bytesWritten = fwrite(STDOUT, $message);
>>         fflush(STDOUT);
>>         if ($bytesWritten === false) {
>>             throw new Exception("STDOUT is broken");
>>         }
>>         if ($bytesWritten != strlen($message)) {
>>             throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
>>         }
>>     }
>>
>>     final protected function sync() {
>>         $command = array(
>>             'command' => 'sync',
>>         );
>>
>>         $this->sendCommand($command);
>>     }
>>
>> }
>>
>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
>>
>>     public $anchor_tuple = null;
>>
>>     public function __construct() {
>>         parent::__construct();
>>
>>         $this->init($this->stormConf, $this->topologyContext);
>>     }
>>
>>     public function run() {
>>         try {
>>             while (true) {
>>                 $command = $this->parseMessage($this->waitForMessage());
>>
>>                 if (is_array($command)) {
>>                     if (isset($command['tuple'])) {
>>                         $tupleMap = array_merge(array(
>>                                 'id' => null,
>>                                 'comp' => null,
>>                                 'stream' => null,
>>                                 'task' => null,
>>                                 'tuple' => null
>>                             ),
>>
>>                             $command);
>>
>>                         if($tupleMap['task'] == -1 && $tupleMap['stream']
>> == "__heartbeat") {
>>                             $this->sync();
>>                         } else {
>>                             $tuple = new Tuple($tupleMap['id'],
>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
>> $tupleMap['tuple']);
>>                             $this->process($tuple);
>>                         }
>>                     }
>>                 }
>>             }
>>         } catch (Exception $e) {
>>             $this->sendLog((string)$e);
>>         }
>>     }
>>
>>     abstract protected function process(Tuple $tuple);
>>
>>     protected function init($conf, $topology) {
>>         return;
>>     }
>>
>>     protected function emitTuple(array $tuple, $stream = null, $anchors =
>> array(), $directTask = null) {
>>         if ($this->anchor_tuple !== null) {
>>             $anchors = array($this->anchor_tuple);
>>         }
>>
>>         $command = array(
>>             'command' => 'emit'
>>         );
>>
>>         if ($stream !== null) {
>>             $command['stream'] = $stream;
>>         }
>>
>>         $command['anchors'] = array_map(function ($a) {
>>             return $a->id;
>>         }, $anchors);
>>
>>         if ($directTask !== null) {
>>             $command['task'] = $directTask;
>>         }
>>
>>         $command['tuple'] = $tuple;
>>
>>         $this->sendCommand($command);
>>     }
>>
>>     protected function emit($tuple, $stream = null, $anchors = array()) {
>>         $this->emitTuple($tuple, $stream, $anchors);
>>     }
>>
>>     protected function emitDirect($directTask, $tuple, $stream = null,
>> $anchors = array()) {
>>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
>>     }
>>
>>     protected function ack(Tuple $tuple) {
>>         $command = array(
>>             'command' => 'ack',
>>             'id' => $tuple->id
>>         );
>>
>>         $this->sendCommand($command);
>>     }
>>
>>     protected function fail(Tuple $tuple) {
>>         $command = array(
>>             'command' => 'fail',
>>             'id' => $tuple->id
>>         );
>>
>>         $this->sendCommand($command);
>>     }
>> }
>>
>> abstract class BasicBolt extends ShellBolt {
>>     public function run() {
>>         try {
>>             while (true) {
>>                 $command = $this->parseMessage($this->waitForMessage());
>>
>>                 if (is_array($command)) {
>>                     if (isset($command['tuple'])) {
>>                         $tupleMap = array_merge(array(
>>                                 'id' => null,
>>                                 'comp' => null,
>>                                 'stream' => null,
>>                                 'task' => null,
>>                                 'tuple' => null
>>                             ),
>>
>>                             $command);
>>
>>                         if($tupleMap['task'] == -1 && $tupleMap['stream']
>> == "__heartbeat") {
>>                             $this->sync();
>>                         } else {
>>                             $tuple = new Tuple($tupleMap['id'],
>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
>> $tupleMap['tuple']);
>>
>>                             $this->anchor_tuple = $tuple;
>>
>>                             try {
>>                                 $processed = $this->process($tuple);
>>
>>                                 $this->ack($tuple);
>>                             } catch (BoltProcessException $e) {
>>                                 $this->fail($tuple);
>>                             }
>>                         }
>>                     }
>>                 }
>>             }
>>         } catch (Exception $e) {
>>             $this->sendLog((string)$e);
>>         }
>>
>>     }
>> }
>>
>> abstract class ShellSpout extends ShellComponent implements iShellSpout {
>>     protected $tuples = array();
>>
>>     public function __construct() {
>>         parent::__construct();
>>
>>         $this->init($this->stormConf, $this->topologyContext);
>>     }
>>
>>
>>     abstract protected function nextTuple();
>>
>>     abstract protected function ack($tuple_id);
>>
>>     abstract protected function fail($tuple_id);
>>
>>     public function run() {
>>         try {
>>             while (true) {
>>                 $command = $this->parseMessage($this->waitForMessage());
>>
>>                 if (is_array($command)) {
>>                     if (isset($command['command'])) {
>>                         if ($command['command'] == 'ack') {
>>                             $this->ack($command['id']);
>>                             $this->sync();
>>                         } else if ($command['command'] == 'fail') {
>>                             $this->fail($command['id']);
>>                             $this->sync();
>>                         } else if ($command['command'] == 'next') {
>>                             $this->nextTuple();
>>                             $this->sync();
>>                         }
>>                     }
>>                 }
>>             }
>>         } catch (Exception $e) {
>>             $this->sendLog((string)$e);
>>             $this->sync();
>>         }
>>     }
>>
>>     protected function init($stormConf, $topologyContext) {
>>         return;
>>     }
>>
>>     final protected function emit(array $tuple, $messageId = null, $streamId = null) {
>>         return $this->emitTuple($tuple, $messageId, $streamId, null);
>>     }
>>
>>     final protected function emitDirect($directTask, array $tuple, $messageId = null, $streamId = null) {
>>         return $this->emitTuple($tuple, $messageId, $streamId, $directTask);
>>     }
>>
>>     final private function emitTuple(array $tuple, $messageId = null, $streamId = null, $directTask = null) {
>>         $command = array(
>>             'command' => 'emit'
>>         );
>>
>>         if ($messageId !== null) {
>>             $command['id'] = $messageId;
>>         }
>>
>>         if ($streamId !== null) {
>>             $command['stream'] = $streamId;
>>         }
>>
>>         if ($directTask !== null) {
>>             $command['task'] = $directTask;
>>         }
>>
>>         $command['tuple'] = $tuple;
>>
>>         return $this->sendCommand($command);
>>     }
>> }
>>
>> class BoltProcessException extends Exception {
>> }
>>
>> =========================
>>
>>
>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <oberman@civicscience.com> wrote:
>>
>>> Hi,
>>>
>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
>>>
>>> I'll try to cover the important facts that led to this issue:
>>>
>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some existing
>>> business logic
>>>
>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition to
>>> the ShellBolt protocol)
>>>
>>> -I have some odd topologies where I try to do some legacy background
>>> processing.  This processing takes a highly variable amount of time in the
>>> Bolts, from milliseconds to minutes.  But, eventually due to randomness the
>>> spout's "pending" pool fills up, causing the spout to block on nextTuple,
>>> which eventually causes a heartbeat timeout. (I believe my only fix is to
>>> increase the heartbeat timeout at the topology level. that's not the
>>> purpose of this email, though confirmation of this as my only workaround
>>> would be appreciated!  I feel like this wasn't anticipated when the
>>> heartbeat patch was designed, as it was assumed the spout's nextTuple
>>> wouldn't block I guess?)
>>>
>>> -The purpose of this email is the fact that the topology "jams up" when
>>> the ShellSpout has a heartbeat timeout.  I can see my PHP spout/bolt still
>>> running (I added logging to them), but Storm itself is doing nothing.
>>>
>>> -I added logging to ShellSpout and recompiled, because I saw the log
>>> message on line 233 (Halting process: ShellSpout died) but as noted the PHP
>>> process was still running, so I was curious if _process.destroy(); failed.
>>> But, my logging didn't appear.  I assumed I was compiling/deploying wrong.
>>> Eventually I commented out line 234: _collector.reportError(exception);
>>>  and everything started working!!!
>>>
>>> Does this make *any* sense?  Why would
>>> _collector.reportError(exception); block and never return (I waited quite a
>>> long time, tens of minutes).  When I comment out line 234, Storm
>>> immediately kills my bad tasks and respawns almost instantly.
>>>
>>> I feel fairly confident that this will be recreatable.  My topology:
>>> -1 spout (ShellSpout)
>>> -1 bolt (ShellBolt)
>>> -The ShellSpout has a heartbeat timeout due to slow tasks in ShellBolt +
>>> the pending queue is full
>>>
>>> Thanks for any feedback!
>>>
>>> will
>>>
>>>
>>>
>>
>>
>>
>
>
> --
> Alex Sobrino Beltrán
> Registered Linux User #273657
>
> http://v5tech.es
>

Re: ShellSpout hangs on reportError?

Posted by Alex Sobrino <al...@v5tech.es>.
Hi William,

I'm having the same problem running a multilang topology (written in
Python). If you find a solution, please post it here; it would surely help
us.

To upgrade from 0.9.2-incubating we updated storm.py (
https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
and pom.xml.

After downgrading to 0.9.2-incubating (reverting both storm.py and pom.xml),
everything works fine again.

Best regards,


-- 
Alex Sobrino Beltrán
Registered Linux User #273657

http://v5tech.es

Re: ShellSpout hangs on reportError?

Posted by William Oberman <ob...@civicscience.com>.
I'm not sure of the best way to share a test case, so I'll copy and paste
the code below.  If you run it (and find the log file of the worker that was
running it), you should see the following within ~30 seconds:
====
2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
java.lang.RuntimeException: subprocess heartbeat timeout
        at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
java.lang.RuntimeException: subprocess heartbeat timeout
        at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
=======

But the topology then keeps running in a weird zombie state forever.  More
specifically, I see the multilang bolt process every tuple in the pending
queue, followed by an infinite loop of nextTuple()/fail() calls from the
multilang spout.  But, as noted in my original email, if I comment out:
 _collector.reportError(exception);
in the Java ShellSpout, the worker immediately dies and respawns.
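
To make the failure mode concrete: the error above comes from a timer task
that decides the subprocess has gone quiet for too long.  The sketch below
shows only that idea in isolation.  It is not Storm's ShellSpout code; the
names HeartbeatWatchdog, beat and isTimedOut are made up for illustration,
and the 30-second value is an assumption picked to match the ~30 seconds
mentioned above.

```java
// Illustrative only: a tiny heartbeat watchdog, NOT Storm's ShellSpout code.
// HeartbeatWatchdog, beat and isTimedOut are hypothetical names.
final class HeartbeatWatchdog {
    private final long timeoutMillis;
    private long lastBeatMillis;

    HeartbeatWatchdog(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastBeatMillis = nowMillis;
    }

    // Record that the subprocess responded (for example with a "sync").
    synchronized void beat(long nowMillis) {
        lastBeatMillis = nowMillis;
    }

    // Checked periodically by a timer; true means the subprocess is presumed dead.
    synchronized boolean isTimedOut(long nowMillis) {
        return nowMillis - lastBeatMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        HeartbeatWatchdog watchdog = new HeartbeatWatchdog(30_000, 0);
        watchdog.beat(1_000);                             // last response at t = 1s
        System.out.println(watchdog.isTimedOut(20_000));  // false: 19s of silence
        System.out.println(watchdog.isTimedOut(40_000));  // true: 39s of silence
    }
}
```

Passing the clock in as a parameter keeps the sketch deterministic.  If the
spout subprocess is never asked for anything while the pending queue is
full, nothing calls beat() and a check like this eventually fires.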

If no one can help, the next step for me is rough, as I'll have to learn
how to actually develop and debug Storm itself, which is usually at least
10x harder than just using something :-)

In any case, my test code:

Topology = 1 process with two tasks (multilang spout and bolt), and a small
pool of pending messages (yes, using the word count example in
storm-starter as a starting point....)
=============
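import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.ShellSpout;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class SlowTopology {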
  public static class SlowPhpBolt extends ShellBolt implements IRichBolt {

    public SlowPhpBolt() {
      super("php", "slowBolt.php");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields());
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static class SlowPhpSpout extends ShellSpout implements IRichSpout {

    public SlowPhpSpout() {
      super("php", "slowSpout.php");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
      ofd.declare(new Fields("output"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static void main(String[] args) throws Exception {

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new SlowPhpSpout(), 1).setNumTasks(1).setMaxSpoutPending(3);
    builder.setBolt("bolt", new SlowPhpBolt(), 1).setNumTasks(1).shuffleGrouping("spout");

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(1);
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
    else {
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("slow", conf, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
    }
  }
}
===========

slowSpout.php
==========
<?php
require_once "storm.php";
class slowSpout extends \ShellSpout {
  protected function nextTuple() {
    $value = rand(0,100);
    $id = rand(0, 100);
    $this->emit(array($value), $id);
    file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
    sleep(1);
  }
  protected function ack($id) {
    file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
  }

  protected function fail($id) {
    file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
  }
}

(new slowSpout())->run();
===========

slowBolt.php
============
<?php
require_once "storm.php";
class slowBolt extends \BasicBolt {
  protected function process(\Tuple $t) {
    $sleep = rand(1, 180);
    file_put_contents("/tmp/storm_slow.log", "process(".print_r($t, true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
    sleep($sleep);
  }
}
(new slowBolt())->run();
============
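
Both scripts talk to the Java side over stdin/stdout.  Judging from
storm.php (listed next), each message is a payload followed by a line
containing only "end".  The sketch below mirrors waitForMessage() and
sendMessage() in Java, purely to illustrate that framing.  MultilangFraming
and its method names are hypothetical; this is not Storm's own reader.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;

// Illustrative only: "end"-terminated framing as done in storm.php.
// MultilangFraming, readMessage and frame are hypothetical names.
final class MultilangFraming {

    // Reads lines until a line equal to "end"; returns the collected payload.
    static String readMessage(BufferedReader in) {
        StringBuilder message = new StringBuilder();
        try {
            while (true) {
                String raw = in.readLine();
                if (raw == null) {
                    throw new IllegalStateException("stream closed before 'end'");
                }
                String line = raw.trim();
                if (line.isEmpty()) {
                    continue;                 // skip blank lines
                } else if (line.equals("end")) {
                    break;                    // terminator: message complete
                } else if (line.equals("sync")) {
                    message.setLength(0);     // drop what was read so far, as storm.php does
                    continue;
                }
                message.append(line).append('\n');
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return message.toString().trim();
    }

    // Frames a payload the way sendMessage() in storm.php does.
    static String frame(String payload) {
        return payload + "\nend\n";
    }

    public static void main(String[] args) {
        String wire = frame("{\"command\":\"next\"}")
                    + frame("{\"command\":\"ack\",\"id\":\"7\"}");
        BufferedReader in = new BufferedReader(new StringReader(wire));
        System.out.println(readMessage(in));  // {"command":"next"}
        System.out.println(readMessage(in));  // {"command":"ack","id":"7"}
    }
}
```

Because the reader blocks until it sees "end", a side that stops writing
simply leaves the other side waiting, which is why a separate heartbeat
check is needed at all.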

storm.php  (from https://github.com/lazyshot/storm-php; I think I added
more error checking on reads/writes to standard in/out, added sync() to the
ShellSpout to make new classes easier to write, and added the new heartbeat
protocol)
=========
<?php
interface iShellBolt {
}

interface iShellSpout {
}

class Tuple {
    public $id, $component, $stream, $task, $values;

    public function __construct($id, $component, $stream, $task, $values) {
        $this->id = $id;
        $this->component = $component;
        $this->stream = $stream;
        $this->task = $task;
        $this->values = $values;
    }
}

abstract class ShellComponent {
    protected $pid;
    protected $stormConf;
    protected $topologyContext;

    protected $stormInc = null;

    public function __construct() {
        $this->pid = getmypid();
        $this->sendCommand(array("pid" => $this->pid));

        $handshake = $this->parseMessage($this->waitForMessage());

        $this->stormConf = $handshake['conf'];
        $this->topologyContext = $handshake['context'];
        $pidDir = $handshake['pidDir'];

        @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
    }

    protected function readLine() {
        $raw = fgets(STDIN);

        if ($raw === false) {
            throw new Exception("STDIN is broken");
        }

        $line = trim($raw);

        return $line;
    }

    protected function waitForMessage() {
        $message = '';
        while (true) {
            $line = trim($this->readLine());

            if (strlen($line) == 0) {
                continue;
            } else if ($line == 'end') {
                break;
            } else if ($line == 'sync') {
                $message = '';
                continue;
            }

            $message .= $line . "\n";
        }

        return trim($message);
    }

    protected function sendCommand(array $command) {
        $this->sendMessage(json_encode($command));
    }

    protected function sendLog($message) {
        return $this->sendCommand(array(
            'command' => 'log',
            'msg' => $message
        ));
    }

    protected function parseMessage($message) {
        $msg = json_decode($message, true);

        // json_decode() returns null on failure; an empty array is still valid JSON.
        if ($msg !== null) {
            return $msg;
        } else {
            return $message;
        }
    }

    protected function sendMessage($message) {
        $message = "$message\nend\n";
        $bytesWritten = fwrite(STDOUT, $message);
        fflush(STDOUT);
        if ($bytesWritten === false) {
            throw new Exception("STDOUT is broken");
        }
        if ($bytesWritten != strlen($message)) {
            throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
        }
    }

    final protected function sync() {
        $command = array(
            'command' => 'sync',
        );

        $this->sendCommand($command);
    }

}

abstract class ShellBolt extends ShellComponent implements iShellBolt {

    public $anchor_tuple = null;

    public function __construct() {
        parent::__construct();

        $this->init($this->stormConf, $this->topologyContext);
    }

    public function run() {
        try {
            while (true) {
                $command = $this->parseMessage($this->waitForMessage());

                if (is_array($command)) {
                    if (isset($command['tuple'])) {
                        $tupleMap = array_merge(array(
                                'id' => null,
                                'comp' => null,
                                'stream' => null,
                                'task' => null,
                                'tuple' => null
                            ),

                            $command);

                        if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
                            $this->sync();
                        } else {
                            $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
                            $this->process($tuple);
                        }
                    }
                }
            }
        } catch (Exception $e) {
            $this->sendLog((string)$e);
        }
    }

    abstract protected function process(Tuple $tuple);

    protected function init($conf, $topology) {
        return;
    }

    protected function emitTuple(array $tuple, $stream = null, $anchors = array(), $directTask = null) {
        if ($this->anchor_tuple !== null) {
            $anchors = array($this->anchor_tuple);
        }

        $command = array(
            'command' => 'emit'
        );

        if ($stream !== null) {
            $command['stream'] = $stream;
        }

        $command['anchors'] = array_map(function ($a) {
            return $a->id;
        }, $anchors);

        if ($directTask !== null) {
            $command['task'] = $directTask;
        }

        $command['tuple'] = $tuple;

        $this->sendCommand($command);
    }

    protected function emit($tuple, $stream = null, $anchors = array()) {
        $this->emitTuple($tuple, $stream, $anchors);
    }

    protected function emitDirect($directTask, $tuple, $stream = null, $anchors = array()) {
        $this->emitTuple($tuple, $stream, $anchors, $directTask);
    }

    protected function ack(Tuple $tuple) {
        $command = array(
            'command' => 'ack',
            'id' => $tuple->id
        );

        $this->sendCommand($command);
    }

    protected function fail(Tuple $tuple) {
        $command = array(
            'command' => 'fail',
            'id' => $tuple->id
        );

        $this->sendCommand($command);
    }
}

abstract class BasicBolt extends ShellBolt {
    public function run() {
        try {
            while (true) {
                $command = $this->parseMessage($this->waitForMessage());

                if (is_array($command)) {
                    if (isset($command['tuple'])) {
                        $tupleMap = array_merge(array(
                                'id' => null,
                                'comp' => null,
                                'stream' => null,
                                'task' => null,
                                'tuple' => null
                            ),

                            $command);

                        if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
                            $this->sync();
                        } else {
                            $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);

                            $this->anchor_tuple = $tuple;

                            try {
                                $processed = $this->process($tuple);

                                $this->ack($tuple);
                            } catch (BoltProcessException $e) {
                                $this->fail($tuple);
                            }
                        }
                    }
                }
            }
        } catch (Exception $e) {
            $this->sendLog((string)$e);
        }

    }
}

abstract class ShellSpout extends ShellComponent implements iShellSpout {
    protected $tuples = array();

    public function __construct() {
        parent::__construct();

        $this->init($this->stormConf, $this->topologyContext);
    }


    abstract protected function nextTuple();

    abstract protected function ack($tuple_id);

    abstract protected function fail($tuple_id);

    public function run() {
        try {
            while (true) {
                $command = $this->parseMessage($this->waitForMessage());

                if (is_array($command)) {
                    if (isset($command['command'])) {
                        if ($command['command'] == 'ack') {
                            $this->ack($command['id']);
                            $this->sync();
                        } else if ($command['command'] == 'fail') {
                            $this->fail($command['id']);
                            $this->sync();
                        } else if ($command['command'] == 'next') {
                            $this->nextTuple();
                            $this->sync();
                        }
                    }
                }
            }
        } catch (Exception $e) {
            $this->sendLog((string)$e);
            $this->sync();
        }
    }

    protected function init($stormConf, $topologyContext) {
        return;
    }

    final protected function emit(array $tuple, $messageId = null, $streamId = null) {
        return $this->emitTuple($tuple, $messageId, $streamId, null);
    }

    final protected function emitDirect($directTask, array $tuple, $messageId = null, $streamId = null) {
        return $this->emitTuple($tuple, $messageId, $streamId, $directTask);
    }

    // "final" is redundant on a private method (PHP 8 warns about it), so it is dropped here.
    private function emitTuple(array $tuple, $messageId = null, $streamId = null, $directTask = null) {
        $command = array(
            'command' => 'emit'
        );

        if ($messageId !== null) {
            $command['id'] = $messageId;
        }

        if ($streamId !== null) {
            $command['stream'] = $streamId;
        }

        if ($directTask !== null) {
            $command['task'] = $directTask;
        }

        $command['tuple'] = $tuple;

        return $this->sendCommand($command);
    }
}

class BoltProcessException extends Exception {
}

=========================
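
For readers of the archive, the wire format that the classes above rely on
can be tried out in isolation. The following is only a minimal sketch, not
part of the library: frameMessage(), readFramedMessage() and isHeartbeat()
are hypothetical helper names, the in-memory stream stands in for STDIN, and
the exact fields of the sample heartbeat tuple are an assumption based on the
check in run() (task == -1 and stream == "__heartbeat").

```php
<?php
// Sketch only. frameMessage(), readFramedMessage() and isHeartbeat() are
// hypothetical helper names for illustration; they are not part of the
// library above or of Storm itself.

// Frame a command the way sendMessage() does: JSON followed by an "end" line.
function frameMessage(array $command) {
    return json_encode($command) . "\nend\n";
}

// Read one framed message from any readable stream (STDIN in real use).
// Returns the decoded array, or null when the stream is exhausted (or when
// the payload is not valid JSON; this sketch does not distinguish the two).
function readFramedMessage($stream) {
    $buffer = '';
    while (($raw = fgets($stream)) !== false) {
        $line = trim($raw);
        if ($line === '') {
            continue;          // skip blank lines
        }
        if ($line === 'end') {
            return json_decode(trim($buffer), true);
        }
        if ($line === 'sync') {
            $buffer = '';      // a stray sync resets the buffer, as in waitForMessage()
            continue;
        }
        $buffer .= $line . "\n";
    }
    return null;
}

// Same test that ShellBolt::run() applies before calling sync().
function isHeartbeat(array $message) {
    return isset($message['task'], $message['stream'])
        && $message['task'] === -1
        && $message['stream'] === '__heartbeat';
}

// Simulate Storm's side of the pipe with an in-memory stream.
// The field values below are made up for the demonstration.
$stream = fopen('php://memory', 'w+');
fwrite($stream, frameMessage(array(
    'id' => '-1', 'comp' => '1', 'stream' => '__heartbeat', 'task' => -1, 'tuple' => array(),
)));
fwrite($stream, frameMessage(array(
    'id' => '42', 'comp' => '1', 'stream' => 'default', 'task' => 9, 'tuple' => array('hello'),
)));
rewind($stream);

$results = array();
while (($message = readFramedMessage($stream)) !== null) {
    $results[] = isHeartbeat($message) ? 'sync' : 'process ' . $message['id'];
}
fclose($stream);

echo implode("\n", $results), "\n";
```

In a real bolt the 'sync' branch corresponds to $this->sync() and the other
branch to $this->process($tuple); the sketch only records which path each
message would take.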


On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <ob...@civicscience.com>
wrote:

> Hi,
>
> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
>
> I'll try to cover the important facts that led to this issue:
>
> -I was on 0.9.2 using multilang to bridge to PHP to get to some existing
> business logic
>
> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition to
> the ShellBolt protocol)
>
> -I have some odd topologies where I try to do some legacy background
> processing.  This processing takes a highly variable amount of time in the
> Bolts, from milliseconds to minutes.  But, eventually due to randomness the
> spout's "pending" pool fills up, causing the spout to block on nextTuple,
> which eventually causes a heartbeat timeout. (I believe my only fix is to
> increase the heartbeat timeout at the topology level.  That's not the
> purpose of this email, though confirmation of this as my only workaround
> would be appreciated!  I feel like this wasn't anticipated when the
> heartbeat patch was designed, as it was assumed the spout's nextTuple
> wouldn't block I guess?)
>
> -The purpose of this email is the fact that the topology "jams up" when
> the ShellSpout has a heartbeat timeout.  I can see my PHP spout/bolt still
> running (I added logging to them), but Storm itself is doing nothing.
>
> -I added logging to ShellSpout and recompiled, because I saw the log
> message on line 233 (Halting process: ShellSpout died) but as noted the PHP
> process was still running, so I was curious if _process.destroy(); failed.
> But, my logging didn't appear.  I assumed I was compiling/deploying wrong.
> Eventually I commented out line 234: _collector.reportError(exception);
>  and everything started working!!!
>
> Does this make *any* sense?  Why would _collector.reportError(exception);
> block and never return (I waited quite a long time, tens of minutes)?  When
> I comment out line 234, Storm immediately kills my bad tasks and respawns
> almost instantly.
>
> I feel fairly confident that this will be recreatable.  My topology:
> -1 spout (ShellSpout)
> -1 bolt (ShellBolt)
> -The ShellSpout has a heartbeat timeout due to slow tasks in ShellBolt +
> the pending queue is full
>
> Thanks for any feedback!
>
> will
>
>
>