Posted to user@storm.apache.org by Neeraj Kumar <ni...@gmail.com> on 2016/03/30 11:12:50 UTC

Integrating KafkaSpout and Python bolt

Hi

I am trying to implement a topology in which a Kafka spout provides input to
a Python bolt. The input will be a sentence, and the Python bolt will split
it into words.
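For reference, the splitting step amounts to roughly the sketch below. The function name split_sentence is only illustrative and is not part of storm.py. In the storm-starter splitsentence.py the same logic sits inside the bolt's process method, which reads the sentence from tup.values[0] and emits each word with storm.emit([word]). The sketch uses split() with no argument rather than split(" "), so repeated spaces do not produce empty words.

```python
def split_sentence(sentence):
    """Return the whitespace-separated words of one sentence.

    Hypothetical helper for illustration; inside a real bolt the
    result would be emitted word by word instead of returned.
    """
    # split() with no argument splits on any run of whitespace and
    # drops empty strings, so "a  b" gives ["a", "b"].
    return sentence.split()


if __name__ == "__main__":
    for word in split_sentence("the cow jumped over the moon"):
        print(word)
```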

However, when I submit the topology, I get the error below in the Storm UI.

As I am new to Storm, I cannot figure out what it means.

The sequence of events is:

1. Submit the topology.
2. Provide input to the Kafka topic from the Kafka terminal.
3. By the time I provide input to the Kafka topic, Storm is already showing
the error below.

I am using the storm.py and splitsentence.py files from the Storm starter kit.

Please help me with this issue.

Error
------------
java.lang.RuntimeException: backtype.storm.multilang.NoOutputException: Pipe to subprocess seems to be broken! No output read. Serializer Exception:
    at backtype.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:101)
    at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:318)
    at java.lang.Thread.run(Thread.java:745)

java.lang.Exception: Shell Process Exception: Traceback (most recent call last):
  File "/home/hduser1/apache-storm-0.9.6/storm-local/supervisor/stormdist/fashionsearch-topology-40-1459326815/resources/storm.py", line 217, in run
    tup = readTuple()
  File "/home/hduser1/apache-storm-0.9.6/storm-local/supervisor/stormdist/fashionsearch-topology-40-1459326815/resources/storm.py", line 74, in readTuple
    cmd = readCommand()
  File "/home/hduser1/apache-storm-0.9.6/storm-local/supervisor/stormdist/fashionsearch-topology-40-1459326815/resources/storm.py", line 67, in readCommand
    msg = readMsg()
  File "/home/hduser1/apache-storm-0.9.6/storm-local/supervisor/stormdist/fashionsearch-topology-40-1459326815/resources/storm.py", line 42, in readMsg
    return json_decode(msg[0:-1])
  File "/home/hduser1/apache-storm-0.9.6/storm-local/supervisor/stormdist/fashionsearch-topology-40-1459326815/resources/storm.py", line 30, in <lambda>
    json_decode = lambda x: json.loads(x)
  File "/usr/lib/python2.7/json/__init__.py", line 338, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python2.7/json/decoder.py", line 366, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib/python2.7/json/decoder.py", line 384, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded
    at backtype.storm.task.ShellBolt.handleError(ShellBolt.java:188)
    at backtype.storm.task.ShellBolt.access$1100(ShellBolt.java:69)
    at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:331)
    at java.lang.Thread.run(Thread.java:745)

java.io.IOException: Stream closed
    at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:433)
    at java.io.OutputStream.write(OutputStream.java:116)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at backtype.storm.multilang.JsonSerializer.writeString(JsonSerializer.java:96)
    at backtype.storm.multilang.JsonSerializer.writeMessage(JsonSerializer.java:89)
    at backtype.storm.multilang.JsonSerializer.writeBoltMsg(JsonSerializer.java:74)
    at backtype.storm.utils.ShellProcess.writeBoltMsg(ShellProcess.java:106)
    at backtype.storm.task.ShellBolt$BoltWriterRunnable.run(ShellBolt.java:361)
    at java.lang.Thread.run(Thread.java:745)
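
The Python failure in the second trace can be reproduced outside Storm with a small sketch. Only the final json.loads(msg[0:-1]) call is taken from the traceback. The read loop, the "end" terminator line, and the name read_msg are assumptions about how the multilang framing works, not a copy of storm.py. Under those assumptions, the decode raises ValueError whenever no JSON text arrives before the terminator or before the input stream closes.

```python
import io
import json


def read_msg(stream):
    """Read one multilang-style message: JSON text, then a line "end".

    Illustrative stand-in for storm.py's readMsg; the loop below is
    an assumption, only the final decode mirrors the traceback.
    """
    msg = ""
    while True:
        line = stream.readline()
        if not line:
            # EOF: the parent side closed the pipe.
            break
        if line[0:-1] == "end":
            # Terminator line reached; the message is complete.
            break
        msg += line
    # Strip the trailing newline and decode. An empty msg fails here.
    return json.loads(msg[0:-1])


if __name__ == "__main__":
    good = io.StringIO('{"id": "1", "tuple": ["a b"]}\nend\n')
    print(read_msg(good))

    try:
        read_msg(io.StringIO(""))  # nothing was ever written
    except ValueError as exc:
        print("decode failed:", exc)
```

If this matches what storm.py does, the ValueError would mean the bolt's stdin delivered no JSON at all, which fits the "Stream closed" and "Pipe to subprocess seems to be broken" messages in the other two traces.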

Thanks
Neeraj