You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/12/05 22:23:56 UTC
[kafka] branch 2.0 updated: KAFKA-7678: Avoid NPE when closing the RecordCollector (#5993)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 87a37c5 KAFKA-7678: Avoid NPE when closing the RecordCollector (#5993)
87a37c5 is described below
commit 87a37c5a587f05288d12929c067a1623a05ae6f8
Author: Jonathan Santilli <jo...@users.noreply.github.com>
AuthorDate: Wed Dec 5 20:48:39 2018 +0100
KAFKA-7678: Avoid NPE when closing the RecordCollector (#5993)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
.../streams/processor/internals/RecordCollectorImpl.java | 6 ++++--
.../streams/processor/internals/RecordCollectorTest.java | 12 ++++++++++++
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 7e19297..554cc85 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -249,8 +249,10 @@ public class RecordCollectorImpl implements RecordCollector {
@Override
public void close() {
log.debug("Closing producer");
- producer.close();
- producer = null;
+ if (producer != null) {
+ producer.close();
+ producer = null;
+ }
checkForException();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 4f89a1e..e163029 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -362,4 +362,16 @@ public class RecordCollectorTest {
});
collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
}
+
+ @Test
+ public void testShouldNotThrowNPEOnCloseIfProducerIsNotInitialized() {
+ final RecordCollectorImpl collector = new RecordCollectorImpl(
+ "NoNPE",
+ logContext,
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records")
+ );
+
+ collector.close();
+ }
}