You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Nick Beenham <ni...@gmail.com> on 2014/11/05 20:33:52 UTC

Invalid Topology Error

Hi All,

I'm getting an invalid topology error when trying to emit from one bolt to
another.

>From the create topology method:

private static String ODU_BOLT = "ODU";

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
//Bolts for message routing
builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);

The emit method inside the CSG_BOLT

outputCollector.emit("ODU", tuple.getValues());

But I'm getting the below error...

331  [main] WARN  backtype.storm.StormSubmitter - Topology submission
exception: Component: [ODU] subscribes from non-existent stream: [default]
of component [CSG]
Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
subscribes from non-existent stream: [default] of component [CSG])


Regards,

Nick

Re: Invalid Topology Error

Posted by Nick Beenham <ni...@gmail.com>.
Yep that's it Nathan, I think I have my solution.

I have ...

TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);

        //builder.setBolt("print", new TestBolt()).shuffleGrouping("msgs");

        builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");

        //Bolts for message routing

        builder.setBolt("ODU", new ODUBolt()).fieldsGrouping(CSG_BOLT,
"ODU", new Fields("ODU"));
        builder.setBolt(ADU_BOLT, new
ADUBolt()).fieldsGrouping(CSG_BOLT, "ADU", new Fields("ADU"));
        builder.setBolt(ASNS_BOLT, new
ASNSBolt()).fieldsGrouping(CSG_BOLT, "ASNS", new Fields("ASNS"));

and within the bolt:

	@Override
	public void execute(Tuple tuple, BasicOutputCollector outputCollector) {

		System.out.println("CSG Message received... ");
		byte[] data = tuple.getBinaryByField("bytes");		
		//
		System.out.println("CSG Message length is: " + data.length);

		ReflectData reflectData = ReflectData.AllowNull.get();
		Schema schema = reflectData.getSchema(Msg.class);

		DatumReader<Msg> reader = new ReflectDatumReader<Msg>(schema);
		Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
		try {
			Msg msg = reader.read(null, decoder);
			System.out.println(msg.toString());
			route = this.getRoute(msg);

			if(route == ORDERDETAILUPDATED){
				outputCollector.emit("ODU", tuple.getValues());
			}
			else if(route == ACCOUNTDETAILUPDATED){
				outputCollector.emit("ADU", tuple.getValues());
			}else if (route == ASNSent){
				outputCollector.emit("ASNS", tuple.getValues());
			}
			else {
				System.out.println("Message doesn't match a known type!");
			}	
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}
	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputDeclarer) {
		outputDeclarer.declareStream("ODU", new Fields("ODU"));
		outputDeclarer.declareStream("ADU", new Fields("ADU"));
		outputDeclarer.declareStream("ASNS", new Fields("ASNS"));
	}


As far as I can see it does the job. Let me know if you see something
horrible.

Nick
On Wed Nov 05 2014 at 4:33:50 PM Nathan Leung <nc...@gmail.com> wrote:

