You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Jacques <ar...@gmail.com> on 2014/07/29 05:22:50 UTC
Merging trident streams blocks the FixedBatchSpout
Hi guys,
I need some help understanding why merging two streams blocks one of the spouts of class FixedBatchSpout.
I’ve been stuck 2 days on this, help would be much appreciated (see details below).
Thanks,
Jacques
Short Description: I’m trying to merge two streams s1 and s2, but calling topology.merge(s1, s2) blocks the FixedBatchSpout from which s1 originates, whereas the BaseRichSpout from s2 seems to work properly.
Details: In the below main method, just adding the line topology.merge(s1, s2); prevents the FixedBatchSpout to emit past its first batch. This happens with multireduce as well.
FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("sentence"), 2,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy”));
FixedLoopSpout spout2 = new FixedLoopSpout(new Fields("sentence"),
new Values("THE COW JUMPED OVER THE MOON"),
new Values("THE MAN WENT TO THE STORE AND BOUGHT SOME CANDY"));
Stream s1 = topology.newStream("hello", spout1);
Stream s2 = topology.newStream("world", spout2);
topology.merge(s1, s2);
public class FixedLoopSpout extends BaseRichSpout {
Values[] values;
List<Values> loop = new LinkedList<Values>();
Iterator<Values> head;
private SpoutOutputCollector collector;
private final Fields outputFields;
private long emitted = 0;
public FixedLoopSpout(Fields outputFields, Values... values) {
this.outputFields = outputFields;
this.values = values;
}
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
for (Values value: this.values) {
this.loop.add(value);
}
this.head = this.loop.iterator();
}
public void nextTuple() {
if (!this.head.hasNext()) {
// wrap
this.head = this.loop.iterator();
}
this.collector.emit(this.head.next(), this.emitted++);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(this.outputFields);
}
}
Should I be merging transactional and non-transactional streams?
Posted by Jacques <ar...@gmail.com>.
Hello,
Is there a reason why I shouldn’t merge a transactional and a non-transactional trident stream?
Thanks,
Jacques
Begin forwarded message:
> From: Jacques <ar...@gmail.com>
> Subject: Merging trident streams blocks the FixedBatchSpout
> Date: July 28, 2014 at 8:22:50 PM PDT
> To: user@storm.incubator.apache.org
>
> Hi guys,
> I need some help understanding why merging two streams blocks one of the spouts of class FixedBatchSpout.
> I’ve been stuck 2 days on this, help would be much appreciated (see details below).
>
> Thanks,
> Jacques
>
>
> Short Description: I’m trying to merge two streams s1 and s2, but calling topology.merge(s1, s2) blocks the FixedBatchSpout from which s1 originates, whereas the BaseRichSpout from s2 seems to work properly.
>
> Details: In the below main method, just adding the line topology.merge(s1, s2); prevents the FixedBatchSpout to emit past its first batch. This happens with multireduce as well.
>
> FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("sentence"), 2,
> new Values("the cow jumped over the moon"),
> new Values("the man went to the store and bought some candy”));
>
> FixedLoopSpout spout2 = new FixedLoopSpout(new Fields("sentence"),
> new Values("THE COW JUMPED OVER THE MOON"),
> new Values("THE MAN WENT TO THE STORE AND BOUGHT SOME CANDY"));
>
> Stream s1 = topology.newStream("hello", spout1);
> Stream s2 = topology.newStream("world", spout2);
> topology.merge(s1, s2);
>
>
> public class FixedLoopSpout extends BaseRichSpout {
>
> Values[] values;
> List<Values> loop = new LinkedList<Values>();
> Iterator<Values> head;
> private SpoutOutputCollector collector;
> private final Fields outputFields;
>
> private long emitted = 0;
>
> public FixedLoopSpout(Fields outputFields, Values... values) {
> this.outputFields = outputFields;
> this.values = values;
> }
>
> public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
> this.collector = collector;
> for (Values value: this.values) {
> this.loop.add(value);
> }
> this.head = this.loop.iterator();
> }
>
> public void nextTuple() {
> if (!this.head.hasNext()) {
> // wrap
> this.head = this.loop.iterator();
> }
> this.collector.emit(this.head.next(), this.emitted++);
> try {
> Thread.sleep(100);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> }
>
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(this.outputFields);
> }
> }