You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/11/16 01:35:53 UTC

[kafka] branch 2.1 updated: KAFKA-7576; Fix shutdown of replica fetcher threads (#5875)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 51dc648  KAFKA-7576; Fix shutdown of replica fetcher threads (#5875)
51dc648 is described below

commit 51dc648e4f0b788c3f3495c93edd446dc72b3225
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Nov 16 01:27:29 2018 +0000

    KAFKA-7576; Fix shutdown of replica fetcher threads (#5875)
    
    ReplicaFetcherThread.shutdown attempts to close the fetcher's Selector while the thread is running. This is unsafe and can result in `Selector.close()` failing with an exception. The exception is caught and logged at debug level, but this can lead to a socket leak if the shutdown is due to a dynamic config update rather than broker shutdown. This PR changes the shutdown logic to close the Selector after the replica fetcher thread has shut down, with a wakeup() and flag to terminate blocking sen [...]
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/clients/KafkaClient.java | 16 ++++++
 .../org/apache/kafka/clients/NetworkClient.java    | 40 ++++++++++++-
 .../apache/kafka/clients/NetworkClientUtils.java   | 33 +++++++----
 .../java/org/apache/kafka/clients/MockClient.java  | 12 ++++
 .../controller/ControllerChannelManager.scala      |  7 +++
 .../kafka/server/ReplicaFetcherBlockingSend.scala  |  6 ++
 .../scala/kafka/server/ReplicaFetcherThread.scala  | 27 ++++++---
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 33 ++++-------
 .../unit/kafka/server/ServerShutdownTest.scala     | 65 +++++++++++++++++++++-
 .../util/ReplicaFetcherMockBlockingSend.scala      |  2 +
 10 files changed, 197 insertions(+), 44 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 448932e..18a7eef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -197,4 +197,20 @@ public interface KafkaClient extends Closeable {
                                    int requestTimeoutMs,
                                    RequestCompletionHandler callback);
 
+
+
+    /**
+     * Initiates shutdown of this client. This method may be invoked from another thread while this
+     * client is being polled. No further requests may be sent using the client. The current poll()
+     * will be terminated using wakeup(). The client should be explicitly shutdown using {@link #close()}
+     * after poll returns. Note that {@link #close()} should not be invoked concurrently while polling.
+     */
+    void initiateClose();
+
+    /**
+     * Returns true if the client is still active. Returns false if {@link #initiateClose()} or {@link #close()}
+     * was invoked for this client.
+     */
+    boolean active();
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index c6f0c0b..93b9db1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelState;
@@ -54,6 +55,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -64,6 +66,12 @@ import java.util.stream.Collectors;
  */
 public class NetworkClient implements KafkaClient {
 
+    private enum State {
+        ACTIVE,
+        CLOSING,
+        CLOSED
+    }
+
     private final Logger log;
 
     /* the selector used to perform network i/o */
@@ -114,6 +122,8 @@ public class NetworkClient implements KafkaClient {
 
     private final Sensor throttleTimeSensor;
 
+    private final AtomicReference<State> state;
+
     public NetworkClient(Selectable selector,
                          Metadata metadata,
                          String clientId,
@@ -254,6 +264,7 @@ public class NetworkClient implements KafkaClient {
         this.throttleTimeSensor = throttleTimeSensor;
         this.log = logContext.logger(NetworkClient.class);
         this.clientDnsLookup = clientDnsLookup;
+        this.state = new AtomicReference<>(State.ACTIVE);
     }
 
     /**
@@ -429,6 +440,7 @@ public class NetworkClient implements KafkaClient {
     }
 
     private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
+        ensureActive();
         String nodeId = clientRequest.destination();
         if (!isInternalRequest) {
             // If this request came from outside the NetworkClient, validate
@@ -507,6 +519,8 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public List<ClientResponse> poll(long timeout, long now) {
+        ensureActive();
+
         if (!abortedSends.isEmpty()) {
             // If there are aborted sends because of unsupported version exceptions or disconnects,
             // handle them immediately without waiting for Selector#poll.
@@ -586,13 +600,35 @@ public class NetworkClient implements KafkaClient {
         this.selector.wakeup();
     }
 
+    @Override
+    public void initiateClose() {
+        if (state.compareAndSet(State.ACTIVE, State.CLOSING)) {
+            wakeup();
+        }
+    }
+
+    @Override
+    public boolean active() {
+        return state.get() == State.ACTIVE;
+    }
+
+    private void ensureActive() {
+        if (!active())
+            throw new DisconnectException("NetworkClient is no longer active, state is " + state);
+    }
+
     /**
      * Close the network client
      */
     @Override
     public void close() {
-        this.selector.close();
-        this.metadataUpdater.close();
+        state.compareAndSet(State.ACTIVE, State.CLOSING);
+        if (state.compareAndSet(State.CLOSING, State.CLOSED)) {
+            this.selector.close();
+            this.metadataUpdater.close();
+        } else {
+            log.warn("Attempting to close NetworkClient that has already been closed.");
+        }
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
index 94fe288..c952b82 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.utils.Time;
 
 import java.io.IOException;
@@ -83,25 +84,35 @@ public final class NetworkClientUtils {
      * disconnection happens (which can happen for a number of reasons including a request timeout).
      *
      * In case of a disconnection, an `IOException` is thrown.
+     * If shutdown is initiated on the client during this method, an IOException is thrown.
      *
      * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
      * care.
      */
     public static ClientResponse sendAndReceive(KafkaClient client, ClientRequest request, Time time) throws IOException {
-        client.send(request, time.milliseconds());
-        while (true) {
-            List<ClientResponse> responses = client.poll(Long.MAX_VALUE, time.milliseconds());
-            for (ClientResponse response : responses) {
-                if (response.requestHeader().correlationId() == request.correlationId()) {
-                    if (response.wasDisconnected()) {
-                        throw new IOException("Connection to " + response.destination() + " was disconnected before the response was read");
+        try {
+            client.send(request, time.milliseconds());
+            while (client.active()) {
+                List<ClientResponse> responses = client.poll(Long.MAX_VALUE, time.milliseconds());
+                for (ClientResponse response : responses) {
+                    if (response.requestHeader().correlationId() == request.correlationId()) {
+                        if (response.wasDisconnected()) {
+                            throw new IOException("Connection to " + response.destination() + " was disconnected before the response was read");
+                        }
+                        if (response.versionMismatch() != null) {
+                            throw response.versionMismatch();
+                        }
+                        return response;
                     }
-                    if (response.versionMismatch() != null) {
-                        throw response.versionMismatch();
-                    }
-                    return response;
                 }
             }
+            throw new IOException("Client was shutdown before response was read");
+        } catch (DisconnectException e) {
+            if (client.active())
+                throw e;
+            else
+                throw new IOException("Client was shutdown before response was read");
+
         }
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index a586af8..11822ac 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -96,6 +96,7 @@ public class MockClient implements KafkaClient {
     private final Queue<MetadataUpdate> metadataUpdates = new ConcurrentLinkedDeque<>();
     private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
     private volatile int numBlockingWakeups = 0;
+    private volatile boolean active = true;
 
     public MockClient(Time time) {
         this(time, null);
@@ -533,7 +534,18 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
+    public void initiateClose() {
+        close();
+    }
+
+    @Override
+    public boolean active() {
+        return active;
+    }
+
+    @Override
     public void close() {
+        active = false;
         metadata.close();
     }
 
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 7002219..9ca54b3 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -291,6 +291,13 @@ class RequestSendThread(val controllerId: Int,
     }
   }
 
+  override def initiateShutdown(): Boolean = {
+    if (super.initiateShutdown()) {
+      networkClient.initiateClose()
+      true
+    } else
+      false
+  }
 }
 
 class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogger: StateChangeLogger) extends  Logging {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index 4e642f3..d6a92b4 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -35,6 +35,8 @@ trait BlockingSend {
 
   def sendRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): ClientResponse
 
+  def initiateClose()
+
   def close()
 }
 
@@ -104,6 +106,10 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
     }
   }
 
+  override def initiateClose(): Unit = {
+    networkClient.initiateClose()
+  }
+
   def close(): Unit = {
     networkClient.close()
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 71a4b95..4452d89 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -115,23 +115,32 @@ class ReplicaFetcherThread(name: String,
   override def initiateShutdown(): Boolean = {
     val justShutdown = super.initiateShutdown()
     if (justShutdown) {
-      // leaderEndpoint.close() can throw an exception when the replica fetcher thread is still
-      // actively fetching because the selector can close the channel while sending the request
-      // after we initiate leaderEndpoint.close() and the leaderEndpoint.close() itself may also close
-      // the channel again. When this race condition happens, an exception will be thrown.
-      // Throwing the exception to the caller may fail the ReplicaManager shutdown. It is safe to catch
-      // the exception without here causing correctness issue because we are going to shutdown the thread
-      // and will not re-use the leaderEndpoint anyway.
+      // This is thread-safe, so we don't expect any exceptions, but catch and log any errors
+      // to avoid failing the caller, especially during shutdown. We will attempt to close
+      // leaderEndpoint after the thread terminates.
       try {
-        leaderEndpoint.close()
+        leaderEndpoint.initiateClose()
       } catch {
         case t: Throwable =>
-          debug(s"Fail to close leader endpoint $leaderEndpoint after initiating replica fetcher thread shutdown", t)
+          error(s"Failed to initiate shutdown of leader endpoint $leaderEndpoint after initiating replica fetcher thread shutdown", t)
       }
     }
     justShutdown
   }
 
+  override def awaitShutdown(): Unit = {
+    super.awaitShutdown()
+    // We don't expect any exceptions here, but catch and log any errors to avoid failing the caller,
+    // especially during shutdown. It is safe to catch the exception here without causing correctness
+    // issue because we are going to shutdown the thread and will not re-use the leaderEndpoint anyway.
+    try {
+      leaderEndpoint.close()
+    } catch {
+      case t: Throwable =>
+        error(s"Failed to close leader endpoint $leaderEndpoint after shutting down replica fetcher thread", t)
+    }
+  }
+
   // process fetched data
   override def processPartitionData(topicPartition: TopicPartition,
                                     fetchOffset: Long,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index c9d9b96..a370101 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -24,7 +24,7 @@ import kafka.cluster.Partition
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
-import kafka.utils.{LogCaptureAppender, TestUtils}
+import kafka.utils.TestUtils
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
@@ -33,7 +33,6 @@ import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest}
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.utils.SystemTime
-import org.apache.log4j.Level
 import org.easymock.EasyMock._
 import org.easymock.{Capture, CaptureType, IAnswer}
 import org.junit.Assert._
@@ -799,7 +798,8 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(props)
     val mockBlockingSend = createMock(classOf[BlockingSend])
 
-    expect(mockBlockingSend.close()).andThrow(new IllegalArgumentException()).once()
+    expect(mockBlockingSend.initiateClose()).andThrow(new IllegalArgumentException()).once()
+    expect(mockBlockingSend.close()).andThrow(new IllegalStateException()).once()
     replay(mockBlockingSend)
 
     val thread = new ReplicaFetcherThread(
@@ -812,24 +812,15 @@ class ReplicaFetcherThreadTest {
       time = new SystemTime(),
       quota = null,
       leaderEndpointBlockingSend = Some(mockBlockingSend))
-
-    val previousLevel = LogCaptureAppender.setClassLoggerLevel(thread.getClass, Level.DEBUG)
-    val logCaptureAppender = LogCaptureAppender.createAndRegister()
-
-    try {
-      thread.initiateShutdown()
-
-      val event = logCaptureAppender.getMessages.find(e => e.getLevel == Level.DEBUG
-        && e.getRenderedMessage.contains(s"Fail to close leader endpoint $mockBlockingSend after initiating replica fetcher thread shutdown")
-        && e.getThrowableInformation != null
-        && e.getThrowableInformation.getThrowable.getClass.getName.equals(new IllegalArgumentException().getClass.getName))
-      assertTrue(event.isDefined)
-
-      verify(mockBlockingSend)
-    } finally {
-      LogCaptureAppender.unregister(logCaptureAppender)
-      LogCaptureAppender.setClassLoggerLevel(thread.getClass, previousLevel)
-    }
+    thread.start()
+
+    // Verify that:
+    //   1) IllegalArgumentException thrown by BlockingSend#initiateClose() during `initiateShutdown` is not propagated
+    //   2) BlockingSend.close() is invoked even if BlockingSend#initiateClose() fails
+    //   3) IllegalStateException thrown by BlockingSend.close() during `awaitShutdown` is not propagated
+    thread.initiateShutdown()
+    thread.awaitShutdown()
+    verify(mockBlockingSend)
   }
 
   def stub(replica: Replica, partition: Partition, replicaManager: ReplicaManager) = {
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index ff09749..416e46c 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -19,15 +19,24 @@ package kafka.server
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.utils.TestUtils._
-import java.io.File
+import java.io.{DataInputStream, File}
+import java.net.ServerSocket
+import java.util.concurrent.{Executors, TimeUnit}
 
+import kafka.cluster.Broker
+import kafka.controller.{ControllerChannelManager, ControllerContext, StateChangeLogger}
 import kafka.log.LogManager
 import kafka.zookeeper.ZooKeeperClientTimeoutException
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
+import org.apache.kafka.common.utils.Time
 import org.junit.{Before, Test}
 import org.junit.Assert._
 
@@ -189,4 +198,58 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     server.awaitShutdown()
     server.shutdown()
   }
+
+  // Verify that if controller is in the midst of processing a request, shutdown completes
+  // without waiting for request timeout.
+  @Test
+  def testControllerShutdownDuringSend(): Unit = {
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+
+    val controllerId = 2
+    val metrics = new Metrics
+    val executor = Executors.newSingleThreadExecutor
+    var serverSocket: ServerSocket = null
+    var controllerChannelManager: ControllerChannelManager = null
+
+    try {
+      // Set up a server to accept a connection and receive one byte from the first request. No response is sent.
+      serverSocket = new ServerSocket(0)
+      val receiveFuture = executor.submit(new Runnable {
+        override def run(): Unit = {
+          val socket = serverSocket.accept()
+          new DataInputStream(socket.getInputStream).readByte()
+        }
+      })
+
+      // Start a ControllerChannelManager
+      val brokers = Seq(new Broker(1, "localhost", serverSocket.getLocalPort, listenerName, securityProtocol))
+      val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, zkConnect))
+      val controllerContext = new ControllerContext
+      controllerContext.liveBrokers = brokers.toSet
+      controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM,
+        metrics, new StateChangeLogger(controllerId, inControllerContext = true, None))
+      controllerChannelManager.startup()
+
+      // Initiate a sendRequest and wait until connection is established and one byte is received by the peer
+      val requestBuilder = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+        controllerId, 1, Map.empty.asJava, brokers.map(_.node(listenerName)).toSet.asJava)
+      controllerChannelManager.sendRequest(1, ApiKeys.LEADER_AND_ISR, requestBuilder)
+      receiveFuture.get(10, TimeUnit.SECONDS)
+
+      // Shutdown controller. Request timeout is 30s, verify that shutdown completed well before that
+      val shutdownFuture = executor.submit(new Runnable {
+        override def run(): Unit = controllerChannelManager.shutdown()
+      })
+      shutdownFuture.get(10, TimeUnit.SECONDS)
+
+    } finally {
+      if (serverSocket != null)
+        serverSocket.close()
+      if (controllerChannelManager != null)
+        controllerChannelManager.shutdown()
+      executor.shutdownNow()
+      metrics.close()
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
index b7c037e..fdb22b5 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
@@ -86,5 +86,7 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
       true)
   }
 
+  override def initiateClose(): Unit = {}
+
   override def close(): Unit = {}
 }