Posted to user@storm.apache.org by Jacques <ar...@gmail.com> on 2014/07/29 05:22:50 UTC

Merging trident streams blocks the FixedBatchSpout

Hi guys,
I need some help understanding why merging two streams blocks one of the spouts of class FixedBatchSpout.
I’ve been stuck on this for 2 days, so any help would be much appreciated (see details below).

Thanks,
Jacques


Short Description: I’m trying to merge two streams s1 and s2, but calling topology.merge(s1, s2) blocks the FixedBatchSpout from which s1 originates, whereas the BaseRichSpout from s2 seems to work properly.

Details: In the main method below, just adding the line topology.merge(s1, s2); prevents the FixedBatchSpout from emitting past its first batch. This happens with multiReduce as well.

FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("sentence"), 2,
				new Values("the cow jumped over the moon"),
				new Values("the man went to the store and bought some candy"));

FixedLoopSpout spout2 = new FixedLoopSpout(new Fields("sentence"),
				new Values("THE COW JUMPED OVER THE MOON"),
				new Values("THE MAN WENT TO THE STORE AND BOUGHT SOME CANDY"));

Stream s1 = topology.newStream("hello", spout1);
Stream s2 = topology.newStream("world", spout2);
topology.merge(s1, s2);


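// Imports assume the backtype.storm package names used by Storm 0.9.x.
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class FixedLoopSpout extends BaseRichSpout {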

	Values[] values;
	List<Values> loop = new LinkedList<Values>();
	Iterator<Values> head;
	private SpoutOutputCollector collector;
	private final Fields outputFields;

	private long emitted = 0;

	public FixedLoopSpout(Fields outputFields, Values... values) {
		this.outputFields = outputFields;
		this.values = values;
	}

	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
		for (Values value: this.values) {
			this.loop.add(value);
		}
		this.head = this.loop.iterator();
	}

	public void nextTuple() {
		if (!this.head.hasNext()) {
			// wrap
			this.head = this.loop.iterator();
		}
		this.collector.emit(this.head.next(), this.emitted++);
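		try {
			// throttle emission to roughly one tuple every 100 ms
			Thread.sleep(100);
		} catch (InterruptedException e) {
			// restore the interrupt flag so the caller can see it
			Thread.currentThread().interrupt();
		}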
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(this.outputFields);
	}
}
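
To rule out the wrap-around logic in nextTuple() as the culprit, the iteration can be checked in isolation, without Storm. The following is only a sketch under assumptions: the class LoopOrderSketch and the method emitOrder are hypothetical names that are not part of the topology, and the empty-list guard is an addition that the spout above does not have.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-alone model of FixedLoopSpout's wrap-around logic.
// No Storm classes are involved; it only mirrors the iterator handling.
class LoopOrderSketch {

    // Returns the first `count` values in the order successive
    // nextTuple() calls would hand them to the collector.
    static <T> List<T> emitOrder(List<T> loop, int count) {
        if (loop.isEmpty()) {
            // Added guard (assumption): the spout itself would fail on head.next() here.
            throw new IllegalArgumentException("loop must contain at least one value");
        }
        List<T> emitted = new ArrayList<T>();
        Iterator<T> head = loop.iterator();
        for (int i = 0; i < count; i++) {
            if (!head.hasNext()) {
                // wrap, as in nextTuple()
                head = loop.iterator();
            }
            emitted.add(head.next());
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList(
                "THE COW JUMPED OVER THE MOON",
                "THE MAN WENT TO THE STORE AND BOUGHT SOME CANDY");
        // Print five emissions to see the two sentences alternate.
        for (String sentence : emitOrder(sentences, 5)) {
            System.out.println(sentence);
        }
    }
}
```

With two values the sketch alternates between them indefinitely, which matches what I observe from spout2 on its own, so the loop itself does not look like the problem.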

Should I be merging transactional and non-transactional streams?

Posted by Jacques <ar...@gmail.com>.
Hello,
Is there a reason why I shouldn’t merge a transactional and a non-transactional Trident stream?

Thanks,
Jacques

Begin forwarded message:

> From: Jacques <ar...@gmail.com>
> Subject: Merging trident streams blocks the FixedBatchSpout
> Date: July 28, 2014 at 8:22:50 PM PDT
> To: user@storm.incubator.apache.org