Posted to user@storm.apache.org by Filipa Moura <fi...@gmail.com> on 2014/10/28 18:35:36 UTC
Using Multischeme for KafkaSpout
Hi,
I wrote a MultiScheme implementation for KafkaSpout that deserializes the
byte[] into a Java object. All good so far. However, I want the bolt that
reads from this spout to receive its tuples grouped by an id (a fields
grouping). What is the best way to do this?
This used to work fine:
List<Object> objects = new ArrayList<Object>();
objects.add(object);
result.add(objects);
and
@Override
public Fields getOutputFields() {
    return new Fields(OUTPUT_FIELD_OBJECT);
}
I tried declaring two fields for the scheme, OBJECT and OBJECT_ID, creating
a list for each and adding both lists to the result (which is an
Iterable<List<Object>>), but it is not working.
I think it crashes around here:
List<Object> objects = new ArrayList<Object>();
objects.add(object);
result.add(objects);
result.add(ImmutableList.of((Object) object.getId()));
and here are the output fields:
@Override
public Fields getOutputFields() {
    return new Fields(OUTPUT_FIELD_OBJECT, OUTPUT_FIELD_OBJECT_ID);
}
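For comparison, here is a minimal plain-Java sketch (no Storm dependency) of the two tuple shapes. It assumes, as storm-kafka does, that each inner List<Object> returned by deserialize becomes one emitted tuple whose values are matched to getOutputFields() by position. The code above therefore emits two tuples of one value each, while two declared fields need one tuple of two values. The Advertiser class, its constructor and the field-name strings are hypothetical stand-ins for the real scheme's types and constants.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TwoFieldSchemeSketch {

    // Hypothetical field names; the real scheme defines its own constants.
    static final String OUTPUT_FIELD_OBJECT = "object";
    static final String OUTPUT_FIELD_OBJECT_ID = "object_id";

    // Hypothetical stand-in for the deserialized Kafka payload.
    static class Advertiser {
        private final String id;
        Advertiser(String id) { this.id = id; }
        String getId() { return id; }
    }

    // Same shape as MultiScheme.deserialize: every inner list is ONE tuple.
    // Both values go into a single list, in the order of getOutputFields().
    static List<List<Object>> deserialize(Advertiser object) {
        List<List<Object>> result = new ArrayList<List<Object>>();
        result.add(Arrays.<Object>asList(object, object.getId()));
        return result;
    }

    // What the failing code builds: two separate tuples with one value each.
    static List<List<Object>> deserializeBroken(Advertiser object) {
        List<List<Object>> result = new ArrayList<List<Object>>();
        List<Object> objects = new ArrayList<Object>();
        objects.add(object);
        result.add(objects);
        result.add(Arrays.<Object>asList(object.getId()));
        return result;
    }

    public static void main(String[] args) {
        Advertiser a = new Advertiser("adv-42");
        int declared = Arrays.asList(OUTPUT_FIELD_OBJECT, OUTPUT_FIELD_OBJECT_ID).size();
        for (List<Object> tuple : deserializeBroken(a)) {
            System.out.println("broken: tuple size " + tuple.size() + ", declared fields " + declared);
        }
        for (List<Object> tuple : deserialize(a)) {
            System.out.println("fixed: tuple size " + tuple.size() + ", declared fields " + declared);
        }
    }
}
```

With Storm and Guava on the classpath, the fixed line would presumably become a single result.add(ImmutableList.<Object>of(object, object.getId())) or result.add(new Values(object, object.getId())) in place of the two separate add calls.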
and here is the topology:
topologyBuilder.setSpout(TopologyParameter.KAFKA_SPOUT.getValue(), kafkaSpout, 2);
// aggregator bolt + write to HDFS
topologyBuilder.setBolt(TopologyParameter.AGGREGATOR_BOLT.getValue(), aggregatorBolt)
        .fieldsGrouping(TopologyParameter.KAFKA_SPOUT.getValue(),
                new Fields(Mx3AdvertiserScheme.OUTPUT_FIELD_OBJECT_ID));
and this is the error I get:
38455 [Thread-12-mx3_kafka_spout] ERROR backtype.storm.util - Async loop died!
java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
    at java.util.ArrayList.rangeCheck(ArrayList.java:604) ~[na:1.7.0_17]
    at java.util.ArrayList.get(ArrayList.java:382) ~[na:1.7.0_17]
    at backtype.storm.tuple.Fields.select(Fields.java:51) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$mk_fields_grouper$fn__3056.invoke(executor.clj:36) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.task$mk_tasks_fn$fn__3012.invoke(task.clj:158) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__3284$fn__3299$send_spout_msg__3317.invoke(executor.clj:480) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__3284$fn$reify__3326.emit(executor.clj:524) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:49) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:63) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.PartitionManager.next(PartitionManager.java:133) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__3284$fn__3299$fn__3328.invoke(executor.clj:563) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.0.jar:na]
    at java.lang.Thread.run(Thread.java:722) [na:1.7.0_17]
I cannot find any examples of this anywhere. Is it not possible to output
two fields using a MultiScheme?
Thank you for your help,
Filipa