> If you mean a situation where bolt a is subscribed to by bolts b and c,
> and you want certain tuples to go to b and others to c, streams are again
> your answer.
> On Nov 5, 2014 3:34 PM, "Nick Beenham" <ni...@gmail.com> wrote:
>
>> Thanks Nathan, I'm moving forward again. Though I'm working out how to
>> stop them emitting to all the bolts and doing some proper routing :)
>>
>> On Wed Nov 05 2014 at 3:07:00 PM Nathan Leung <nc...@gmail.com> wrote:
>>
>>> I think CSG needs to use the appropriate declare method: https://storm.
>>> incubator.apache.org/apidocs/backtype/storm/topology/
>>> OutputFieldsDeclarer.html
>>>
>>> On Wed, Nov 5, 2014 at 3:02 PM, Nick Beenham <ni...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Nathan. The purpose is to do some rudimentary routing, I receive
>>>> messages of various types and need to treat them differently.
>>>>
>>>> When I follow your example:
>>>> //Bolts for message routing
>>>>         builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT,
>>>> "ODU");
>>>>
>>>> I get the error
>>>> 328  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>>>> exception: Component: [ODU] subscribes from non-existent stream: [ODU] of
>>>> component [CSG]
>>>> Exception in thread "main" InvalidTopologyException(msg:Component:
>>>> [ODU] subscribes from non-existent stream: [ODU] of component [CSG])
>>>>
>>>> Am I missing another piece?
>>>>
>>>> Nick
>>>>
>>>>
>>>> On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <nc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Your emit is on stream "ODU", but when you subscribed you did not
>>>>> specify any streams (hence the message saying [ODU] subscribes from stream
>>>>> [default] of [CSG]).  You need to do the emit without a stream, e.g.
>>>>> emit(tuple.getValues()), which will use the default stream.  You only need
>>>>> to specify the stream in the emit if you subscribe to a specific stream
>>>>> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
>>>>> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit
>>>>> on this stream you would do emit("<some stream>", tuple.getValues());.
>>>>>
>>>>> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <ni...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I'm getting an invalid topology error when trying to emit from one
>>>>>> bolt to another.
>>>>>>
>>>>>> From the create topology method:
>>>>>>
>>>>>> private static String ODU_BOLT = "ODU";
>>>>>>
>>>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>>>>>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).
>>>>>> shuffleGrouping("msgs");
>>>>>> //Bolts for message routing
>>>>>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>>>>>
>>>>>> The emit method inside the CSG_BOLT
>>>>>>
>>>>>> outputCollector.emit("ODU", tuple.getValues());
>>>>>>
>>>>>> But I'm getting the below error...
>>>>>>
>>>>>> 331  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>>>>>> exception: Component: [ODU] subscribes from non-existent stream: [default]
>>>>>> of component [CSG]
>>>>>> Exception in thread "main" InvalidTopologyException(msg:Component:
>>>>>> [ODU] subscribes from non-existent stream: [default] of component [CSG])
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Nick
>>>>>>
>>>>>
>>>>>
>>>

Re: Invalid Topology Error

Posted by Nathan Leung <nc...@gmail.com>.
If you mean a situation where bolt a is subscribed to by bolts b and c, and
you want certain tuples to go to b and others to c, streams are again your
answer.
On Nov 5, 2014 3:34 PM, "Nick Beenham" <ni...@gmail.com> wrote:

> Thanks Nathan, I'm moving forward again. Though I'm working out how to
> stop them emitting to all the bolts and doing some proper routing :)
>
> On Wed Nov 05 2014 at 3:07:00 PM Nathan Leung <nc...@gmail.com> wrote:
>
>> I think CSG needs to use the appropriate declare method:
>> https://storm.incubator.apache.org/apidocs/backtype/storm/topology/OutputFieldsDeclarer.html
>>
>> On Wed, Nov 5, 2014 at 3:02 PM, Nick Beenham <ni...@gmail.com>
>> wrote:
>>
>>> Thanks Nathan. The purpose is to do some rudimentary routing, I receive
>>> messages of various types and need to treat them differently.
>>>
>>> When I follow your example:
>>> //Bolts for message routing
>>>         builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT,
>>> "ODU");
>>>
>>> I get the error
>>> 328  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>>> exception: Component: [ODU] subscribes from non-existent stream: [ODU] of
>>> component [CSG]
>>> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
>>> subscribes from non-existent stream: [ODU] of component [CSG])
>>>
>>> Am I missing another piece?
>>>
>>> Nick
>>>
>>>
>>> On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <nc...@gmail.com> wrote:
>>>
>>>> Your emit is on stream "ODU", but when you subscribed you did not
>>>> specify any streams (hence the message saying [ODU] subscribes from stream
>>>> [default] of [CSG]).  You need to do the emit without a stream, e.g.
>>>> emit(tuple.getValues()), which will use the default stream.  You only need
>>>> to specify the stream in the emit if you subscribe to a specific stream
>>>> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
>>>> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit
>>>> on this stream you would do emit("<some stream>", tuple.getValues());.
>>>>
>>>> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <ni...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I'm getting an invalid topology error when trying to emit from one
>>>>> bolt to another.
>>>>>
>>>>> From the create topology method:
>>>>>
>>>>> private static String ODU_BOLT = "ODU";
>>>>>
>>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>>>>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
>>>>> //Bolts for message routing
>>>>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>>>>
>>>>> The emit method inside the CSG_BOLT
>>>>>
>>>>> outputCollector.emit("ODU", tuple.getValues());
>>>>>
>>>>> But I'm getting the below error...
>>>>>
>>>>> 331  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>>>>> exception: Component: [ODU] subscribes from non-existent stream: [default]
>>>>> of component [CSG]
>>>>> Exception in thread "main" InvalidTopologyException(msg:Component:
>>>>> [ODU] subscribes from non-existent stream: [default] of component [CSG])
>>>>>
>>>>>
>>>>> Regards,
>>>>>
>>>>> Nick
>>>>>
>>>>
>>>>
>>

