You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/03/20 01:17:09 UTC

[kafka] branch 2.8 updated: KAFKA-12500: fix memory leak in thread cache (#10355)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 45cd9e3  KAFKA-12500: fix memory leak in thread cache (#10355)
45cd9e3 is described below

commit 45cd9e30083ec5195ee032423063102878003a3f
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Mar 19 18:11:07 2021 -0700

    KAFKA-12500: fix memory leak in thread cache (#10355)
    
    Need to exclude threads in PENDING_SHUTDOWN from the num live threads computation used to compute the new cache size per thread. Also adds some logging to help follow what's happening when a thread is added/removed/replaced.
    
    Reviewers: Bruno Cadonna <ca...@confluent.io>, Walker Carlson <wc...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 73 +++++++++-------
 .../kafka/streams/state/internals/ThreadCache.java |  3 +
 .../integration/AdjustStreamThreadCountTest.java   | 96 +++++++++++++++++++++-
 .../integration/MetricsIntegrationTest.java        |  8 +-
 .../integration/utils/IntegrationTestUtils.java    |  9 --
 5 files changed, 143 insertions(+), 46 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d8023b0..441faf6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -98,7 +98,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 
 import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@@ -499,7 +498,7 @@ public class KafkaStreams implements AutoCloseable {
                             "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
 
-                if (Thread.currentThread().equals(globalStreamThread) && countStreamThread(StreamThread::isRunning) == 0) {
+                if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
                     log.error("Exception in global thread caused the application to attempt to shutdown." +
                             " This action will succeed only if there is at least one StreamThread running on this client." +
                             " Currently there are no running threads so will now close the client.");
@@ -838,8 +837,7 @@ public class KafkaStreams implements AutoCloseable {
         ClientMetrics.addApplicationIdMetric(streamsMetrics, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
         ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, internalTopologyBuilder.describe().toString());
         ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(countStreamThread(thread -> thread.state().isAlive())));
+        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> getNumLiveStreamThreads());
 
         streamsMetadataState = new StreamsMetadataState(
             internalTopologyBuilder,
@@ -965,12 +963,13 @@ public class KafkaStreams implements AutoCloseable {
      */
     public Optional<String> addStreamThread() {
         if (isRunningOrRebalancing()) {
-            final int threadIdx;
-            final long cacheSizePerThread;
             final StreamThread streamThread;
             synchronized (changeThreadCount) {
-                threadIdx = getNextThreadIndex();
-                cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads() + 1);
+                final int threadIdx = getNextThreadIndex();
+                final int numLiveThreads = getNumLiveStreamThreads();
+                final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
+                log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
+                         threadIdx, numLiveThreads + 1, cacheSizePerThread);
                 resizeThreadCache(cacheSizePerThread);
                 // Creating thread should hold the lock in order to avoid duplicate thread index.
                 // If the duplicate index happen, the metadata of thread may be duplicate too.
@@ -982,14 +981,19 @@ public class KafkaStreams implements AutoCloseable {
                     streamThread.start();
                     return Optional.of(streamThread.getName());
                 } else {
+                    log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state);
                     streamThread.shutdown();
                     threads.remove(streamThread);
-                    resizeThreadCache(getCacheSizePerThread(getNumLiveStreamThreads()));
+                    final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
+                    log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
+                    resizeThreadCache(cacheSizePerThread);
+                    return Optional.empty();
                 }
             }
+        } else {
+            log.warn("Cannot add a stream thread when Kafka Streams client is in state {}", state);
+            return Optional.empty();
         }
-        log.warn("Cannot add a stream thread when Kafka Streams client is in state  " + state());
-        return Optional.empty();
     }
 
     /**
@@ -1031,7 +1035,8 @@ public class KafkaStreams implements AutoCloseable {
     }
 
     private Optional<String> removeStreamThread(final long timeoutMs) throws TimeoutException {
-        boolean timeout = false;
+        final long startMs = time.milliseconds();
+
         if (isRunningOrRebalancing()) {
             synchronized (changeThreadCount) {
                 // make a copy of threads to avoid holding lock
@@ -1043,18 +1048,23 @@ public class KafkaStreams implements AutoCloseable {
                         streamThread.requestLeaveGroupDuringShutdown();
                         streamThread.shutdown();
                         if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-                            if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
-                                log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time");
-                                timeout = true;
+                            final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
+                            if (remainingTimeMs <= 0 || !streamThread.waitOnThreadState(StreamThread.State.DEAD, remainingTimeMs)) {
+                                log.warn("{} did not shutdown in the allotted time.", streamThread.getName());
                                 // Don't remove from threads until shutdown is complete. We will trim it from the
                                 // list once it reaches DEAD, and if for some reason it's hanging indefinitely in the
                                 // shutdown then we should just consider this thread.id to be burned
                             } else {
+                                log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
                                 threads.remove(streamThread);
                             }
+                        } else {
+                            log.info("{} is the last remaining thread and must remove itself, therefore we cannot wait "
+                                + "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
                         }
 
                         final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
+                        log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread);
                         resizeThreadCache(cacheSizePerThread);
                         if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) {
                             final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get());
@@ -1065,7 +1075,8 @@ public class KafkaStreams implements AutoCloseable {
                                     new RemoveMembersFromConsumerGroupOptions(membersToRemove)
                                 );
                             try {
-                                removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs, TimeUnit.MILLISECONDS);
+                                final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
+                                removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, TimeUnit.MILLISECONDS);
                             } catch (final java.util.concurrent.TimeoutException e) {
                                 log.error("Could not remove static member {} from consumer group {} due to a timeout: {}",
                                         groupInstanceID.get(), config.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);
@@ -1083,7 +1094,8 @@ public class KafkaStreams implements AutoCloseable {
                                 );
                             }
                         }
-                        if (timeout) {
+                        final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
+                        if (remainingTimeMs <= 0) {
                             throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");
                         }
                         return Optional.of(streamThread.getName());
@@ -1097,13 +1109,25 @@ public class KafkaStreams implements AutoCloseable {
         return Optional.empty();
     }
 
-    // Returns the number of threads that are not in the DEAD state -- use this over threads.size()
+    /**
+     * Takes a snapshot and counts the number of stream threads which are not in PENDING_SHUTDOWN or DEAD
+     *
+     * note: iteration over SynchronizedList is not thread safe so it must be manually synchronized. However, we may
+     * require other locks when looping threads and it could cause deadlock. Hence, we create a copy to avoid holding
+     * threads lock when looping threads.
+     * @return number of alive stream threads
+     */
     private int getNumLiveStreamThreads() {
         final AtomicInteger numLiveThreads = new AtomicInteger(0);
+
         synchronized (threads) {
             processStreamThread(thread -> {
                 if (thread.state() == StreamThread.State.DEAD) {
+                    log.debug("Trimming thread {} from the threads list since it's state is {}", thread.getName(), StreamThread.State.DEAD);
                     threads.remove(thread);
+                } else if (thread.state() == StreamThread.State.PENDING_SHUTDOWN) {
+                    log.debug("Skipping thread {} from num live threads computation since it's state is {}",
+                              thread.getName(), StreamThread.State.PENDING_SHUTDOWN);
                 } else {
                     numLiveThreads.incrementAndGet();
                 }
@@ -1618,19 +1642,6 @@ public class KafkaStreams implements AutoCloseable {
     }
 
     /**
-     * count the snapshot of threads.
-     * noted: iteration over SynchronizedList is not thread safe so it must be manually synchronized. However, we may
-     * require other locks when looping threads and it could cause deadlock. Hence, we create a copy to avoid holding
-     * threads lock when looping threads.
-     * @param predicate predicate
-     * @return number of matched threads
-     */
-    private long countStreamThread(final Predicate<StreamThread> predicate) {
-        final List<StreamThread> copy = new ArrayList<>(threads);
-        return copy.stream().filter(predicate).count();
-    }
-
-    /**
      * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
      *
      * @return the set of {@link ThreadMetadata}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index fce7875..cf3a39a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -76,6 +76,7 @@ public class ThreadCache {
         final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes;
         maxCacheSizeBytes = newCacheSizeBytes;
         if (shrink) {
+            log.debug("Cache size was shrunk to {}", newCacheSizeBytes);
             if (caches.values().isEmpty()) {
                 return;
             }
@@ -85,6 +86,8 @@ public class ThreadCache {
                 cache.evict();
                 numEvicts++;
             }
+        } else {
+            log.debug("Cache size was expanded to {}", newCacheSizeBytes);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index 764af9c..dcca432 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -20,13 +20,22 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
+
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -34,7 +43,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -53,6 +61,7 @@ import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
+
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
@@ -61,6 +70,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @Category(IntegrationTest.class)
 public class AdjustStreamThreadCountTest {
@@ -85,7 +95,7 @@ public class AdjustStreamThreadCountTest {
         inputTopic = "input" + testId;
         IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
 
-        builder  = new StreamsBuilder();
+        builder = new StreamsBuilder();
         builder.stream(inputTopic);
 
         properties  = mkObjectProperties(
@@ -346,4 +356,86 @@ public class AdjustStreamThreadCountTest {
             assertEquals(oldThreadCount, kafkaStreams.localThreadsMetadata().size());
         }
     }
+
+    @Test
+    public void shouldResizeCacheAfterThreadRemovalTimesOut() throws InterruptedException {
+        final long totalCacheBytes = 10L;
+        final Properties props = new Properties();
+        props.putAll(properties);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, totalCacheBytes);
+
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
+            addStreamStateChangeListener(kafkaStreams);
+            startStreamsAndWaitForRunning(kafkaStreams);
+
+            try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class)) {
+                assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ofSeconds(0)));
+
+                for (final String log : appender.getMessages()) {
+                    // all 10 bytes should be available for remaining thread
+                    if (log.endsWith("Resizing thread cache due to thread removal, new cache size per thread is 10")) {
+                        return;
+                    }
+                }
+            }
+        }
+        fail();
+    }
+
+    @Test
+    public void shouldResizeCacheAfterThreadReplacement() throws InterruptedException {
+        final long totalCacheBytes = 10L;
+        final Properties props = new Properties();
+        props.putAll(properties);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, totalCacheBytes);
+
+        final AtomicBoolean injectError = new AtomicBoolean(false);
+
+        final StreamsBuilder builder  = new StreamsBuilder();
+        final KStream<String, String> stream = builder.stream(inputTopic);
+        stream.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
+                @Override
+                public void init(final ProcessorContext context) {
+                    context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
+                        if (Thread.currentThread().getName().endsWith("StreamThread-1") && injectError.get()) {
+                            injectError.set(false);
+                            throw new RuntimeException("BOOM");
+                        }
+                    });
+                }
+
+                @Override
+                public KeyValue<String, String> transform(final String key, final String value) {
+                    return new KeyValue<>(key, value);
+                }
+
+                @Override
+                public void close() {
+                }
+            });
+
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
+            addStreamStateChangeListener(kafkaStreams);
+            kafkaStreams.setUncaughtExceptionHandler(e -> StreamThreadExceptionResponse.REPLACE_THREAD);
+            startStreamsAndWaitForRunning(kafkaStreams);
+
+            stateTransitionHistory.clear();
+            try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
+                injectError.set(true);
+                waitForCondition(() -> !injectError.get(), "StreamThread did not hit and reset the injected error");
+
+                waitForTransitionFromRebalancingToRunning();
+
+                for (final String log : appender.getMessages()) {
+                    // after we replace the thread there should be two remaining threads with 5 bytes each
+                    if (log.endsWith("Adding StreamThread-3, there will now be 2 live threads and the new cache size per thread is 5")) {
+                        return;
+                    }
+                }
+            }
+        }
+        fail();
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 72d65fc..4a5025b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -260,7 +260,7 @@ public class MetricsIntegrationTest {
         final Topology topology = builder.build();
         kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
 
-        verifyAliveStreamThreadsMetric(0);
+        verifyAliveStreamThreadsMetric();
         verifyStateMetric(State.CREATED);
         verifyTopologyDescriptionMetric(topology.describe().toString());
         verifyApplicationIdMetric();
@@ -271,7 +271,7 @@ public class MetricsIntegrationTest {
             timeout,
             () -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms");
 
-        verifyAliveStreamThreadsMetric(NUM_THREADS);
+        verifyAliveStreamThreadsMetric();
         verifyStateMetric(State.RUNNING);
     }
 
@@ -463,13 +463,13 @@ public class MetricsIntegrationTest {
         checkMetricsDeregistration();
     }
 
-    private void verifyAliveStreamThreadsMetric(final int numThreads) {
+    private void verifyAliveStreamThreadsMetric() {
         final List<Metric> metricsList = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
             .filter(m -> m.metricName().name().equals(ALIVE_STREAM_THREADS) &&
                 m.metricName().group().equals(STREAM_CLIENT_NODE_METRICS))
             .collect(Collectors.toList());
         assertThat(metricsList.size(), is(1));
-        assertThat(metricsList.get(0).metricValue(), is(numThreads));
+        assertThat(metricsList.get(0).metricValue(), is(NUM_THREADS));
     }
 
     private void verifyStateMetric(final State state) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 4afa6a0..630ffca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -1352,15 +1352,6 @@ public class IntegrationTestUtils {
                                  final long totalRestored) {
         }
 
-        public boolean allStartOffsetsAtZero() {
-            for (final AtomicLong startOffset : changelogToStartOffset.values()) {
-                if (startOffset.get() != 0L) {
-                    return false;
-                }
-            }
-            return true;
-        }
-
         public long totalNumRestored() {
             long totalNumRestored = 0L;
             for (final AtomicLong numRestored : changelogToTotalNumRestored.values()) {