Posted to dev@kafka.apache.org by Amandeep Singh <am...@gmail.com> on 2018/06/15 13:39:36 UTC
Error in Kafka Stream
Hi,
I am getting the below error while processing data with Kafka Streams. The
application was running for a couple of hours, and the
'WatchlistUpdate-StreamThread-9' thread was assigned to the same partition
since the beginning. I am assuming it was able to successfully commit offsets
for those couple of hours, and that the directory
'/opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2'
did exist for that period.
Then I started getting the below error every 30 secs (probably because of
the offset commit interval), and messages are being missed from processing.
Can you please help?
2018-06-15 08:47:58 [WatchlistUpdate-StreamThread-9] WARN o.a.k.s.p.i.ProcessorStateManager:246 - task [0_2] Failed to write checkpoint file to /opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2/.checkpoint:
java.io.FileNotFoundException: /opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2/.checkpoint.tmp (No such file or directory)
        at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_141]
        at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[na:1.8.0_141]
        at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[na:1.8.0_141]
        at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[na:1.8.0_141]
        at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[kafka-streams-1.0.0.jar:na]
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320) ~[kafka-streams-1.0.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:306) [kafka-streams-1.0.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) [kafka-streams-1.0.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299) [kafka-streams-1.0.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289) [kafka-streams-1.0.0.jar:na]
        at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87) [kafka-streams-1.0.0.jar:na]
        at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451) [kafka-streams-1.0.0.jar:na]
        at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380) [kafka-streams-1.0.0.jar:na]
        at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309) [kafka-streams-1.0.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018) [kafka-streams-1.0.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835) [kafka-streams-1.0.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) [kafka-streams-1.0.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) [kafka-streams-1.0.0.jar:na]
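For context: the failure originates in OffsetCheckpoint.write, and the `.checkpoint.tmp` path in the exception shows the temp-file-then-rename pattern it uses. The following is a minimal illustrative sketch of that pattern (not Kafka's actual code; class and method names are mine), showing why the very first step, opening the temp file, throws FileNotFoundException when the task directory itself has disappeared:

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class CheckpointSketch {

    // Write the offset to <taskDir>/.checkpoint via a temp file plus rename,
    // mirroring the tmp-then-rename pattern visible in the stack trace. If
    // taskDir has been deleted out from under the task, the FileOutputStream
    // constructor throws FileNotFoundException ("No such file or directory"),
    // which is exactly the failure mode in the log above.
    static void writeCheckpoint(File taskDir, long offset) throws IOException {
        File tmp = new File(taskDir, ".checkpoint.tmp");
        try (PrintWriter out = new PrintWriter(new FileOutputStream(tmp))) {
            out.println(offset);
        }
        Files.move(tmp.toPath(), new File(taskDir, ".checkpoint").toPath(),
                StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        File taskDir = Files.createTempDirectory("0_2-demo").toFile();
        writeCheckpoint(taskDir, 42L); // succeeds while the directory exists

        // Simulate some external cleanup removing the whole task directory.
        new File(taskDir, ".checkpoint").delete();
        taskDir.delete();

        try {
            writeCheckpoint(taskDir, 43L);
        } catch (FileNotFoundException e) {
            System.out.println("checkpoint write failed: " + e.getMessage());
        }
    }
}
```

The sketch suggests the interesting question is not the write itself but what deleted the `0_2` task directory between commits.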
Stream config:
2018-06-15 08:09:28 [main] INFO o.a.k.c.consumer.ConsumerConfig:223 -
ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [XYZ]
check.crcs = true
client.id = WatchlistUpdate-StreamThread-9-consumer
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = UI-Watchlist-ES-App
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
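(Note that the consumer config above is generated by Kafka Streams itself, and the 30-second cadence of the warning matches the Streams commit interval: checkpoint files are written as part of the commit cycle, and commit.interval.ms defaults to 30000 ms for at-least-once processing. A hedged sketch of the corresponding StreamsConfig setup; 'XYZ' is the placeholder from the log, and kafka-streams 1.0.0 is assumed on the classpath:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "UI-Watchlist-ES-App"); // becomes the consumer group.id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "XYZ");              // placeholder from the log
// Checkpoints are written on commit; the default commit interval of
// 30000 ms explains the warning repeating every 30 secs.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);
```
)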
Regards,
Amandeep Singh
Re: Error in Kafka Stream
Posted by Guozhang Wang <wa...@gmail.com>.
Interesting, it indeed seems like a lurking issue in Kafka Streams.
Which Kafka version are you using?
Guozhang
On Mon, Jun 18, 2018 at 12:32 AM, Amandeep Singh <am...@gmail.com>
wrote:
> Hi Guozhang,
>
> The file system is XFS and the folder is not a temp folder. The issue goes
> away when I restart the streams. I forgot to mention I am running 3
> instances of the consumer on 3 machines.
> Also, this issue seems to be reported by other users too:
> https://issues.apache.org/jira/browse/KAFKA-5998
--
-- Guozhang
Re: Error in Kafka Stream
Posted by Amandeep Singh <am...@gmail.com>.
Hi Guozhang,
The file system is XFS and the folder is not a temp folder. The issue goes
away when I restart the streams. I forgot to mention I am running 3
instances of the consumer on 3 machines.
Also, this issue seems to be reported by other users too:
https://issues.apache.org/jira/browse/KAFKA-5998
Regards,
Amandeep Singh
+91-7838184964
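If this matches KAFKA-5998, reports there point at the Streams state-directory cleaner removing a task directory that a live task is still checkpointing into. One mitigation sometimes suggested in that context (an assumption on my side, not a confirmed fix) is to make the cleaner far less aggressive via state.cleanup.delay.ms:

```java
// Sketch only; assumes the same StreamsConfig `props` used by the application.
// state.cleanup.delay.ms controls how long the background cleaner waits before
// removing apparently-unused task directories (default 600000 ms). Raising it
// effectively disables the cleanup as a workaround.
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.MAX_VALUE);
```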
On Mon, Jun 18, 2018 at 6:45 AM Guozhang Wang <wa...@gmail.com> wrote:
> Hello Amandeep,
>
> What file system are you using? Also is `/opt/info` a temp folder that can
> be auto-cleared from time to time?
>
>
> Guozhang
>
Re: Error in Kafka Stream
Posted by Guozhang Wang <wa...@gmail.com>.
Hello Amandeep,
What file system are you using? Also is `/opt/info` a temp folder that can
be auto-cleared from time to time?
Guozhang
On Fri, Jun 15, 2018 at 6:39 AM, Amandeep Singh <am...@gmail.com>
wrote:
--
-- Guozhang