You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Bharat Jayaraman Karthick <bh...@teclever.com> on 2016/04/29 19:09:48 UTC
Unable to merge DRPC Spout with any other spout using Trident
Hi,
We have a use case which requires a bolt consuming streams emitted by Kafka
spout & a DRPC spout. I used TridentTopology and tried to merge the streaam
but got error message "Cannot join DRPC stream with streams originating
from other spouts".
To check i used TopologyBuilder to merge these two streams and was able to
merge tuples / group the tuples emitted by these two streams.
Can you help me understand why TridentTopology throws the error message
when we try to merge DRPC stream with any other streaam emitted by any
spout.
For Trident, i used the following topology
TridentTopology topology = new TridentTopology();
SkuUpdatesKafkaEmulateSpout spout = new SkuUpdatesKafkaEmulateSpout(10);
Stream kafkaStream = topology.newStream("kafka_stream", spout);
Stream drpcStream = topology.newDRPCStream("drpc_stream", drpc)
.each(new Fields("args"), new DRPC_ArgsSplit(), new
Fields("sku", "new_value"));
Stream merged = topology.merge(kafkaStream, drpcStream);
merged.persistentAggregate(new MemoryMapState.Factory(), new
Fields("sku"), new Sum(), new Fields("value"))
.parallelismHint(2);
return topology.build();
For TopologyBuilder, i used the following topology
TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();
conf.setDebug(true);
LocalDRPC drpc = new LocalDRPC();
DRPCSpout spout1 = new DRPCSpout("processOrder", drpc);
DemoSpout spout2 = new DemoSpout();
builder.setSpout("drpc", spout1);
builder.setSpout("demospout", spout2);
builder.setBolt("scraperBolt", new
XMLScrapperBolt()).shuffleGrouping("drpc").shuffleGrouping("demospout");
builder.setBolt("returnBolt", new
DRPCBolt()).shuffleGrouping("scraperBolt", "drpc-stream");
builder.setBolt("return", new
ReturnResults()).shuffleGrouping("returnBolt");
Regards,
Bharat Karthick J