Posted to user@storm.apache.org by Hugo Sequeira <hu...@gmail.com> on 2014/10/22 10:13:15 UTC

Tuples turn empty after rebalancing

Hi all,

I'm seeing some really strange behaviour in my topology.

a) I have a Spout that reads messages from RabbitMQ (RMQ) and sends them to a
DataPointConverterBolt, which converts them into JSON, etc.
This is the Bolt's code:

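import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
// The JSONObject import and the LOGGER field are elided in the original post.

public class DataPointConverterBolt extends BaseBasicBolt {

    // ...

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        synchronized (collector) {
            if (tuple.contains("object")) {
                // Dump every value of the incoming tuple for debugging.
                LOGGER.error("TUPLE:");
                LOGGER.error(tuple.getValues().toString());

                if (tuple.getValueByField("object") instanceof JSONObject) {
                    JSONObject message = (JSONObject) tuple.getValueByField("object");

                    // ... more JSON operations
                }
            }
        }
    }
}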

b) If I deploy the topology with 1 worker, everything works well, and typical
debug output looks something like this:

... other threads/executors output ...
c.a.e.b.DataPointConverterBolt  [ERROR] TUPLE:
c.a.e.b.DataPointConverterBolt  [ERROR]
[{"value":16,"sourceTimestamp":1413905723923}, io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@6bad662f, io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@450fcee3]
... other threads/executors output ...
c.a.e.b.DataPointConverterBolt [ERROR] TUPLE:
c.a.e.b.DataPointConverterBolt [ERROR]
[{"value":21,"sourceTimestamp":14139057238641000}, io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@325f046d, io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@7c1ba9f0]
... etc

c) If I then rebalance the topology to 2 or 3 workers (JVMs), the bolt running
this task starts receiving only empty tuples from the spout:

c.a.e.b.DataPointConverterBolt [ERROR] TUPLE:
c.a.e.b.DataPointConverterBolt [ERROR] [{}, io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@4b6abe5a, io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@4cf72d64]

c.a.e.b.DataPointConverterBolt [ERROR] TUPLE:
c.a.e.b.DataPointConverterBolt [ERROR] [{}, io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@77c527b5, io.latent.storm.rabbitmq.

....

Any thoughts on what could be producing these empty tuples?

Thank you for your support
Best regards
Hugo

Re: Tuples turn empty after rebalancing

Posted by Hugo Sequeira <hu...@gmail.com>.
Yippee, I fixed it, after days of trying to find the problem! I'll leave the
solution here. The problem was actually in the Spout and was related to
serialization.

The Spout was actually the one converting to JSON, and the Bolt was converting
from JSON to Java objects. I changed the Spout to emit a String and the Bolt to
convert String -> JSON -> Java objects.
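A minimal sketch of the String-based handoff, assuming a flat JSON payload with the two numeric fields seen in the logs. `DataPoint`, `toJson` and `fromJson` are hypothetical names for illustration. In the real topology a JSON library would do the parsing; here a small regex-based parser for flat numeric objects stands in for it so the example stays self-contained. The point is that only a `String` crosses the component boundary.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical data point carried from the Spout to the Bolt as JSON text. */
final class DataPoint {
    // Matches "key":number pairs in a flat JSON object (no nesting, no string values).
    private static final Pattern FIELD = Pattern.compile("\"(\\w+)\"\\s*:\\s*(-?\\d+)");

    final long value;
    final long sourceTimestamp;

    DataPoint(long value, long sourceTimestamp) {
        this.value = value;
        this.sourceTimestamp = sourceTimestamp;
    }

    /** Spout side: turn the object into plain JSON text before emitting it. */
    String toJson() {
        return "{\"value\":" + value + ",\"sourceTimestamp\":" + sourceTimestamp + "}";
    }

    /** Bolt side: rebuild the object from the JSON text it received. */
    static DataPoint fromJson(String json) {
        Map<String, Long> fields = new HashMap<>();
        Matcher m = FIELD.matcher(json);
        while (m.find()) {
            fields.put(m.group(1), Long.parseLong(m.group(2)));
        }
        // Fail loudly on an incomplete payload instead of producing an empty object.
        if (!fields.containsKey("value") || !fields.containsKey("sourceTimestamp")) {
            throw new IllegalArgumentException("Missing field in: " + json);
        }
        return new DataPoint(fields.get("value"), fields.get("sourceTimestamp"));
    }
}
```

In a Storm Spout the text could be emitted with something like `collector.emit(new Values(point.toJson()))`, and the Bolt could read it back with `DataPoint.fromJson(tuple.getString(0))`. Throwing on missing fields is a deliberate choice: a broken payload then shows up as an exception rather than as a silent `{}`.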

So, passing plain Java types (a String) between components worked. I therefore
suspect the problem was related to serializing objects between components:
serialization was failing, so the tuple field was being sent with a default,
empty value (""). Strangely enough, I couldn't find any exceptions in the logs.

My guess is that with only one JVM, serialization was not used at all, or the
types were somehow being resolved automatically.
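This fits how Storm generally behaves: tuples passed between executors in the same worker stay in memory as the same Java objects, while tuples sent to another worker are serialized (with Kryo by default) and rebuilt on the other side. The sketch below shows one plausible way a value can look fine inside one JVM and still arrive as `{}` in another, with no exception thrown. It uses plain `java.io` serialization as a stand-in for Storm's serializer, and `OpaquePayload` is a hypothetical class whose contents the serializer does not capture; the thread does not confirm which JSON library or serializer path was actually involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class SerializationRoundTrip {

    /** Hypothetical payload: its map is transient, so serialization skips it. */
    static final class OpaquePayload implements Serializable {
        private static final long serialVersionUID = 1L;
        transient Map<String, Object> fields = new HashMap<>();

        // Runs on the receiving side: the map is recreated, but its entries are gone.
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            fields = new HashMap<>();
        }

        @Override
        public String toString() {
            return fields.toString();
        }
    }

    /** Simulates sending a value to another JVM: serialize to bytes, then deserialize. */
    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        OpaquePayload payload = new OpaquePayload();
        payload.fields.put("value", 16);

        // Same JVM: the consumer sees the very same object.
        System.out.println("same JVM:  " + payload);            // prints "same JVM:  {value=16}"
        // Different JVM: the object is rebuilt from bytes and its contents are lost.
        System.out.println("other JVM: " + roundTrip(payload)); // prints "other JVM: {}"
        // A plain String survives the trip unchanged.
        System.out.println("string:    " + roundTrip("{\"value\":16}"));
    }
}
```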

2014-10-22 10:48 GMT+02:00 Itai Frenkel <It...@forter.com>:

>  Does the code work correctly when the same worker runs more than one
> Spout ?

Re: Tuples turn empty after rebalancing

Posted by Itai Frenkel <It...@forter.com>.
Does the code work correctly when the same worker runs more than one Spout?
