You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Jonathan Santilli (JIRA)" <ji...@apache.org> on 2018/11/27 12:25:00 UTC

[jira] [Updated] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

     [ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Santilli updated KAFKA-7678:
-------------------------------------
    Description: 
This occurs when the group is rebalancing in a Kafka Stream application and the process (the Kafka Stream application) receives a *SIGTERM* to stop it gracefully.

 

 
{noformat}
ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] Failed to close producer due to the following error:
java.lang.NullPointerException
 at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
 at org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
 at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
 at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
 at org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
 at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
 at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
 at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
 at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
 

 

Although I have checked the code and the method `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` class is expecting any kind of error to happen since is catching `*Throwable*`.

 

 

 
{noformat}
try {
 recordCollector.close();
} catch (final Throwable e) {
 log.error("Failed to close producer due to the following error:", e);
} finally {
 producer = null;
}{noformat}
 

Should we consider this a bug?

In my opinion, we could check for the `*null*` possibility at `*RecordCollectorImpl*.*java*` class:
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 producer.close();
 producer = null;
 checkForException();
}{noformat}
 

Change it for:

 
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 if ( Objects.nonNull(producer) ) {
    producer.close();
    producer = null;
 }
 checkForException();
}{noformat}
 

How does that sound?

 

Kafka Brokers running 2.0.0

Kafka Stream and client 2.1.0

OpenJDK 8

 

  was:
This occurs when the group is rebalancing in a Kafka Stream application and the process (the Kafka Stream application) receives a SIGTERM to stop it gracefully.

 

 
{noformat}
ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] Failed to close producer due to the following error:
java.lang.NullPointerException
 at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
 at org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
 at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
 at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
 at org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
 at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
 at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
 at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
 at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
 

 

Although I have checked the code and the method `maybeAbortTransactionAndCloseRecordCollector` in the `StreamTask.java` class is expecting any kind of error to happen since is catching `Throwable`.

 

 

 
{noformat}
try {
 recordCollector.close();
} catch (final Throwable e) {
 log.error("Failed to close producer due to the following error:", e);
} finally {
 producer = null;
}{noformat}
 

Should we consider this a bug?

In my opinion, we could check for the `null` possibility at `RecordCollectorImpl.java` class:
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 producer.close();
 producer = null;
 checkForException();
}{noformat}
 

Change it for:

 
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 if ( Objects.nonNull(producer) ) {
    producer.close();
    producer = null;
 }
 checkForException();
}{noformat}
 

How does that sound?

 

Kafka Brokers running 2.0.0

Kafka Stream and client 2.1.0

OpenJDK 8

 


> Failed to close producer due to java.lang.NullPointerException
> --------------------------------------------------------------
>
>                 Key: KAFKA-7678
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7678
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jonathan Santilli
>            Priority: Major
>
> This occurs when the group is rebalancing in a Kafka Stream application and the process (the Kafka Stream application) receives a *SIGTERM* to stop it gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> Although I have checked the code and the method `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` class is expecting any kind of error to happen since is catching `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility at `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it for:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if ( Objects.nonNull(producer) ) {
>     producer.close();
>     producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)