You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/12/05 22:23:56 UTC

[kafka] branch 2.0 updated: KAFKA-7678: Avoid NPE when closing the RecordCollector (#5993)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 87a37c5  KAFKA-7678: Avoid NPE when closing the RecordCollector (#5993)
87a37c5 is described below

commit 87a37c5a587f05288d12929c067a1623a05ae6f8
Author: Jonathan Santilli <jo...@users.noreply.github.com>
AuthorDate: Wed Dec 5 20:48:39 2018 +0100

    KAFKA-7678: Avoid NPE when closing the RecordCollector (#5993)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 .../streams/processor/internals/RecordCollectorImpl.java     |  6 ++++--
 .../streams/processor/internals/RecordCollectorTest.java     | 12 ++++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 7e19297..554cc85 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -249,8 +249,10 @@ public class RecordCollectorImpl implements RecordCollector {
     @Override
     public void close() {
         log.debug("Closing producer");
-        producer.close();
-        producer = null;
+        if (producer != null) {
+            producer.close();
+            producer = null;
+        }
         checkForException();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 4f89a1e..e163029 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -362,4 +362,16 @@ public class RecordCollectorTest {
         });
         collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
     }
+
+    @Test
+    public void testShouldNotThrowNPEOnCloseIfProducerIsNotInitialized() {
+        final RecordCollectorImpl collector = new RecordCollectorImpl(
+                "NoNPE",
+                logContext,
+                new DefaultProductionExceptionHandler(),
+                new Metrics().sensor("skipped-records")
+        );
+
+        collector.close();
+    }
 }