Posted to dev@apex.apache.org by Munagala Ramanath <ra...@datatorrent.com> on 2016/04/17 06:36:11 UTC

Using the new Kafka input operator

I was putting together a small application to read from Kafka using the
new 0.9 input operator from Apex Malhar, but ran into a strange problem,
so I thought it would be useful to share it and the resolution with the
list.

Here is my application code (*LineOutputOperator* is a trivial extension of
the *AbstractFileOutputOperator*):

  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
    in.setInitialPartitionCount(1);
    in.setTopics("test");
    in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
    in.setClusters("localhost:2181");

    LineOutputOperator out = dag.addOperator("out", new LineOutputOperator());
    out.setFilePath("/tmp/FromKafka");
    out.setFileName("test");
    out.setMaxLength(1024);        // max size of rolling output file

    // create stream connecting input adapter to output adapter
    dag.addStream("data", in.outputPort, out.input);
  }

The application failed when deployed to the cluster: it stayed stuck in
the ACCEPTED state. There were no errors in either the Resource Manager
or the Application Master logs.

It turned out that the *setClusters()* call needs a *broker address*, not
the ZooKeeper address, so simply changing the port from 2181 to 9092 fixed
the problem (thanks to Thomas for pointing this out). This was not obvious,
since the Kafka command-line producer script takes a ZooKeeper address.

There is also a programmatic way to get the broker info from ZooKeeper, as
described in:
http://stackoverflow.com/questions/29490113/kafka-get-broker-host-from-zookeeper
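
As a rough illustration of that approach, here is a minimal sketch of only
the last step: turning broker registration JSON into the "host:port" form
that a broker address takes. It assumes the JSON has already been read
from the broker znodes (typically under /brokers/ids) with a ZooKeeper
client, which is left out here, and that each registration carries "host"
and "port" fields as Kafka 0.9 brokers usually write them. The class and
method names are hypothetical and not part of Malhar or Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Malhar or Kafka.
final class BrokerListSketch {
  // Assumed registration layout: {..., "host":"localhost", ..., "port":9092}
  private static final Pattern HOST = Pattern.compile("\"host\"\\s*:\\s*\"([^\"]+)\"");
  private static final Pattern PORT = Pattern.compile("\"port\"\\s*:\\s*(\\d+)");

  // Extracts "host:port" from a single broker registration JSON string.
  public static String toBrokerAddress(String registrationJson) {
    Matcher host = HOST.matcher(registrationJson);
    Matcher port = PORT.matcher(registrationJson);
    if (!host.find() || !port.find()) {
      throw new IllegalArgumentException("No host/port in: " + registrationJson);
    }
    return host.group(1) + ":" + port.group(1);
  }

  // Joins several registrations into a comma-separated broker list.
  public static String toBrokerList(List<String> registrations) {
    List<String> addresses = new ArrayList<>();
    for (String registration : registrations) {
      addresses.add(toBrokerAddress(registration));
    }
    return String.join(",", addresses);
  }
}
```

The resulting string could then be handed to setClusters() in place of the
hard-coded address, assuming the operator accepts a comma-separated broker
list in the same style as the consumer's bootstrap.servers setting.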

Hopefully, this will be useful to others who might run into the same
problem.

Ram

Re: Using the new Kafka input operator

Posted by Thomas Weise <th...@datatorrent.com>.
Ram,

Kafka is moving away from client side use of ZooKeeper. The change in the
new input operator reflects that. The client will locate the partitions
through initial broker(s) now.

Thomas



