Posted to users@kafka.apache.org by Henry Kim <he...@resonate.com> on 2016/10/28 15:14:19 UTC

Kafka Connect Hdfs Sink not sinking

Hi,


I was attempting to follow the hdfs-connector quick start guide (http://docs.confluent.io/3.0.0/connect/connect-hdfs/docs/hdfs_connector.html#quickstart), but I'm unable to consume messages using Kafka Connect (hdfs-connector). I did confirm that I am able to consume the messages via the console consumer.


Here are the final log lines I receive from the app.


[2016-10-28 10:56:47,288] INFO Hadoop configuration directory /etc/hadoop/conf (io.confluent.connect.hdfs.DataWriter:94)
[2016-10-28 10:56:47,608] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[2016-10-28 10:56:48,408] INFO Sink task WorkerSinkTask{id=hdfs-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:208)
[2016-10-28 10:56:48,408] TRACE hdfs-sink-0 polling consumer with timeout 58820 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:221)
[2016-10-28 10:56:56,022] INFO Reflections took 9680 ms to scan 253 urls, producing 12411 keys and 81532 values  (org.reflections.Reflections:229)

At this point it hangs. I traced the code back to the source and found that the WorkerSinkTask is stuck here at pollConsumer():


log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
assert messageBatch.isEmpty() || msgs.isEmpty();
log.trace("{} polling returned {} messages", id, msgs.count());

Anyone have any ideas?


/etc/kafka-connect-hdfs/quickstart-hdfs.properties
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=helloworld
hdfs.url=hdfs://localhost:8020
flush.size=3
rotate.interval.ms=500



/etc/schema-registry/connect-avro-standalone.properties
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://0.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://0.0.0.0:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets




- Henry Kim

Re: Kafka Connect Hdfs Sink not sinking

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
And you're using the same version of the broker and clients, and there are
no other error messages in the log? I see you're using AvroConverter; do
you also have the schema registry listening on the appropriate port?
Normally I'd expect to see at least some sort of exception or error message
within about a minute or so even if the consumer were getting stuck.
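
If you want a quick way to rule out the registry, here is a minimal sketch
(not part of Connect; it just hits the registry's /subjects endpoint,
assuming the http://0.0.0.0:8081 URL from your worker config):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical standalone check that the Schema Registry is reachable.
// Assumes the registry URL from the worker config: http://0.0.0.0:8081
public class RegistryCheck {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://0.0.0.0:8081/subjects");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        System.out.println("HTTP " + conn.getResponseCode());
        try (BufferedReader in =
                 new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                // A healthy registry returns a JSON array of registered subjects
                System.out.println(line);
            }
        }
    }
}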

The log messages you pasted before indicate the consumer is going to poll
for about 1 minute before it wakes up to do anything else. Does it hang
indefinitely or does it log something else after 1 minute? Immediately
after that poll() call, if it returns correctly after the timeout it should
log another message that says something like "hdfs-sink-0 polling returned
N messages". If it is never returning, could you maybe use jstack to get a
dump of the threads so we can see what it might be getting hung on?

-Ewen

On Wed, Nov 2, 2016 at 11:03 AM, Henry Kim <he...@resonate.com> wrote:

> I confirmed that I am able to write to the topic. I currently believe the
> issue is with Connect and not the HDFS sink.
>
>
> When attempting to run some other quickstart applications (e.g.
> http://docs.confluent.io/3.0.0/connect/intro.html#quickstart), I am unable
> to get any Kafka Connect sink to function properly. They all appear to hang
> while attempting to consume.
>
>
> connect-console-sink, connect-file-sink, etc...
>
> ________________________________
> From: Ewen Cheslack-Postava <ew...@confluent.io>
> Sent: Wednesday, November 2, 2016 1:49:33 AM
> To: users@kafka.apache.org
> Subject: Re: Kafka Connect Hdfs Sink not sinking
>
> Are you writing new data into the topic that the HDFS sink is trying to
> read data from? This line
>
> [2016-10-28 10:56:48,408] TRACE hdfs-sink-0 polling consumer with timeout
> 58820 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:221)
>
> indicates it's going to wait for about 60s until some data arrives for the
> sink connector to write. If nothing arrives, you may perceive this as a
> hang because it will wait up to 60s before taking any additional step
> (which will be minimal if no more data arrives).
>
> What else do you have going on in this system while the HDFS connector is
> running?
>
> -Ewen
>
> On Fri, Oct 28, 2016 at 8:14 AM, Henry Kim <he...@resonate.com> wrote:
>
> > Hi,
> >
> >
> > I was attempting to follow the hdfs-connector quick start guide (
> > http://docs.confluent.io/3.0.0/connect/connect-hdfs/docs/
> > hdfs_connector.html#quickstart), but I'm unable to consume messages
> using
> > Kafka Connect (hdfs-connector). I did confirm that I am able to consume
> the
> > messages via console.
> >
> >
> > Here are the final log lines I receive from the app.
> >
> >
> > [2016-10-28 10:56:47,288] INFO Hadoop configuration directory
> > /etc/hadoop/conf (io.confluent.connect.hdfs.DataWriter:94)
> > [2016-10-28 10:56:47,608] WARN Unable to load native-hadoop library for
> > your platform... using builtin-java classes where applicable
> > (org.apache.hadoop.util.NativeCodeLoader:62)
> > [2016-10-28 10:56:48,408] INFO Sink task WorkerSinkTask{id=hdfs-sink-0}
> > finished initialization and start (org.apache.kafka.connect.
> > runtime.WorkerSinkTask:208)
> > [2016-10-28 10:56:48,408] TRACE hdfs-sink-0 polling consumer with timeout
> > 58820 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:221)
> > [2016-10-28 10:56:56,022] INFO Reflections took 9680 ms to scan 253 urls,
> > producing 12411 keys and 81532 values  (org.reflections.Reflections:229)
> >
> > At this point it hangs. I attempted to trace the code back to the source
> > and found that the WorkerSinkTask is stuck here at pollConsumer()
> >
> >
> > log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
> > ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
> > assert messageBatch.isEmpty() || msgs.isEmpty();
> > log.trace("{} polling returned {} messages", id, msgs.count());
> >
> > Anyone have any ideas?
> >
> >
> > /etc/kafka-connect-hdfs/quickstart-hdfs.properties
> > name=hdfs-sink
> > connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
> > tasks.max=1
> > topics=helloworld
> > hdfs.url=hdfs://localhost:8020
> > flush.size=3
> > rotate.interval.ms=500
> >
> >
> >
> > /etc/schema-registry/connect-avro-standalone.properties
> > bootstrap.servers=localhost:9092
> >
> > # The converters specify the format of data in Kafka and how to translate
> > it into Connect data.
> > # Every Connect user will need to configure these based on the format
> they
> > want their data in
> > # when loaded from or stored into Kafka
> > key.converter=io.confluent.connect.avro.AvroConverter
> > key.converter.schema.registry.url=http://0.0.0.0:8081
> > value.converter=io.confluent.connect.avro.AvroConverter
> > value.converter.schema.registry.url=http://0.0.0.0:8081
> >
> > # The internal converter used for offsets and config data is configurable
> > and must be specified,
> > # but most users will always want to use the built-in default. Offset and
> > config data is never
> > # visible outside of Connect in this format.
> > internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> > internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> > internal.key.converter.schemas.enable=false
> > internal.value.converter.schemas.enable=false
> >
> > # Local storage file for offset data
> > offset.storage.file.filename=/tmp/connect.offsets
> >
> >
> >
> >
> > - Henry Kim
> >
>
>
>
> --
> Thanks,
> Ewen
>



-- 
Thanks,
Ewen

Re: Kafka Connect Hdfs Sink not sinking

Posted by Henry Kim <he...@resonate.com>.
I confirmed that I am able to write to the topic. I currently believe the issue is with Connect and not the HDFS sink.


When attempting to run some other quickstart applications (e.g. http://docs.confluent.io/3.0.0/connect/intro.html#quickstart), I am unable to get any Kafka Connect sink to function properly. They all appear to hang while attempting to consume.


connect-console-sink, connect-file-sink, etc...
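
For reference, here is a minimal sketch of a bare consumer doing roughly
what a sink task does (subscribe with a group id, then poll), but outside
of Connect; the group id below is just a placeholder, and the bootstrap
server and topic are taken from my configs:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical check: consume from the sink topic without Connect to see
// whether records come back at all.
public class PlainConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "plain-consumer-check");  // placeholder group, not Connect's
        props.put("auto.offset.reset", "earliest");     // read from the beginning
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("helloworld"));
            // Same order of magnitude as the ~60s timeout seen in the Connect logs
            ConsumerRecords<byte[], byte[]> records = consumer.poll(60000L);
            System.out.println("polled " + records.count() + " records");
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.printf("offset=%d, value=%d bytes%n", record.offset(),
                                  record.value() == null ? 0 : record.value().length);
            }
        }
    }
}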

________________________________
From: Ewen Cheslack-Postava <ew...@confluent.io>
Sent: Wednesday, November 2, 2016 1:49:33 AM
To: users@kafka.apache.org
Subject: Re: Kafka Connect Hdfs Sink not sinking

Are you writing new data into the topic that the HDFS sink is trying to
read data from? This line

[2016-10-28 10:56:48,408] TRACE hdfs-sink-0 polling consumer with timeout
58820 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:221)

indicates it's going to wait for about 60s until some data arrives for the
sink connector to write. If nothing arrives, you may perceive this as a
hang because it will wait up to 60s before taking any additional step
(which will be minimal if no more data arrives).

What else do you have going on in this system while the HDFS connector is
running?

-Ewen

On Fri, Oct 28, 2016 at 8:14 AM, Henry Kim <he...@resonate.com> wrote:

> Hi,
>
>
> I was attempting to follow the hdfs-connector quick start guide (
> http://docs.confluent.io/3.0.0/connect/connect-hdfs/docs/
> hdfs_connector.html#quickstart), but I'm unable to consume messages using
> Kafka Connect (hdfs-connector). I did confirm that I am able to consume the
> messages via console.
>
>
> Here are the final log lines I receive from the app.
>
>
> [2016-10-28 10:56:47,288] INFO Hadoop configuration directory
> /etc/hadoop/conf (io.confluent.connect.hdfs.DataWriter:94)
> [2016-10-28 10:56:47,608] WARN Unable to load native-hadoop library for
> your platform... using builtin-java classes where applicable
> (org.apache.hadoop.util.NativeCodeLoader:62)
> [2016-10-28 10:56:48,408] INFO Sink task WorkerSinkTask{id=hdfs-sink-0}
> finished initialization and start (org.apache.kafka.connect.
> runtime.WorkerSinkTask:208)
> [2016-10-28 10:56:48,408] TRACE hdfs-sink-0 polling consumer with timeout
> 58820 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:221)
> [2016-10-28 10:56:56,022] INFO Reflections took 9680 ms to scan 253 urls,
> producing 12411 keys and 81532 values  (org.reflections.Reflections:229)
>
> At this point it hangs. I attempted to trace the code back to the source
> and found that the WorkerSinkTask is stuck here at pollConsumer()
>
>
> log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
> ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
> assert messageBatch.isEmpty() || msgs.isEmpty();
> log.trace("{} polling returned {} messages", id, msgs.count());
>
> Anyone have any ideas?
>
>
> /etc/kafka-connect-hdfs/quickstart-hdfs.properties
> name=hdfs-sink
> connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
> tasks.max=1
> topics=helloworld
> hdfs.url=hdfs://localhost:8020
> flush.size=3
> rotate.interval.ms=500
>
>
>
> /etc/schema-registry/connect-avro-standalone.properties
> bootstrap.servers=localhost:9092
>
> # The converters specify the format of data in Kafka and how to translate
> it into Connect data.
> # Every Connect user will need to configure these based on the format they
> want their data in
> # when loaded from or stored into Kafka
> key.converter=io.confluent.connect.avro.AvroConverter
> key.converter.schema.registry.url=http://0.0.0.0:8081
> value.converter=io.confluent.connect.avro.AvroConverter
> value.converter.schema.registry.url=http://0.0.0.0:8081
>
> # The internal converter used for offsets and config data is configurable
> and must be specified,
> # but most users will always want to use the built-in default. Offset and
> config data is never
> # visible outside of Connect in this format.
> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
>
> # Local storage file for offset data
> offset.storage.file.filename=/tmp/connect.offsets
>
>
>
>
> - Henry Kim
>



--
Thanks,
Ewen

Re: Kafka Connect Hdfs Sink not sinking

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Are you writing new data into the topic that the HDFS sink is trying to
read data from? This line

[2016-10-28 10:56:48,408] TRACE hdfs-sink-0 polling consumer with timeout
58820 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:221)

indicates it's going to wait for about 60s until some data arrives for the
sink connector to write. If nothing arrives, you may perceive this as a
hang because it will wait up to 60s before taking any additional step
(which will be minimal if no more data arrives).
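
If you want to be sure new records really are arriving while the connector
is running, here is a minimal sketch of an Avro producer; the schema, field
name, and Confluent serializer settings below are just an example, so adjust
them to whatever your topic actually contains:

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical test producer: writes a few Avro records to the sink topic.
// Assumes the Confluent Avro serializer and a locally running schema registry.
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        // Example schema only; use whatever schema your data actually has
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"myrecord\","
            + "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}");

        try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(props)) {
            // flush.size=3 in the sink config, so send at least three records
            for (int i = 0; i < 3; i++) {
                GenericRecord record = new GenericData.Record(schema);
                record.put("f1", "value" + i);
                producer.send(new ProducerRecord<>("helloworld", record));
            }
            producer.flush();
        }
    }
}

Once at least flush.size records have arrived, the connector should commit a
file to HDFS rather than just sitting in poll().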

What else do you have going on in this system while the HDFS connector is
running?

-Ewen

On Fri, Oct 28, 2016 at 8:14 AM, Henry Kim <he...@resonate.com> wrote:

> Hi,
>
>
> I was attempting to follow the hdfs-connector quick start guide (
> http://docs.confluent.io/3.0.0/connect/connect-hdfs/docs/
> hdfs_connector.html#quickstart), but I'm unable to consume messages using
> Kafka Connect (hdfs-connector). I did confirm that I am able to consume the
> messages via console.
>
>
> Here are the final log lines I receive from the app.
>
>
> [2016-10-28 10:56:47,288] INFO Hadoop configuration directory
> /etc/hadoop/conf (io.confluent.connect.hdfs.DataWriter:94)
> [2016-10-28 10:56:47,608] WARN Unable to load native-hadoop library for
> your platform... using builtin-java classes where applicable
> (org.apache.hadoop.util.NativeCodeLoader:62)
> [2016-10-28 10:56:48,408] INFO Sink task WorkerSinkTask{id=hdfs-sink-0}
> finished initialization and start (org.apache.kafka.connect.
> runtime.WorkerSinkTask:208)
> [2016-10-28 10:56:48,408] TRACE hdfs-sink-0 polling consumer with timeout
> 58820 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:221)
> [2016-10-28 10:56:56,022] INFO Reflections took 9680 ms to scan 253 urls,
> producing 12411 keys and 81532 values  (org.reflections.Reflections:229)
>
> At this point it hangs. I attempted to trace the code back to the source
> and found that the WorkerSinkTask is stuck here at pollConsumer()
>
>
> log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
> ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
> assert messageBatch.isEmpty() || msgs.isEmpty();
> log.trace("{} polling returned {} messages", id, msgs.count());
>
> Anyone have any ideas?
>
>
> /etc/kafka-connect-hdfs/quickstart-hdfs.properties
> name=hdfs-sink
> connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
> tasks.max=1
> topics=helloworld
> hdfs.url=hdfs://localhost:8020
> flush.size=3
> rotate.interval.ms=500
>
>
>
> /etc/schema-registry/connect-avro-standalone.properties
> bootstrap.servers=localhost:9092
>
> # The converters specify the format of data in Kafka and how to translate
> it into Connect data.
> # Every Connect user will need to configure these based on the format they
> want their data in
> # when loaded from or stored into Kafka
> key.converter=io.confluent.connect.avro.AvroConverter
> key.converter.schema.registry.url=http://0.0.0.0:8081
> value.converter=io.confluent.connect.avro.AvroConverter
> value.converter.schema.registry.url=http://0.0.0.0:8081
>
> # The internal converter used for offsets and config data is configurable
> and must be specified,
> # but most users will always want to use the built-in default. Offset and
> config data is never
> # visible outside of Connect in this format.
> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
>
> # Local storage file for offset data
> offset.storage.file.filename=/tmp/connect.offsets
>
>
>
>
> - Henry Kim
>



-- 
Thanks,
Ewen