You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/08/26 16:54:31 UTC

[kafka] branch 2.3 updated: KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new e0e2dbc  KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207)
e0e2dbc is described below

commit e0e2dbc9d89b0fcbf49ac9bf30a7159d3c27c9f8
Author: cpettitt-confluent <53...@users.noreply.github.com>
AuthorDate: Mon Aug 26 09:53:36 2019 -0700

    KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207)
    
    Prior to this change an NPE is raised when calling AssignedTasks.close
    under the following conditions:
    
    1. EOS is enabled
    2. The task was in a suspended state
    
    The cause of the NPE is that when a clean close is requested for a
    StreamTask, the StreamTask tries to commit. However, in the suspended
    state there is no producer, so an NPE is ultimately thrown when the
    contained RecordCollector is flushed.
    
    The fix put forth in this commit is to have AssignedTasks call
    closeSuspended when it knows the underlying StreamTask is suspended.
    
    Note also that this test is quite involved. I could have just tested
    that AssignedTasks calls closeSuspended when appropriate, but that is
    testing, IMO, a detail of the implementation and doesn't actually verify
    we reproduced the original problem as it was described. I feel much more
    confident that we are reproducing the behavior - and we can test exactly
    the conditions that lead to it - when testing across AssignedTasks and
    StreamTask. I believe this is additional support for the argument for
    eventually consolidating the state that is currently split across classes.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 checkstyle/import-control.xml                      |   2 +
 .../streams/processor/internals/AssignedTasks.java |  17 ++-
 .../internals/AssignedStreamsTasksTest.java        | 129 +++++++++++++++++++--
 .../processor/internals/StreamTaskTest.java        |  10 +-
 4 files changed, 136 insertions(+), 22 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index a76fd1d..dfaa3f6 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -266,8 +266,10 @@
       <subpackage name="internals">
         <allow pkg="org.I0Itec.zkclient" />
         <allow pkg="com.fasterxml.jackson" />
+        <allow pkg="kafka.utils" />
         <allow pkg="org.apache.zookeeper" />
         <allow pkg="org.apache.zookeeper" />