Re: Invalid Topology Error

Posted by Nick Beenham <ni...@gmail.com>.
if anyone has an example that would be really helpful :)

On Wed Nov 05 2014 at 3:33:51 PM Nick Beenham <ni...@gmail.com>
wrote:

> Thanks Nathan, I'm moving forward again. Though I'm working out how to
> stop them emitting to all the bolts and doing some proper routing :)
>
> On Wed Nov 05 2014 at 3:07:00 PM Nathan Leung <nc...@gmail.com> wrote:
>
>> I think CSG needs to use the appropriate declare method: https://storm.
>> incubator.apache.org/apidocs/backtype/storm/topology/
>> OutputFieldsDeclarer.html
>>
>> On Wed, Nov 5, 2014 at 3:02 PM, Nick Beenham <ni...@gmail.com>
>> wrote:
>>
>>> Thanks Nathan. The purpose is to do some rudimentary routing, I receive
>>> messages of various types and need to treat them differently.
>>>
>>> When I follow your example:
>>> //Bolts for message routing
>>>         builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT,
>>> "ODU");
>>>
>>> I get the error
>>> 328  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>>> exception: Component: [ODU] subscribes from non-existent stream: [ODU] of
>>> component [CSG]
>>> Exception in thread "main" InvalidTopologyException(msg:Component:
>>> [ODU] subscribes from non-existent stream: [ODU] of component [CSG])
>>>
>>> Am I missing another piece?
>>>
>>> Nick
>>>
>>>
>>> On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <nc...@gmail.com> wrote:
>>>
>>>> Your emit is on stream "ODU", but when you subscribed you did not
>>>> specify any streams (hence the message saying [ODU] subscribes from stream
>>>> [default] of [CSG]).  You need to do the emit without a stream, e.g.
>>>> emit(tuple.getValues()), which will use the default stream.  You only need
>>>> to specify the stream in the emit if you subscribe to a specific stream
>>>> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
>>>> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit
>>>> on this stream you would do emit("<some stream>", tuple.getValues());.
>>>>
>>>> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <ni...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I'm getting an invalid topology error when trying to emit from one
>>>>> bolt to another.
>>>>>
>>>>> From the create topology method:
>>>>>
>>>>> private static String ODU_BOLT = "ODU";
>>>>>
>>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>>>>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).
>>>>> shuffleGrouping("msgs");
>>>>> //Bolts for message routing
>>>>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>>>>
>>>>> The emit method inside the CSG_BOLT
>>>>>
>>>>> outputCollector.emit("ODU", tuple.getValues());
>>>>>
>>>>> But I'm getting the below error...
>>>>>
>>>>> 331  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>>>>> exception: Component: [ODU] subscribes from non-existent stream: [default]
>>>>> of component [CSG]
>>>>> Exception in thread "main" InvalidTopologyException(msg:Component:
>>>>> [ODU] subscribes from non-existent stream: [default] of component [CSG])
>>>>>
>>>>>
>>>>> Regards,
>>>>>
>>>>> Nick
>>>>>
>>>>
>>>>
>>

