Posted to user@storm.apache.org by Aleksey Makeyev <ma...@corp.mail.ru> on 2015/06/11 17:19:38 UTC

Race condition in PartitionedTridentSpoutExecutor?

Hello.

 I encountered an error in the following situation. I run with topology.max.spout.pending = 20 (a minimal configuration sketch follows the log excerpt below), so there can be multiple concurrent Trident transactions. My custom spout implements IPartitionedTridentSpout. I added some partitions at the code level and resubmitted the topology. I then saw one partition ("telemetry,chronos") emit data from the same offset twice: first as part of transaction 10544171:1 in thread #48 of worker 28219, then as part of transaction 10544172:1 in thread #50 of worker 28220. After this, thread #48 was scheduled to process another set of partitions, and partition "telemetry,chronos" was "moved" to thread #50 of worker 28220. As a result, the data was processed twice:

 2015-06-10 12:24:56 r.m.g.w.s.r.t.MTStreamSpout$Emitter [INFO] thread #48 (Thread-19-spout0) of process 28219@el7-makeev: emitPartitionBatchNew: partition=telemetry,chronos tx=10544171:1 cur pos: {"fileName":"telemetry-2015-06-10-12_00001","offset":627325755,"length":0} (of class: class org.json.simple.JSONObject)
 2015-06-10 12:24:58 r.m.g.w.s.r.t.MTStreamSpout$Emitter [INFO] thread #48 (Thread-19-spout0) of process 28219@el7-makeev: partition=telemetry,chronos tx=10544171:1 flushed 14873 tuples, new pos: {fileName=telemetry-2015-06-10-12_00001, offset=627325755, length=8388607}

 2015-06-10 12:24:56 r.m.g.w.s.r.t.MTStreamSpout$Emitter [INFO] thread #50 (Thread-21-spout0) of process 28220@el7-makeev: emitPartitionBatchNew: partition=telemetry,chronos tx=10544172:1 cur pos: {"fileName":"telemetry-2015-06-10-12_00001","offset":627325755,"length":0} (of class: class org.json.simple.JSONObject)
 2015-06-10 12:24:57 r.m.g.w.s.r.t.MTStreamSpout$Emitter [INFO] thread #50 (Thread-21-spout0) of process 28220@el7-makeev: partition=telemetry,chronos tx=10544172:1 flushed 14873 tuples, new pos: {fileName=telemetry-2015-06-10-12_00001, offset=627325755, length=8388607}
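
 For reference, here is a minimal sketch of the setting involved (assuming the pre-1.0 backtype.storm package; newer releases use org.apache.storm):

     import backtype.storm.Config;

     Config conf = new Config();
     // topology.max.spout.pending = 20: up to 20 Trident transactions
     // may be in flight at the same time
     conf.setMaxSpoutPending(20);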

 Clearly, the partitions were rebalanced across tasks because I added new ones. Could there be a race condition in PartitionedTridentSpoutExecutor$Emitter.emitBatch when multiple Trident transactions run concurrently? For example: tx 10544171 received the old partition assignment from the coordinator and used offset 627325755 (the offset saved to ZooKeeper by tx 10544170), but had not yet saved its new offset to the state in ZooKeeper. Meanwhile, tx 10544172, running with the new partition assignment, concurrently read the partition state from ZooKeeper (still the offset of tx 10544170), so both 10544171 and 10544172 saw the same offset. A self-contained sketch of this interleaving follows.
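
 To make the suspected interleaving concrete, here is a self-contained sketch. The map stands in for the per-partition state kept in ZooKeeper, and the two threads stand in for tx 10544171 and tx 10544172; the class and names are made up for illustration, this is not the actual PartitionedTridentSpoutExecutor code:

     import java.util.concurrent.ConcurrentHashMap;
     import java.util.concurrent.CountDownLatch;

     public class OffsetRaceSketch {
         // stand-in for the per-partition offsets stored in ZooKeeper
         static final ConcurrentHashMap<String, Long> zkState = new ConcurrentHashMap<>();

         public static void main(String[] args) throws InterruptedException {
             zkState.put("telemetry,chronos", 627325755L); // committed by tx 10544170
             CountDownLatch bothHaveRead = new CountDownLatch(2);

             Runnable tx = () -> {
                 // 1) read the last committed offset for the partition
                 long offset = zkState.get("telemetry,chronos");
                 bothHaveRead.countDown();
                 try { bothHaveRead.await(); } catch (InterruptedException e) { return; }
                 // 2) emit a batch from that offset -- both transactions get
                 //    here with the same value before either writes it back
                 System.out.println(Thread.currentThread().getName()
                         + " emits from offset " + offset);
                 // 3) only now is the new offset saved -- too late to
                 //    prevent the duplicate emit above
                 zkState.put("telemetry,chronos", offset + 8388607L);
             };

             Thread t1 = new Thread(tx, "tx-10544171");
             Thread t2 = new Thread(tx, "tx-10544172");
             t1.start(); t2.start();
             t1.join(); t2.join();
         }
     }

 Both threads print the same starting offset, 627325755, which matches the duplicated emits in the logs above.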

 -- 
 Best regards
 Aleksey Makeyev