+        <allow pkg="org.apache.log4j" />
         <subpackage name="testutil">
           <allow pkg="org.apache.log4j" />
         </subpackage>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index a9baa3f..6a39df1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -332,18 +332,23 @@ abstract class AssignedTasks<T extends Task> {
 
     void close(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
-        for (final T task : allTasks()) {
+
+        for (final T task: allTasks()) {
             try {
-                task.close(clean, false);
+                if (suspended.containsKey(task.id())) {
+                    task.closeSuspended(clean, false, null);
+                } else {
+                    task.close(clean, false);
+                }
             } catch (final TaskMigratedException e) {
                 log.info("Failed to close {} {} since it got migrated to another thread already. " +
-                        "Closing it as zombie and move on.", taskTypeName, task.id());
+                    "Closing it as zombie and move on.", taskTypeName, task.id());
                 firstException.compareAndSet(null, closeZombieTask(task));
             } catch (final RuntimeException t) {
                 log.error("Failed while closing {} {} due to the following error:",
-                          task.getClass().getSimpleName(),
-                          task.id(),
-                          t);
+                    task.getClass().getSimpleName(),
+                    task.id(),
+                    t);
                 if (clean) {
                     if (!closeUnclean(task)) {
                         firstException.compareAndSet(null, t);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index ffd0f8b..ca51a3b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -17,27 +17,42 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import kafka.utils.LogCaptureAppender;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.test.MockSourceNode;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 public class AssignedStreamsTasksTest {
 
     private final StreamTask t1 = EasyMock.createMock(StreamTask.class);
@@ -451,6 +466,96 @@ public class AssignedStreamsTasksTest {
         EasyMock.verify(t1);
     }
 
+    @Test
+    public void shouldCloseCleanlyWithSuspendedTaskAndEOS() {
+        final String topic = "topic";
+
+        final Deserializer<byte[]> deserializer = Serdes.ByteArray().deserializer();
+        final Serializer<byte[]> serializer = Serdes.ByteArray().serializer();
+
+        final MockConsumer<byte[], byte[]> consumer =
+            new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        final MockProducer<byte[], byte[]> producer =
+            new MockProducer<>(false, serializer, serializer);
+
+        final MockSourceNode<byte[], byte[]> source = new MockSourceNode<>(
+            new String[] {"topic"},
+            deserializer,
+            deserializer);
+
+        final ChangelogReader changelogReader = new MockChangelogReader();
+
+        final ProcessorTopology topology = new ProcessorTopology(
+            Collections.singletonList(source),
+            Collections.singletonMap(topic, source),
+            Collections.emptyMap(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyMap(),
+            Collections.emptySet());
+
+        final Set<TopicPartition> partitions = Collections.singleton(
+            new TopicPartition(topic, 1));
+
+        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG));
+
+        final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics);
+
+        final MockTime time = new MockTime();
+
+        final StateDirectory stateDirectory = new StateDirectory(
+            StreamTaskTest.createConfig(true),
+            time,
+            true);
+
+        final StreamTask task = new StreamTask(
+            new TaskId(0, 0),
+            partitions,
+            topology,
+            consumer,
+            changelogReader,
+            StreamTaskTest.createConfig(true),
+            streamsMetrics,
+            stateDirectory,
+            null,
+            time,
+            () -> producer);
+
+        assignedTasks.addNewTask(task);
+        assignedTasks.initializeNewTasks();
+        assertNull(assignedTasks.suspend());
+
+        // Reproduces KAFKA-8412: a clean close of a suspended task with EOS enabled must not fail.
+        // AssignedTasks.close catches a RuntimeException raised while closing a task and logs it
+        // at ERROR; if the follow-up unclean close succeeds, the exception is not recorded. So the
+        // failure is detected by capturing ERROR logs. It's not clear if this is intended behavior.
+        // Note: detecting the failure through this logging side effect is very brittle.
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        final Level previousLevel =
+            LogCaptureAppender.setClassLoggerLevel(AssignedStreamsTasks.class, Level.ERROR);
+        try {
+            assignedTasks.close(true);
+        } finally {
+            LogCaptureAppender.setClassLoggerLevel(AssignedStreamsTasks.class, previousLevel);
+            LogCaptureAppender.unregister(appender);
+        }
+        if (!appender.getMessages().isEmpty()) {
+            final LoggingEvent firstError = appender.getMessages().head();
+            final String firstErrorCause =
+                firstError.getThrowableStrRep() != null
+                    ? String.join("\n", firstError.getThrowableStrRep())
+                    : "N/A";
+
+            final String failMsg =
+                String.format("Expected no ERROR message while closing assignedTasks, but got %d. " +
+                    "First error: %s. Cause: %s",
+                    appender.getMessages().size(),
+                    firstError.getMessage(),
+                    firstErrorCause);
+            fail(failMsg);
+        }
+    }
+
     private void addAndInitTask() {
         assignedTasks.addNewTask(t1);
         assignedTasks.initializeNewTasks();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index e4b6bd1..c6d753d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -89,6 +89,8 @@ import static org.junit.Assert.fail;
 
 public class StreamTaskTest {
 
+    private static final File BASE_DIR = TestUtils.tempDirectory();
+
     private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
     private final Serializer<byte[]> bytesSerializer = Serdes.ByteArray().serializer();
     private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
@@ -140,7 +142,6 @@ public class StreamTaskTest {
     private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics);
     private final TaskId taskId00 = new TaskId(0, 0);
     private final MockTime time = new MockTime();
-    private final File baseDir = TestUtils.tempDirectory();
     private StateDirectory stateDirectory;
     private StreamTask task;
     private long punctuatedAt;
@@ -175,10 +176,11 @@ public class StreamTaskTest {
                                      Collections.emptySet());
     }
 
-    private StreamsConfig createConfig(final boolean enableEoS) {
+    // Exposed to make it easier to create StreamTask config from other tests.
+    static StreamsConfig createConfig(final boolean enableEoS) {
         final String canonicalPath;
         try {
-            canonicalPath = baseDir.getCanonicalPath();
+            canonicalPath = BASE_DIR.getCanonicalPath();
         } catch (final IOException e) {
             throw new RuntimeException(e);
         }
@@ -210,7 +212,7 @@ public class StreamTaskTest {
                 }
             }
         } finally {
-            Utils.delete(baseDir);
+            Utils.delete(BASE_DIR);
         }
     }