Re: Invalid Topology Error

Posted by Nick Beenham <ni...@gmail.com>.
Thanks Nathan, I'm moving forward again. Though I'm working out how to stop
them emitting to all the bolts and doing some proper routing :)

On Wed Nov 05 2014 at 3:07:00 PM Nathan Leung <nc...@gmail.com> wrote:

> I think CSG needs to use the appropriate declare method:
> https://storm.incubator.apache.org/apidocs/backtype/storm/topology/OutputFieldsDeclarer.html
>
> On Wed, Nov 5, 2014 at 3:02 PM, Nick Beenham <ni...@gmail.com>
> wrote:
>
>> Thanks Nathan. The purpose is to do some rudimentary routing, I receive
>> messages of various types and need to treat them differently.
>>
>> When I follow your example:
>> //Bolts for message routing
>>         builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT,
>> "ODU");
>>
>> I get the error
>> 328  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>> exception: Component: [ODU] subscribes from non-existent stream: [ODU] of
>> component [CSG]
>> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
>> subscribes from non-existent stream: [ODU] of component [CSG])
>>
>> Am I missing another piece?
>>
>> Nick
>>
>>
>> On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <nc...@gmail.com> wrote:
>>
>>> Your emit is on stream "ODU", but when you subscribed you did not
>>> specify any streams (hence the message saying [ODU] subscribes from stream
>>> [default] of [CSG]).  You need to do the emit without a stream, e.g.
>>> emit(tuple.getValues()), which will use the default stream.  You only need
>>> to specify the stream in the emit if you subscribe to a specific stream
>>> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
>>> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit on
>>> this stream you would do emit("<some stream>", tuple.getValues());.
>>>
>>> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <ni...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I'm getting an invalid topology error when trying to emit from one bolt
>>>> to another.
>>>>
>>>> From the create topology method:
>>>>
>>>> private static String ODU_BOLT = "ODU";
>>>>
>>>> TopologyBuilder builder = new TopologyBuilder();
>>>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>>>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
>>>> //Bolts for message routing
>>>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>>>
>>>> The emit method inside the CSG_BOLT
>>>>
>>>> outputCollector.emit("ODU", tuple.getValues());
>>>>
>>>> But I'm getting the below error...
>>>>
>>>> 331  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>>>> exception: Component: [ODU] subscribes from non-existent stream: [default]
>>>> of component [CSG]
>>>> Exception in thread "main" InvalidTopologyException(msg:Component:
>>>> [ODU] subscribes from non-existent stream: [default] of component [CSG])
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Nick
>>>>
>>>
>>>
>

Re: Invalid Topology Error

Posted by Nathan Leung <nc...@gmail.com>.
I think CSG needs to use the appropriate declare method:
https://storm.incubator.apache.org/apidocs/backtype/storm/topology/OutputFieldsDeclarer.html

On Wed, Nov 5, 2014 at 3:02 PM, Nick Beenham <ni...@gmail.com> wrote:

