Posted to user@storm.apache.org by Adrian Landman <ad...@gmail.com> on 2014/07/28 18:54:37 UTC

Bolt vs Spout

I am writing a topology that pulls messages from a topic, does some work,
and then writes them back on a different topic.  I have been having some
issues so I created my own small topology that just pulls a message, prints
the contents, and then stores them back on a new topic.  I finally got this
to work, but it raised a question.

To create the spout, I need to either pass in the Kafka host without a port
(e.g. localhost) or use 2181 as the port.

To create the producer bolt, I need to include the broker port (e.g. 9092);
otherwise I get an array-index-out-of-bounds exception when creating the Producer.
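The exception is consistent with a broker-list parser that splits each entry on ':' and reads the port unconditionally. The stdlib-only Java sketch below imitates that behaviour under that assumption; `BrokerListParse` and `parse` are hypothetical names, not code taken from Kafka or storm-kafka.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Kafka's code: parses a comma-separated
// "host:port" broker list the way the producer is assumed to.
final class BrokerListParse {

    static List<InetSocketAddress> parse(String brokerList) {
        List<InetSocketAddress> brokers = new ArrayList<>();
        for (String entry : brokerList.split(",")) {
            String[] parts = entry.trim().split(":");
            String host = parts[0];
            // An entry such as "localhost" has no parts[1], so this line
            // throws ArrayIndexOutOfBoundsException.
            int port = Integer.parseInt(parts[1]);
            // createUnresolved keeps the host string without a DNS lookup.
            brokers.add(InetSocketAddress.createUnresolved(host, port));
        }
        return brokers;
    }

    public static void main(String[] args) {
        System.out.println(parse("1.1.1.1:9092, 1.1.1.2:9092").size()); // prints 2
        try {
            parse("localhost");
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("missing port: " + e.getClass().getSimpleName());
        }
    }
}
```

In this model the remedy is the one already found: give every broker entry an explicit port such as 9092.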

When we were using Kafka 0.7 and
https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka/src/jvm/storm/kafka
for our Storm/Kafka integration, I believe we used the same broker list
for both our spout and our producer.  Is there any way to do the same with
Kafka 0.8 and the new storm-kafka project?  Also, if we want to pass in a
list: I know that in the KafkaBolt we can set metadata.broker.list to a
comma-separated list of brokers (1.1.1.1:9092, 1.1.1.2:9092), but can we
do the same for the spout?  Or is there any reason to?  ZkHosts takes a
String, but I didn't see anything that specified its format.

Re: Bolt vs Spout

Posted by Harsha <st...@harsha.io>.
Hi Adrian,

KafkaSpout is a consumer, so in this case it connects to ZooKeeper,
whereas KafkaBolt, which is a Kafka producer, needs to connect to a list
of brokers (e.g. localhost:9092). KafkaSpout uses SpoutConfig, in which
you can add servers to the List<String> zkServers; for KafkaBolt you can
create a Properties object with "metadata.broker.list" set to a
comma-separated string of brokers.
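A minimal sketch of the KafkaBolt side using only java.util: the broker entries are joined with commas into a single "metadata.broker.list" value. `BoltProducerProps` and `producerProps` are hypothetical names; only the broker list is set here, and how the Properties object is handed to KafkaBolt depends on the storm-kafka version, so that step is not shown.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: builds producer Properties for KafkaBolt.
final class BoltProducerProps {

    // Joins "host:port" entries into one comma-separated string,
    // e.g. "1.1.1.1:9092,1.1.1.2:9092".
    static Properties producerProps(List<String> brokers) {
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", String.join(",", brokers));
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps(Arrays.asList("1.1.1.1:9092", "1.1.1.2:9092"));
        // prints 1.1.1.1:9092,1.1.1.2:9092
        System.out.println(props.getProperty("metadata.broker.list"));
    }
}
```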

-Harsha

Re: Bolt vs Spout

Posted by Adrian Landman <ad...@gmail.com>.
I feel like you missed the issue in my question.  For the connection string
for ZkHosts, if I pass in localhost:9092 with a default Kafka
configuration, it won't connect.  Instead I have to pass in localhost:2181.
Is this expected behavior?  Also, if I wanted to pass in more than one
host, what should separate the entries?  Commas?
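ZkHosts talks to ZooKeeper rather than to the Kafka brokers, which is why localhost:2181 (ZooKeeper's default client port) connects and localhost:9092 (the Kafka broker port) does not. Assuming ZkHosts passes its string straight through to the ZooKeeper client, the expected format would be a standard ZooKeeper connect string: host:port pairs separated by commas. The ZooKeeper client also falls back to port 2181 when an entry has no port, which would explain why plain localhost worked. The sketch below builds such a string; `ZkConnectString` and `build` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds a ZooKeeper connect string for ZkHosts.
final class ZkConnectString {

    static final int DEFAULT_ZK_PORT = 2181;

    // Appends the default port to bare host names (host names or IPv4
    // addresses only; IPv6 literals are not handled) and joins with commas.
    static String build(List<String> servers) {
        List<String> entries = new ArrayList<>();
        for (String server : servers) {
            String entry = server.trim();
            entries.add(entry.contains(":") ? entry : entry + ":" + DEFAULT_ZK_PORT);
        }
        return String.join(",", entries);
    }

    public static void main(String[] args) {
        // prints zk1:2181,zk2:2181,zk3:2181
        System.out.println(build(Arrays.asList("zk1:2181", "zk2", "zk3:2181")));
    }
}
```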


Re: Bolt vs Spout

Posted by Parth Brahmbhatt <pb...@hortonworks.com>.
For setting a list of brokers in KafkaSpout, I believe there are two options:

If you use StaticHosts, then you need to supply a GlobalPartitionInformation
in which you specify each partition and its corresponding broker host:

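import storm.kafka.Broker;
import storm.kafka.BrokerHosts;
import storm.kafka.StaticHosts;
import storm.kafka.trident.GlobalPartitionInformation;
GlobalPartitionInformation partitions = new GlobalPartitionInformation();
partitions.addPartition(0, new Broker("10.22.2.79", 9092));
// Add one addPartition(...) call for each partition of the topic.
BrokerHosts hosts = new StaticHosts(partitions);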
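To make the StaticHosts option concrete without depending on the storm-kafka jar, here is a stdlib-only model of the table it needs: one broker host:port per partition id. `StaticPartitionTable` is a hypothetical stand-in for GlobalPartitionInformation, not its API. Because nothing is discovered at runtime with static hosts, a partition you do not add is presumably never consumed; the model's `missing` method reports such gaps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a static partition-to-broker table.
final class StaticPartitionTable {

    // Partition id -> "host:port" of the broker assumed to lead it.
    private final Map<Integer, String> leaders = new TreeMap<>();

    void addPartition(int partition, String host, int port) {
        leaders.put(partition, host + ":" + port);
    }

    String brokerFor(int partition) {
        return leaders.get(partition); // null if the partition was never added
    }

    // Lists the ids in 0..partitionCount-1 that have no broker entry.
    List<Integer> missing(int partitionCount) {
        List<Integer> gaps = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (!leaders.containsKey(p)) {
                gaps.add(p);
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        StaticPartitionTable table = new StaticPartitionTable();
        table.addPartition(0, "10.22.2.79", 9092);
        table.addPartition(2, "10.22.2.79", 9092);
        System.out.println(table.missing(3)); // prints [1]
    }
}
```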

Alternatively, if you use ZkHosts, then you need to pass in the complete
ZooKeeper connection string, e.g. localhost:2181. Optionally, there is a
constructor that takes a second argument, zkroot, which is set to /brokers
by default and should work with a default Kafka installation. The code in
ZkHosts looks under zkroot/topics/<topic that you set in SpoutConfig>/partitions
to figure out the number of partitions and the leader for each partition.
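The lookup described here can be sketched as plain path arithmetic. Assuming the standard Kafka 0.8 ZooKeeper layout, each partition id is a child of zkroot/topics/<topic>/partitions, the partition's current leader is recorded in a per-partition state node, and each broker registers its host and port under zkroot/ids/<id>. `ZkBrokerPaths` and its methods are hypothetical helpers; they only build strings and never contact ZooKeeper.

```java
// Hypothetical helpers: build the ZooKeeper paths a ZkHosts-style reader
// would consult. They do not connect to ZooKeeper.
final class ZkBrokerPaths {

    static final String DEFAULT_ZK_ROOT = "/brokers";

    private static String root(String zkRoot) {
        return (zkRoot == null || zkRoot.isEmpty()) ? DEFAULT_ZK_ROOT : zkRoot;
    }

    // Children of this node are the partition ids of the topic.
    static String partitionsPath(String zkRoot, String topic) {
        return root(zkRoot) + "/topics/" + topic + "/partitions";
    }

    // Node whose data records the partition's current leader (broker id).
    static String statePath(String zkRoot, String topic, int partition) {
        return partitionsPath(zkRoot, topic) + "/" + partition + "/state";
    }

    // Registration node for a broker id, holding its host and port.
    static String brokerPath(String zkRoot, int brokerId) {
        return root(zkRoot) + "/ids/" + brokerId;
    }

    public static void main(String[] args) {
        System.out.println(partitionsPath(null, "my-topic")); // prints /brokers/topics/my-topic/partitions
        System.out.println(statePath(null, "my-topic", 0));   // prints /brokers/topics/my-topic/partitions/0/state
    }
}
```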

Thanks
Parth

-- 
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to 
which it is addressed and may contain information that is confidential, 
privileged and exempt from disclosure under applicable law. If the reader 
of this message is not the intended recipient, you are hereby notified that 
any printing, copying, dissemination, distribution, disclosure or 
forwarding of this communication is strictly prohibited. If you have 
received this communication in error, please contact the sender immediately 
and delete it from your system. Thank You.