You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Liquan Pei <li...@gmail.com> on 2016/04/13 20:51:56 UTC

Re: Problem with kafka doFlush method

Hi

Are you using the JDBC connector? Can you share with me the command and
configuration that you were running?

Thanks,
Liquan

On Wed, Mar 23, 2016 at 8:36 AM, Aleksandar Pejakovic <a.pejakovic@levi9.com
> wrote:

> Hi,
>
>
> I have created Confluent Connect (http://docs.confluent.io/2.0.1/connect/?)
> sink and source tasks. While working in standalone mode there are no
> errrors or exceptions.
>
>
> But, when i start tasks in distributed mode i get NullPointerException
> while OffsetStorageWriter is executing doFlush() method.
>
> Full stack trace:
>
>
> ERROR Unhandled exception when committing
> WorkerSourceTask{id=distributed-s3-source-0}:
> (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:118)
> java.lang.NullPointerException
> at
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore.set(KafkaOffsetBackingStore.java:122)
> at
> org.apache.kafka.connect.storage.OffsetStorageWriter.doFlush(OffsetStorageWriter.java:161)
> at
> org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:267)
> at
> org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:110)
> at
> org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:76)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> After a closer look i have determined that in KafkaOffsetBackingStore in
> set() method, parameter [final Map<ByteBuffer, ByteBuffer> values] is null,
> therefore next line:
>
>  - SetCallbackFuture producerCallback = new
> SetCallbackFuture(values.size(), callback);      throws exception.
>
> After that exception sink and source tasks continue working without any
> problems.
> When kafka tries to do another flush, it throws following exception:
>
> ERROR Invalid call to OffsetStorageWriter flush() while already flushing,
> the framework should not allow this
> (org.apache.kafka.connect.storage.OffsetStorageWriter:108)
> [2016-03-23 14:57:33,220] ERROR Unhandled exception when committing
> WorkerSourceTask{id=distributed-s3-source-0}:
> (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:118)
> org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is
> already flushing
> at
> org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:110)
> at
> org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:227)
> at
> org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:110)
> at
> org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:76)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Does anyone know how to fix this exception?
>
>
> I am using confluent-2.0.1 with kafka-9.0.1-cp1.
>
>
>
>
>


-- 
Liquan Pei
Software Engineer, Confluent Inc