Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/01 19:55:59 UTC

[kafka] branch 2.1 updated: KAFKA-8066; Always close the sensors in Selector.close() (#6402)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 1f4419d  KAFKA-8066; Always close the sensors in Selector.close() (#6402)
1f4419d is described below

commit 1f4419d0ea6db548402326ae17bd7f0da03ce8e6
Author: Zhanxiang (Patrick) Huang <hz...@hotmail.com>
AuthorDate: Wed May 1 12:40:49 2019 -0700

    KAFKA-8066; Always close the sensors in Selector.close() (#6402)
    
    When shutting down the ReplicaFetcher thread, we may fail to unregister sensors in selector.close(). When that happens, we will fail to start up a ReplicaFetcherThread with the same fetch id again because of the IllegalArgumentException thrown during sensor registration. This issue causes constant URPs (under-replicated partitions) in the cluster because the ReplicaFetcherThread is gone.
    
    This patch addresses the issue by introducing a try-finally block in selector.close() so that we always unregister the sensors when shutting down ReplicaFetcherThreads.
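    
    For illustration, a minimal, self-contained sketch of that pattern (the class and
    parameter names here are hypothetical stand-ins, not the actual Kafka code; the real
    change is in the Selector.java hunk below, and closeQuietly mirrors the helper this
    patch adds to Utils):
    
        import java.util.concurrent.atomic.AtomicReference;
    
        class CloseSketch {
            static void closeQuietly(AutoCloseable c, String name, AtomicReference<Throwable> firstException) {
                if (c != null) {
                    try {
                        c.close();
                    } catch (Throwable t) {
                        firstException.compareAndSet(null, t); // remember only the first failure
                    }
                }
            }
    
            static void close(Runnable closeConnections, AutoCloseable nioSelector,
                              AutoCloseable sensors, AutoCloseable channelBuilder) {
                try {
                    closeConnections.run();  // analogous to the per-connection close(id) loop; may throw
                } finally {
                    // Always release the remaining resources, in particular the sensors,
                    // even if closing an individual connection threw above.
                    AtomicReference<Throwable> firstException = new AtomicReference<>();
                    closeQuietly(nioSelector, "nioSelector", firstException);
                    closeQuietly(sensors, "sensors", firstException);
                    closeQuietly(channelBuilder, "channelBuilder", firstException);
                    Throwable e = firstException.get();
                    if (e instanceof RuntimeException && !(e instanceof SecurityException))
                        throw (RuntimeException) e;
                }
            }
        }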
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/clients/ClientUtils.java | 13 ----------
 .../kafka/clients/consumer/KafkaConsumer.java      | 12 +++++-----
 .../kafka/clients/producer/KafkaProducer.java      | 11 +++++----
 .../org/apache/kafka/common/network/Selector.java  | 28 +++++++++++++++-------
 .../java/org/apache/kafka/common/utils/Utils.java  | 13 ++++++++++
 .../apache/kafka/common/network/SelectorTest.java  | 26 ++++++++++++++++++++
 .../runtime/distributed/WorkerGroupMember.java     |  7 +++---
 7 files changed, 75 insertions(+), 35 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index dce5f3f..6c4bd13 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -26,14 +26,12 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
@@ -87,17 +85,6 @@ public final class ClientUtils {
         return addresses;
     }
 
-    public static void closeQuietly(Closeable c, String name, AtomicReference<Throwable> firstException) {
-        if (c != null) {
-            try {
-                c.close();
-            } catch (Throwable t) {
-                firstException.compareAndSet(null, t);
-                log.error("Failed to close " + name, t);
-            }
-        }
-    }
-
     /**
      * @param config client configs
      * @return configured ChannelBuilder based on the configs.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 4061373..430dfff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2128,12 +2128,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             firstException.compareAndSet(null, t);
             log.error("Failed to close coordinator", t);
         }
-        ClientUtils.closeQuietly(fetcher, "fetcher", firstException);
-        ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException);
-        ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
-        ClientUtils.closeQuietly(client, "consumer network client", firstException);
-        ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
-        ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
+        Utils.closeQuietly(fetcher, "fetcher", firstException);
+        Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
+        Utils.closeQuietly(metrics, "consumer metrics", firstException);
+        Utils.closeQuietly(client, "consumer network client", firstException);
+        Utils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
+        Utils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
         log.debug("Kafka consumer has been closed");
         Throwable exception = firstException.get();
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 8528bba..8b12a76 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -80,6 +80,7 @@ import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 
@@ -1180,11 +1181,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             }
         }
 
-        ClientUtils.closeQuietly(interceptors, "producer interceptors", firstException);
-        ClientUtils.closeQuietly(metrics, "producer metrics", firstException);
-        ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
-        ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
-        ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException);
+        Utils.closeQuietly(interceptors, "producer interceptors", firstException);
+        Utils.closeQuietly(metrics, "producer metrics", firstException);
+        Utils.closeQuietly(keySerializer, "producer keySerializer", firstException);
+        Utils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
+        Utils.closeQuietly(partitioner, "producer partitioner", firstException);
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
         log.debug("Kafka producer has been closed");
         Throwable exception = firstException.get();
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 93325d5..c6dcd34 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.metrics.stats.SampledStat;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -51,6 +52,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A nioSelector interface for doing non-blocking multi-connection network I/O.
@@ -353,15 +355,24 @@ public class Selector implements Selectable, AutoCloseable {
     @Override
     public void close() {
         List<String> connections = new ArrayList<>(channels.keySet());
-        for (String id : connections)
-            close(id);
         try {
-            this.nioSelector.close();
-        } catch (IOException | SecurityException e) {
-            log.error("Exception closing nioSelector:", e);
+            for (String id : connections)
+                close(id);
+        } finally {
+            // If there is any exception thrown in close(id), we should still be able
+            // to close the remaining objects, especially the sensors because keeping
+            // the sensors may lead to a failure to start up the ReplicaFetcherThread if
+            // the old sensors with the same names have not yet been cleaned up.
+            AtomicReference<Throwable> firstException = new AtomicReference<>();
+            Utils.closeQuietly(nioSelector, "nioSelector", firstException);
+            Utils.closeQuietly(sensors, "sensors", firstException);
+            Utils.closeQuietly(channelBuilder, "channelBuilder", firstException);
+            Throwable exception = firstException.get();
+            if (exception instanceof RuntimeException && !(exception instanceof SecurityException)) {
+                throw (RuntimeException) exception;
+            }
+
         }
-        sensors.close();
-        channelBuilder.close();
     }
 
     /**
@@ -961,7 +972,7 @@ public class Selector implements Selectable, AutoCloseable {
         return deque == null ? 0 : deque.size();
     }
 
-    private class SelectorMetrics {
+    private class SelectorMetrics implements AutoCloseable {
         private final Metrics metrics;
         private final String metricGrpPrefix;
         private final Map<String, String> metricTags;
@@ -1137,6 +1148,7 @@ public class Selector implements Selectable, AutoCloseable {
             }
         }
 
+        @Override
         public void close() {
             for (MetricName metricName : topLevelMetricNames)
                 metrics.removeMetric(metricName);
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index f94858c..ca86c9b 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -57,6 +57,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -852,6 +853,18 @@ public final class Utils {
         }
     }
 
+    public static void closeQuietly(AutoCloseable closeable, String name, AtomicReference<Throwable> firstException) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (Throwable t) {
+                firstException.compareAndSet(null, t);
+                log.error("Failed to close {} with type {}", name, closeable.getClass().getName(), t);
+            }
+        }
+    }
+
+
     /**
      * A cheap way to deterministically convert a number to a positive value. When the input is
      * positive, the original value is returned. When the input number is negative, the returned
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 6cf7586..a1c0be8 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -634,6 +634,32 @@ public class SelectorTest {
         assertEquals((double) conns, getMetric("connection-count").metricValue());
     }
 
+    @Test
+    public void testMetricsCleanupOnSelectorClose() throws Exception {
+        Metrics metrics = new Metrics();
+        Selector selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) {
+            @Override
+            public void close(String id) {
+                throw new RuntimeException();
+            }
+        };
+        assertTrue(metrics.metrics().size() > 1);
+        String id = "0";
+        selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+
+        // Close the selector and ensure a RuntimeException has been thrown
+        try {
+            selector.close();
+            fail();
+        } catch (RuntimeException e) {
+            // Expected
+        }
+
+        // We should only have one remaining metric for kafka-metrics-count, which is a global metric
+        assertEquals(1, metrics.metrics().size());
+    }
+
+
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
         selector.poll(1000L);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 5725ff5..1082714 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
@@ -204,9 +205,9 @@ public class WorkerGroupMember {
         log.trace("Stopping the Connect group member.");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
         this.stopped = true;
-        ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
-        ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
-        ClientUtils.closeQuietly(client, "consumer network client", firstException);
+        Utils.closeQuietly(coordinator, "coordinator", firstException);
+        Utils.closeQuietly(metrics, "consumer metrics", firstException);
+        Utils.closeQuietly(client, "consumer network client", firstException);
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
         if (firstException.get() != null && !swallowException)
             throw new KafkaException("Failed to stop the Connect group member", firstException.get());
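
For reference, a minimal caller-side sketch of the relocated helper (the resources here
are hypothetical; this mirrors the pattern used in WorkerGroupMember.stop() above):

    import java.util.concurrent.atomic.AtomicReference;

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.utils.Utils;

    class ShutdownSketch {
        // Close a group of resources, remembering only the first failure, then rethrow it.
        static void stop(AutoCloseable coordinator, AutoCloseable metrics, AutoCloseable client) {
            AtomicReference<Throwable> firstException = new AtomicReference<>();
            Utils.closeQuietly(coordinator, "coordinator", firstException);
            Utils.closeQuietly(metrics, "metrics", firstException);
            Utils.closeQuietly(client, "network client", firstException);
            if (firstException.get() != null)
                throw new KafkaException("Failed to stop cleanly", firstException.get());
        }
    }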