> Thanks Nathan. The purpose is to do some rudimentary routing, I receive
> messages of various types and need to treat them differently.
>
> When I follow your example:
> //Bolts for message routing
>         builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT,
> "ODU");
>
> I get the error
> 328  [main] WARN  backtype.storm.StormSubmitter - Topology submission
> exception: Component: [ODU] subscribes from non-existent stream: [ODU] of
> component [CSG]
> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
> subscribes from non-existent stream: [ODU] of component [CSG])
>
> Am I missing another piece?
>
> Nick
>
>
> On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <nc...@gmail.com> wrote:
>
>> Your emit is on stream "ODU", but when you subscribed you did not specify
>> any streams (hence the message saying [ODU] subscribes from stream
>> [default] of [CSG]).  You need to do the emit without a stream, e.g.
>> emit(tuple.getValues()), which will use the default stream.  You only need
>> to specify the stream in the emit if you subscribe to a specific stream
>> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
>> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit on
>> this stream you would do emit("<some stream>", tuple.getValues());.
>>
>> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <ni...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> I'm getting an invalid topology error when trying to emit from one bolt
>>> to another.
>>>
>>> From the create topology method:
>>>
>>> private static String ODU_BOLT = "ODU";
>>>
>>> TopologyBuilder builder = new TopologyBuilder();
>>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
>>> //Bolts for message routing
>>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>>
>>> The emit method inside the CSG_BOLT
>>>
>>> outputCollector.emit("ODU", tuple.getValues());
>>>
>>> But I'm getting the below error...
>>>
>>> 331  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>>> exception: Component: [ODU] subscribes from non-existent stream: [default]
>>> of component [CSG]
>>> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
>>> subscribes from non-existent stream: [default] of component [CSG])
>>>
>>>
>>> Regards,
>>>
>>> Nick
>>>
>>
>>

Re: Invalid Topology Error

Posted by Nick Beenham <ni...@gmail.com>.
Thanks Nathan. The purpose is to do some rudimentary routing, I receive
messages of various types and need to treat them differently.

When I follow your example:
//Bolts for message routing
        builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT,
"ODU");

I get the error
328  [main] WARN  backtype.storm.StormSubmitter - Topology submission
exception: Component: [ODU] subscribes from non-existent stream: [ODU] of
component [CSG]
Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
subscribes from non-existent stream: [ODU] of component [CSG])

Am I missing another piece?

Nick


On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <nc...@gmail.com> wrote:

> Your emit is on stream "ODU", but when you subscribed you did not specify
> any streams (hence the message saying [ODU] subscribes from stream
> [default] of [CSG]).  You need to do the emit without a stream, e.g.
> emit(tuple.getValues()), which will use the default stream.  You only need
> to specify the stream in the emit if you subscribe to a specific stream
> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit on
> this stream you would do emit("<some stream>", tuple.getValues());.
>
> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <ni...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I'm getting an invalid topology error when trying to emit from one bolt
>> to another.
>>
>> From the create topology method:
>>
>> private static String ODU_BOLT = "ODU";
>>
>> TopologyBuilder builder = new TopologyBuilder();
>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
>> //Bolts for message routing
>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>
>> The emit method inside the CSG_BOLT
>>
>> outputCollector.emit("ODU", tuple.getValues());
>>
>> But I'm getting the below error...
>>
>> 331  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>> exception: Component: [ODU] subscribes from non-existent stream: [default]
>> of component [CSG]
>> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
>> subscribes from non-existent stream: [default] of component [CSG])
>>
>>
>> Regards,
>>
>> Nick
>>
>
>

Re: Invalid Topology Error

Posted by Nathan Leung <nc...@gmail.com>.
Your emit is on stream "ODU", but when you subscribed you did not specify
any streams (hence the message saying [ODU] subscribes from stream
[default] of [CSG]).  You need to do the emit without a stream, e.g.
emit(tuple.getValues()), which will use the default stream.  You only need
to specify the stream in the emit if you subscribe to a specific stream
when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit on
this stream you would do emit("<some stream>", tuple.getValues());.

On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <ni...@gmail.com> wrote:

> Hi All,
>
> I'm getting an invalid topology error when trying to emit from one bolt to
> another.
>
> From the create topology method:
>
> private static String ODU_BOLT = "ODU";
>
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
> //Bolts for message routing
> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>
> The emit method inside the CSG_BOLT
>
> outputCollector.emit("ODU", tuple.getValues());
>
> But I'm getting the below error...
>
> 331  [main] WARN  backtype.storm.StormSubmitter - Topology submission
> exception: Component: [ODU] subscribes from non-existent stream: [default]
> of component [CSG]
> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
> subscribes from non-existent stream: [default] of component [CSG])
>
>
> Regards,
>
> Nick
>