Posted to user@storm.apache.org by Simon Cooper <si...@featurespace.co.uk> on 2014/05/12 11:35:10 UTC

Emitter.emitPartitionBatchNew not being called after Coordinator.isReady returns true

I've got a very, very strange problem with one of my topologies. We've tried deploying to a clustered environment, and the Trident topology we've got isn't running the Emitter when the Coordinator returns true from isReady(). At all. The logging message right at the start of the method is never printed to the worker logs. getOrderedPartitions and refreshPartitions are called, but emitPartitionBatchNew is not.

This is the output of the debug logs when an event is sent to the Kafka topic:

2014-05-09 12:19:57 u.c.f.t.k.t.TridentKafkaSpout$Coordinator [DEBUG] Spout ready to read from EventsIn at offset 253
2014-05-09 12:19:57 u.c.f.t.k.t.TridentKafkaSpout$Coordinator [DEBUG] Coordinator triggering batch 171
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $mastercoord-bg0 $batch [171:0]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: $mastercoord-bg0:1, stream: $batch, id: {-8335840769555710216=-5840416573241105698}, [171:0]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $mastercoord-bg0 __ack_init [-8335840769555710216 -5840416573241105698 1]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-8335840769555710216 -5840416573241105698 1]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $spoutcoord-spout0 $batch [171:0, [0]]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $spoutcoord-spout0 __ack_ack [-8335840769555710216 -6305116143407737306]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: $spoutcoord-spout0:2, stream: $batch, id: {-8335840769555710216=472058086231717112}, [171:0, [0]]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: $spoutcoord-spout0:2, stream: __ack_ack, id: {}, [-8335840769555710216 -6305116143407737306]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting direct: 4; spout0 $coord-bg0 [171:0, 0]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: spout0 __ack_ack [-8335840769555710216 -7843298785019326174]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: spout0:5, stream: $coord-bg0, id: {-8335840769555710216=-7662286408926976550}, [171:0, 0]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: b-0-events-events-events-Entities-consumer-partition-Entities-bin-partition-consumer-output __ack_ack [-8335840769555710216 -7662286408926976550]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: spout0:5, stream: __ack_ack, id: {}, [-8335840769555710216 -7843298785019326174]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: b-0-events-events-events-Entities-consumer-partition-Entities-bin-partition-consumer-output:4, stream: __ack_ack, id: {}, [-8335840769555710216 -7662286408926976550]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting direct: 1; __acker __ack_ack [-8335840769555710216]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: __acker:3, stream: __ack_ack, id: {}, [-8335840769555710216]
2014-05-09 12:19:57 b.s.d.executor [INFO] Acking message 171:0
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $mastercoord-bg0 $commit [171:0]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: $mastercoord-bg0:1, stream: $commit, id: {-4294881045221605715=-9054173560631528320}, [171:0]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: b-0-events-events-events-Entities-consumer-partition-Entities-bin-partition-consumer-output __ack_ack [-4294881045221605715 -9054173560631528320]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: b-0-events-events-events-Entities-consumer-partition-Entities-bin-partition-consumer-output:4, stream: __ack_ack, id: {}, [-4294881045221605715 -9054173560631528320]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $mastercoord-bg0 __ack_init [-4294881045221605715 -9054173560631528320 1]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-4294881045221605715 -9054173560631528320 1]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting direct: 1; __acker __ack_ack [-4294881045221605715]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: __acker:3, stream: __ack_ack, id: {}, [-4294881045221605715]
2014-05-09 12:19:57 b.s.d.executor [INFO] Acking message 171:0
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $mastercoord-bg0 $success [171:0]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: $mastercoord-bg0:1, stream: $success, id: {}, [171:0]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source: $mastercoord-bg0:1, stream: $success, id: {}, [171:0]

And this is the Storm UI for the topology:
Spouts (All time)
Id | Executors | Tasks | Emitted | Transferred | Complete latency (ms) | Acked | Failed | Last error
$mastercoord-bg0 | 1 | 1 | 20 | 20 | 0.000 | 0 | 0 |

Bolts (All time)
Id | Executors | Tasks | Emitted | Transferred | Capacity (last 10m) | Execute latency (ms) | Executed | Process latency (ms) | Acked | Failed | Last error
$spoutcoord-spout0 | 1 | 1 | 20 | 0 | 0.000 | 0.000 | 0 | 0.000 | 0 | 0 |
__acker | 1 | 1 | 0 | 0 | 0.000 | 0.000 | 0 | 0.000 | 20 | 0 |
b-0-events-events-events-Entities-consumer-partition-Entities-bin-partition-consumer-output | 1 | 1 | 20 | 20 | 0.000 | 0.000 | 120 | 0.000 | 0 | 0 |
spout0 | 1 | 1 | 20 | 20 | 0.000 | 0.000 | 120 | 0.000 | 0 | 0 |

TridentKafkaSpout is our own Kafka spout, implemented as a PartitionedTridentSpout. The coordinator long-polls the input topic and returns true from isReady once the long poll returns with new data in the topic (the two debug messages at the top of the log show this happening).
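For clarity, here is a stripped-down, Storm-free sketch of the long-poll pattern the coordinator follows. It is not our actual TridentKafkaSpout code: OffsetSource stands in for the Kafka topic, and LongPollCoordinatorSketch, awaitBeyond and the 50 ms poll timeout are made up for illustration. Only the isReady(long txid) name mirrors the Trident Coordinator method.

```java
import java.util.concurrent.TimeUnit;

// Storm-free sketch of a long-polling coordinator. OffsetSource and the poll timeout are
// hypothetical stand-ins; only the isReady(long txid) name mirrors Trident's Coordinator.
public class LongPollCoordinatorSketch {

    // Hypothetical stand-in for the input topic: tracks only the topic's end offset.
    static final class OffsetSource {
        private long endOffset = 0;

        synchronized void append(int messages) {
            endOffset += messages;
            notifyAll(); // wake a coordinator blocked in awaitBeyond
        }

        // Long poll: block until endOffset > after or timeoutMillis elapses, then return endOffset.
        synchronized long awaitBeyond(long after, long timeoutMillis) throws InterruptedException {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (endOffset <= after) {
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    break; // timed out with no new data
                }
                wait(remainingMillis); // the loop guards against spurious wakeups
            }
            return endOffset;
        }
    }

    static final class Coordinator {
        private final OffsetSource source;
        private final long pollTimeoutMillis;
        private long lastSeenOffset = 0;

        Coordinator(OffsetSource source, long pollTimeoutMillis) {
            this.source = source;
            this.pollTimeoutMillis = pollTimeoutMillis;
        }

        // Returns true only when the long poll saw data beyond the last offset handed out.
        // txid is unused here; it is kept only to mirror the Trident method shape.
        boolean isReady(long txid) {
            try {
                long end = source.awaitBeyond(lastSeenOffset, pollTimeoutMillis);
                if (end > lastSeenOffset) {
                    lastSeenOffset = end;
                    return true;
                }
                return false; // nothing new: the caller will simply ask again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        OffsetSource source = new OffsetSource();
        Coordinator coordinator = new Coordinator(source, 50);
        System.out.println(coordinator.isReady(1)); // false: topic still empty
        source.append(3);
        System.out.println(coordinator.isReady(1)); // true: new data arrived
        System.out.println(coordinator.isReady(2)); // false: nothing newer
    }
}
```

The timeout keeps each isReady call bounded, which (as I understand it) matters because Trident's master coordinator calls isReady from its spout loop.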

There's only a single non-spout bolt in the topology, called 'b-0-events-events-events-Entities-consumer-partition-Entities-bin-partition-consumer-output'. From what I can see, emitPartitionBatchNew simply isn't being called, even though it's a new batch and the implementation of TridentKafkaSpout ensures there's always exactly 1 partition in the batch.
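One hypothesis I haven't been able to confirm: if the partitioned spout executor keeps per-partition metadata keyed by transaction id (persisted in ZooKeeper) and only calls emitPartitionBatchNew when a txid has no recorded metadata and no later txid has any, then metadata left over from an earlier deployment with higher txids would make it silently skip every new batch. The sketch below models that decision. PartitionDispatchSketch, Action and recordExisting are made-up names, and this is not Storm's code.

```java
import java.util.TreeMap;

// Simplified, hypothetical model of a per-partition emit decision keyed by txid.
// Names are made up; this is NOT Storm's PartitionedTridentSpoutExecutor code.
public class PartitionDispatchSketch {

    enum Action { EMIT_NEW, REPLAY, SKIP }

    // Per-partition metadata by txid (assumed to be persisted in ZooKeeper in the real system).
    private final TreeMap<Long, String> metaByTxid = new TreeMap<>();

    // Seeds metadata, e.g. to simulate state left behind by an earlier deployment.
    void recordExisting(long txid, String meta) {
        metaByTxid.put(txid, meta);
    }

    Action dispatch(long txid) {
        if (metaByTxid.containsKey(txid)) {
            // Seen before: replay with stored metadata (emitPartitionBatch), unless it was skipped.
            return metaByTxid.get(txid) != null ? Action.REPLAY : Action.SKIP;
        }
        if (!metaByTxid.tailMap(txid, false).isEmpty()) {
            // A later txid already has metadata: record a null and emit nothing at all.
            metaByTxid.put(txid, null);
            return Action.SKIP;
        }
        // Only this path would call emitPartitionBatchNew (given the previous txid's metadata).
        metaByTxid.put(txid, "meta-" + txid);
        return Action.EMIT_NEW;
    }

    public static void main(String[] args) {
        PartitionDispatchSketch fresh = new PartitionDispatchSketch();
        System.out.println(fresh.dispatch(171)); // EMIT_NEW
        System.out.println(fresh.dispatch(171)); // REPLAY: same txid retried

        PartitionDispatchSketch stale = new PartitionDispatchSketch();
        stale.recordExisting(500, "meta-500"); // hypothetical leftover state with a higher txid
        System.out.println(stale.dispatch(171)); // SKIP: emitPartitionBatchNew never runs
        System.out.println(stale.dispatch(172)); // SKIP again
    }
}
```

If the real executor behaves anything like this, the difference between the fresh and stale instances in main is exactly the difference between a working and a silent emitter.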

Can anyone shed any light on what's going on, and why the emitter is not being called to output data? Or offer any pointers on how I might debug this problem further? I've turned on all the logging I can, and there's no obvious next step I can explore to fix this issue.
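Since the log line at the top of emitPartitionBatchNew never appears, one cheap next step would be to record every call made on the emitter, which would show whether emitPartitionBatch (the replay path) is being called instead, or nothing at all. A sketch using the JDK's java.lang.reflect.Proxy; SimpleEmitter is a made-up, simplified interface rather than the real Trident Emitter, and wrapping the real emitter this way is an untested assumption.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Records every method call made on an interface before delegating to the real object.
// SimpleEmitter is a made-up, simplified interface, not Trident's actual Emitter.
public class CallTracingSketch {

    interface SimpleEmitter {
        List<String> getOrderedPartitions(String allPartitionInfo);
        void refreshPartitions(List<String> partitions);
        String emitPartitionBatchNew(long txid, String partition, String lastMeta);
        void emitPartitionBatch(long txid, String partition, String meta);
    }

    @SuppressWarnings("unchecked")
    static <T> T traced(Class<T> iface, T target, List<String> calls) {
        InvocationHandler handler = (proxy, method, args) -> {
            calls.add(method.getName() + (args == null ? "[]" : Arrays.toString(args)));
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the target's own exception, not the reflection wrapper
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    // Drives a traced emitter the way an executor might and returns the recorded calls.
    static List<String> demo() {
        SimpleEmitter real = new SimpleEmitter() {
            public List<String> getOrderedPartitions(String info) { return Collections.singletonList("p0"); }
            public void refreshPartitions(List<String> partitions) { }
            public String emitPartitionBatchNew(long txid, String p, String lastMeta) { return "meta-" + txid; }
            public void emitPartitionBatch(long txid, String p, String meta) { }
        };
        List<String> calls = new ArrayList<>();
        SimpleEmitter emitter = traced(SimpleEmitter.class, real, calls);
        emitter.refreshPartitions(emitter.getOrderedPartitions("EventsIn"));
        emitter.emitPartitionBatch(171, "p0", "meta-171"); // replay path, not emitPartitionBatchNew
        return calls;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running main prints one line per call, in call order, so a missing emitPartitionBatchNew entry (or an unexpected emitPartitionBatch one) stands out immediately.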

Many thanks,
SimonC