Posted to user@storm.apache.org by Filipa Moura <fi...@gmail.com> on 2014/10/28 18:35:36 UTC

Using Multischeme for KafkaSpout

Hi,
I wrote a MultiScheme implementation for KafkaSpout that deserializes the
byte[] into a Java object. All good so far. However, I want the bolt
that reads from this spout to be grouped by an id. What is the best way to
do this?

This used to work fine:

List<Object> objects = new ArrayList<Object>();
objects.add(object);
result.add(objects);


and

@Override
public Fields getOutputFields() {
    return new Fields(OUTPUT_FIELD_OBJECT);
}




I tried declaring 2 fields for the scheme, OBJECT and OBJECT_ID. I
create a list for each and add them to the result, which is an
Iterable<List<Object>>, but it is not working.

I think it crashes around here:

List<Object> objects = new ArrayList<Object>();
objects.add(object);
result.add(objects);
result.add(ImmutableList.of((Object) object.getId()));


and here are the output fields:

@Override
public Fields getOutputFields() {
    return new Fields(OUTPUT_FIELD_OBJECT, OUTPUT_FIELD_OBJECT_ID);
}


and here is the topology:

topologyBuilder.setSpout(TopologyParameter.KAFKA_SPOUT.getValue(), kafkaSpout, 2);

// aggregator bolt + write to hdfs
topologyBuilder.setBolt(TopologyParameter.AGGREGATOR_BOLT.getValue(), aggregatorBolt)
        .fieldsGrouping(TopologyParameter.KAFKA_SPOUT.getValue(),
                new Fields(Mx3AdvertiserScheme.OUTPUT_FIELD_OBJECT_ID));


38455 [Thread-12-mx3_kafka_spout] ERROR backtype.storm.util - Async loop died!
java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
    at java.util.ArrayList.rangeCheck(ArrayList.java:604) ~[na:1.7.0_17]
    at java.util.ArrayList.get(ArrayList.java:382) ~[na:1.7.0_17]
    at backtype.storm.tuple.Fields.select(Fields.java:51) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$mk_fields_grouper$fn__3056.invoke(executor.clj:36) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.task$mk_tasks_fn$fn__3012.invoke(task.clj:158) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__3284$fn__3299$send_spout_msg__3317.invoke(executor.clj:480) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__3284$fn$reify__3326.emit(executor.clj:524) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:49) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:63) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.PartitionManager.next(PartitionManager.java:133) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__3284$fn__3299$fn__3328.invoke(executor.clj:563) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.0.jar:na]
    at java.lang.Thread.run(Thread.java:722) [na:1.7.0_17]
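
One possible reading of the trace, sketched without Storm so that it runs standalone: "Index: 1, Size: 1" would be consistent with each inner list of the Iterable<List<Object>> being emitted as its own tuple, so that a tuple holding a single value is asked for the second declared field. The class TupleShapeSketch, its helper methods, and the field names "object" and "object_id" are hypothetical stand-ins, not Storm API; selectField only imitates a lookup of a declared field by position.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TupleShapeSketch {

    // Hypothetical stand-in for picking a declared field out of one emitted tuple by position.
    static Object selectField(List<String> declared, String name, List<Object> tuple) {
        return tuple.get(declared.indexOf(name));
    }

    // Shape of the failing attempt: two inner lists, each holding one value.
    static List<List<Object>> twoSingleValueTuples(Object object, Object id) {
        List<List<Object>> result = new ArrayList<List<Object>>();
        List<Object> first = new ArrayList<Object>();
        first.add(object);
        result.add(first);
        List<Object> second = new ArrayList<Object>();
        second.add(id);
        result.add(second);
        return result;
    }

    // Alternative shape: one inner list holding both values, in declared order.
    static List<List<Object>> oneTwoValueTuple(Object object, Object id) {
        List<List<Object>> result = new ArrayList<List<Object>>();
        List<Object> tuple = new ArrayList<Object>();
        tuple.add(object);
        tuple.add(id);
        result.add(tuple);
        return result;
    }

    public static void main(String[] args) {
        List<String> declared = Arrays.asList("object", "object_id");

        // One list with two values: position 1 exists, so the id can be selected.
        List<Object> wide = oneTwoValueTuple("payload", 42).get(0);
        System.out.println("selected id: " + selectField(declared, "object_id", wide));

        // Two lists with one value each: position 1 does not exist in the first list.
        List<Object> narrow = twoSingleValueTuples("payload", 42).get(0);
        try {
            selectField(declared, "object_id", narrow);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("lookup failed on a tuple of size " + narrow.size());
        }
    }
}
```

If that reading holds, putting the object and its id into the same inner list, in the order given in getOutputFields(), would be the shape to try in the scheme.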



I cannot find any examples of this anywhere. Is it not possible to output
2 fields using MultiScheme?


Thank you for your help,

Filipa