Posted to commits@kafka.apache.org by rs...@apache.org on 2018/11/16 11:15:34 UTC

[kafka] branch 1.1 updated: KAFKA-7576; Fix shutdown of replica fetcher threads (#5875)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 234d184  KAFKA-7576; Fix shutdown of replica fetcher threads (#5875)
234d184 is described below

commit 234d184b6221890a0efb7b6e8c795a932f457797
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Nov 16 01:27:29 2018 +0000

    KAFKA-7576; Fix shutdown of replica fetcher threads (#5875)
    
    ReplicaFetcherThread.shutdown attempts to close the fetcher's Selector while the thread is running. This is unsafe and can result in `Selector.close()` failing with an exception. The exception is caught and logged at debug level, but this can lead to a socket leak if the shutdown is due to a dynamic config update rather than a broker shutdown. This PR changes the shutdown logic to close the Selector after the replica fetcher thread has shut down, with a wakeup() and a flag to terminate blocking sends [...]
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/clients/KafkaClient.java | 16 +++++
 .../org/apache/kafka/clients/NetworkClient.java    | 38 +++++++++++-
 .../apache/kafka/clients/NetworkClientUtils.java   | 33 ++++++----
 .../java/org/apache/kafka/clients/MockClient.java  | 12 ++++
 .../controller/ControllerChannelManager.scala      |  7 +++
 .../kafka/server/ReplicaFetcherBlockingSend.scala  |  6 ++
 .../scala/kafka/server/ReplicaFetcherThread.scala  | 23 ++++++-
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 32 ++++++++++
 .../unit/kafka/server/ServerShutdownTest.scala     | 70 +++++++++++++++++++++-
 .../util/ReplicaFetcherMockBlockingSend.scala      |  2 +
 10 files changed, 223 insertions(+), 16 deletions(-)
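
The change boils down to an ordering guarantee: the thread requesting shutdown only flips the network client into a CLOSING state and wakes up any blocked poll, and the Selector itself is closed later, once the fetcher thread has actually terminated. The minimal Java sketch below models that hand-off with hypothetical stand-in classes (ShutdownOrderingSketch and FakeClient are illustrative names, not part of this patch); it mirrors the CAS-guarded state machine added to NetworkClient, but it is a sketch of the idea, not the real implementation.

import java.util.concurrent.atomic.AtomicReference;

public class ShutdownOrderingSketch {

    enum State { ACTIVE, CLOSING, CLOSED }

    static class FakeClient {
        private final AtomicReference<State> state = new AtomicReference<>(State.ACTIVE);

        boolean active() {
            return state.get() == State.ACTIVE;
        }

        // Safe to call from another thread while poll() is running: no blocking work here,
        // just a state flip plus a wakeup of the poller.
        void initiateClose() {
            if (state.compareAndSet(State.ACTIVE, State.CLOSING))
                wakeup();
        }

        // Must only run once the polling thread has stopped using the client.
        void close() {
            state.compareAndSet(State.ACTIVE, State.CLOSING);
            if (state.compareAndSet(State.CLOSING, State.CLOSED))
                System.out.println("selector-equivalent closed exactly once");
        }

        void wakeup() {
            // The real client interrupts a blocking Selector.poll() here; a no-op suffices for the sketch.
        }

        void poll() throws InterruptedException {
            if (!active())
                throw new IllegalStateException("client is no longer active");
            Thread.sleep(10); // stand-in for blocking network I/O
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FakeClient client = new FakeClient();

        Thread fetcher = new Thread(() -> {
            try {
                while (client.active())
                    client.poll();
            } catch (Exception e) {
                // A poll cut short by shutdown is expected; swallow it, as the fetcher thread does.
            }
        });
        fetcher.start();

        client.initiateClose(); // 1) signal shutdown and wake the poller
        fetcher.join();         // 2) wait for the polling thread to exit
        client.close();         // 3) only now release the underlying resource
    }
}

The same three-step sequence is what the patch spreads across ReplicaFetcherThread.initiateShutdown (step 1) and awaitShutdown (steps 2 and 3) in the diff below.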

diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 0a9b519..05ef6d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -177,4 +177,20 @@ public interface KafkaClient extends Closeable {
     ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
                                    boolean expectResponse, RequestCompletionHandler callback);
 
+
+
+    /**
+     * Initiates shutdown of this client. This method may be invoked from another thread while this
+     * client is being polled. No further requests may be sent using the client. The current poll()
+     * will be terminated using wakeup(). The client should be explicitly shutdown using {@link #close()}
+     * after poll returns. Note that {@link #close()} should not be invoked concurrently while polling.
+     */
+    void initiateClose();
+
+    /**
+     * Returns true if the client is still active. Returns false if {@link #initiateClose()} or {@link #close()}
+     * was invoked for this client.
+     */
+    boolean active();
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4b291b5..7fd6c91 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelState;
@@ -52,6 +53,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A network client for asynchronous request/response network i/o. This is an internal class used to implement the
@@ -61,6 +63,12 @@ import java.util.Random;
  */
 public class NetworkClient implements KafkaClient {
 
+    private enum State {
+        ACTIVE,
+        CLOSING,
+        CLOSED
+    }
+
     private final Logger log;
 
     /* the selector used to perform network i/o */
@@ -109,6 +117,8 @@ public class NetworkClient implements KafkaClient {
 
     private final Sensor throttleTimeSensor;
 
+    private final AtomicReference<State> state;
+
     public NetworkClient(Selectable selector,
                          Metadata metadata,
                          String clientId,
@@ -208,6 +218,7 @@ public class NetworkClient implements KafkaClient {
         this.apiVersions = apiVersions;
         this.throttleTimeSensor = throttleTimeSensor;
         this.log = logContext.logger(NetworkClient.class);
+        this.state = new AtomicReference<>(State.ACTIVE);
     }
 
     /**
@@ -365,6 +376,7 @@ public class NetworkClient implements KafkaClient {
     }
 
     private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
+        ensureActive();
         String nodeId = clientRequest.destination();
         if (!isInternalRequest) {
             // If this request came from outside the NetworkClient, validate
@@ -446,6 +458,8 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public List<ClientResponse> poll(long timeout, long now) {
+        ensureActive();
+
         if (!abortedSends.isEmpty()) {
             // If there are aborted sends because of unsupported version exceptions or disconnects,
             // handle them immediately without waiting for Selector#poll.
@@ -525,12 +539,34 @@ public class NetworkClient implements KafkaClient {
         this.selector.wakeup();
     }
 
+    @Override
+    public void initiateClose() {
+        if (state.compareAndSet(State.ACTIVE, State.CLOSING)) {
+            wakeup();
+        }
+    }
+
+    @Override
+    public boolean active() {
+        return state.get() == State.ACTIVE;
+    }
+
+    private void ensureActive() {
+        if (!active())
+            throw new DisconnectException("NetworkClient is no longer active, state is " + state);
+    }
+
     /**
      * Close the network client
      */
     @Override
     public void close() {
-        this.selector.close();
+        state.compareAndSet(State.ACTIVE, State.CLOSING);
+        if (state.compareAndSet(State.CLOSING, State.CLOSED)) {
+            this.selector.close();
+        } else {
+            log.warn("Attempting to close NetworkClient that has already been closed.");
+        }
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
index 94fe288..c952b82 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.utils.Time;
 
 import java.io.IOException;
@@ -83,25 +84,35 @@ public final class NetworkClientUtils {
      * disconnection happens (which can happen for a number of reasons including a request timeout).
      *
      * In case of a disconnection, an `IOException` is thrown.
+     * If shutdown is initiated on the client during this method, an IOException is thrown.
      *
      * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
      * care.
      */
     public static ClientResponse sendAndReceive(KafkaClient client, ClientRequest request, Time time) throws IOException {
-        client.send(request, time.milliseconds());
-        while (true) {
-            List<ClientResponse> responses = client.poll(Long.MAX_VALUE, time.milliseconds());
-            for (ClientResponse response : responses) {
-                if (response.requestHeader().correlationId() == request.correlationId()) {
-                    if (response.wasDisconnected()) {
-                        throw new IOException("Connection to " + response.destination() + " was disconnected before the response was read");
+        try {
+            client.send(request, time.milliseconds());
+            while (client.active()) {
+                List<ClientResponse> responses = client.poll(Long.MAX_VALUE, time.milliseconds());
+                for (ClientResponse response : responses) {
+                    if (response.requestHeader().correlationId() == request.correlationId()) {
+                        if (response.wasDisconnected()) {
+                            throw new IOException("Connection to " + response.destination() + " was disconnected before the response was read");
+                        }
+                        if (response.versionMismatch() != null) {
+                            throw response.versionMismatch();
+                        }
+                        return response;
                     }
-                    if (response.versionMismatch() != null) {
-                        throw response.versionMismatch();
-                    }
-                    return response;
                 }
             }
+            throw new IOException("Client was shutdown before response was read");
+        } catch (DisconnectException e) {
+            if (client.active())
+                throw e;
+            else
+                throw new IOException("Client was shutdown before response was read");
+
         }
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index ee03714..4efcdbf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -86,6 +86,7 @@ public class MockClient implements KafkaClient {
     private final Queue<MetadataUpdate> metadataUpdates = new ArrayDeque<>();
     private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
     private volatile int numBlockingWakeups = 0;
+    private volatile boolean active = true;
 
     public MockClient(Time time) {
         this(time, null);
@@ -453,7 +454,18 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
+    public void initiateClose() {
+        close();
+    }
+
+    @Override
+    public boolean active() {
+        return active;
+    }
+
+    @Override
     public void close() {
+        active = false;
     }
 
     @Override
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 7f0b8a4..0cab456 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -283,6 +283,13 @@ class RequestSendThread(val controllerId: Int,
     }
   }
 
+  override def initiateShutdown(): Boolean = {
+    if (super.initiateShutdown()) {
+      networkClient.initiateClose()
+      true
+    } else
+      false
+  }
 }
 
 class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogger: StateChangeLogger) extends  Logging {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index 0bf2bd3..2fa84a2 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -35,6 +35,8 @@ trait BlockingSend {
 
   def sendRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): ClientResponse
 
+  def initiateClose()
+
   def close()
 }
 
@@ -103,6 +105,10 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
     }
   }
 
+  override def initiateClose(): Unit = {
+    networkClient.initiateClose()
+  }
+
   def close(): Unit = {
     networkClient.close()
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 4fceef8..dbf07a4 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -84,11 +84,32 @@ class ReplicaFetcherThread(name: String,
   override def initiateShutdown(): Boolean = {
     val justShutdown = super.initiateShutdown()
     if (justShutdown) {
-      leaderEndpoint.close()
+      // This is thread-safe, so we don't expect any exceptions, but catch and log any errors
+      // to avoid failing the caller, especially during shutdown. We will attempt to close
+      // leaderEndpoint after the thread terminates.
+      try {
+        leaderEndpoint.initiateClose()
+      } catch {
+        case t: Throwable =>
+          error(s"Failed to initiate shutdown of leader endpoint $leaderEndpoint after initiating replica fetcher thread shutdown", t)
+      }
     }
     justShutdown
   }
 
+  override def awaitShutdown(): Unit = {
+    super.awaitShutdown()
+    // We don't expect any exceptions here, but catch and log any errors to avoid failing the caller,
+    // especially during shutdown. It is safe to catch the exception here without causing correctness
+    // issue because we are going to shutdown the thread and will not re-use the leaderEndpoint anyway.
+    try {
+      leaderEndpoint.close()
+    } catch {
+      case t: Throwable =>
+        error(s"Failed to close leader endpoint $leaderEndpoint after shutting down replica fetcher thread", t)
+    }
+  }
+
   // process fetched data
   def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
     val replica = replicaMgr.getReplicaOrException(topicPartition)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 1f4d04b..23ae265 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -406,6 +406,38 @@ class ReplicaFetcherThreadTest {
     assertEquals(49, truncateToCapture.getValue)
   }
 
+  @Test
+  def shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+    val config = KafkaConfig.fromProps(props)
+    val mockBlockingSend = createMock(classOf[BlockingSend])
+    val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000)
+
+    expect(mockBlockingSend.initiateClose()).andThrow(new IllegalArgumentException()).once()
+    expect(mockBlockingSend.close()).andThrow(new IllegalStateException()).once()
+    replay(mockBlockingSend)
+
+    val thread = new ReplicaFetcherThread(
+      name = "bob",
+      fetcherId = 0,
+      sourceBroker = brokerEndPoint,
+      brokerConfig = config,
+      replicaMgr = null,
+      metrics =  new Metrics(),
+      time = new SystemTime(),
+      quota = null,
+      leaderEndpointBlockingSend = Some(mockBlockingSend))
+    thread.start()
+
+    // Verify that:
+    //   1) IllegalArgumentException thrown by BlockingSend#initiateClose() during `initiateShutdown` is not propagated
+    //   2) BlockingSend.close() is invoked even if BlockingSend#initiateClose() fails
+    //   3) IllegalStateException thrown by BlockingSend.close() during `awaitShutdown` is not propagated
+    thread.initiateShutdown()
+    thread.awaitShutdown()
+    verify(mockBlockingSend)
+  }
+
   def stub(replica: Replica, partition: Partition, replicaManager: ReplicaManager) = {
     expect(replicaManager.getReplica(t1p0)).andReturn(Some(replica)).anyTimes()
     expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replica).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 5e33816..1df0928 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -16,22 +16,32 @@
  */
 package kafka.server
 
+import java.io.{DataInputStream, File}
+import java.net.{ServerSocket, UnknownHostException}
+import java.util.concurrent.{Executors, TimeUnit}
+
 import kafka.zk.ZooKeeperTestHarness
 import kafka.consumer.SimpleConsumer
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.utils.TestUtils._
 import kafka.api.FetchRequestBuilder
 import kafka.message.ByteBufferMessageSet
-import java.io.File
-import java.net.UnknownHostException
-
+import kafka.cluster.Broker
+import kafka.controller.{ControllerChannelManager, ControllerContext, StateChangeLogger}
 import kafka.log.LogManager
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.LeaderAndIsrRequest
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
+import org.apache.kafka.common.utils.Time
 import org.junit.{Before, Test}
 import org.junit.Assert._
 
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 class ServerShutdownTest extends ZooKeeperTestHarness {
@@ -195,4 +205,58 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
       case _: Throwable => fail()
     }
   }
+
+  // Verify that if controller is in the midst of processing a request, shutdown completes
+  // without waiting for request timeout.
+  @Test
+  def testControllerShutdownDuringSend(): Unit = {
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+
+    val controllerId = 2
+    val metrics = new Metrics
+    val executor = Executors.newSingleThreadExecutor
+    var serverSocket: ServerSocket = null
+    var controllerChannelManager: ControllerChannelManager = null
+
+    try {
+      // Set up a server to accept a connection and receive one byte from the first request. No response is sent.
+      serverSocket = new ServerSocket(0)
+      val receiveFuture = executor.submit(new Runnable {
+        override def run(): Unit = {
+          val socket = serverSocket.accept()
+          new DataInputStream(socket.getInputStream).readByte()
+        }
+      })
+
+      // Start a ControllerChannelManager
+      val brokers = Seq(new Broker(1, "localhost", serverSocket.getLocalPort, listenerName, securityProtocol))
+      val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, zkConnect))
+      val controllerContext = new ControllerContext
+      controllerContext.liveBrokers = brokers.toSet
+      controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM,
+        metrics, new StateChangeLogger(controllerId, inControllerContext = true, None))
+      controllerChannelManager.startup()
+
+      // Initiate a sendRequest and wait until connection is established and one byte is received by the peer
+      val requestBuilder = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+        controllerId, 1, Map.empty.asJava, brokers.map(_.node(listenerName)).toSet.asJava)
+      controllerChannelManager.sendRequest(1, ApiKeys.LEADER_AND_ISR, requestBuilder)
+      receiveFuture.get(10, TimeUnit.SECONDS)
+
+      // Shutdown controller. Request timeout is 30s, verify that shutdown completed well before that
+      val shutdownFuture = executor.submit(new Runnable {
+        override def run(): Unit = controllerChannelManager.shutdown()
+      })
+      shutdownFuture.get(10, TimeUnit.SECONDS)
+
+    } finally {
+      if (serverSocket != null)
+        serverSocket.close()
+      if (controllerChannelManager != null)
+        controllerChannelManager.shutdown()
+      executor.shutdownNow()
+      metrics.close()
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
index 1f5bec1..65bdba9 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
@@ -74,5 +74,7 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
       true)
   }
 
+  override def initiateClose(): Unit = {}
+
   override def close(): Unit = {}
 }
\ No newline at end of file
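
One detail of the NetworkClientUtils change worth calling out: the blocking receive loop now re-checks client.active() on every iteration, so a concurrent initiateClose() turns what used to be an indefinite poll(Long.MAX_VALUE) wait into an IOException the caller can handle. A rough, self-contained sketch of that behaviour follows (MiniClient and the helper below are hypothetical names used only for illustration, not Kafka code):

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class BlockingSendSketch {

    // Hypothetical stand-in for the small slice of KafkaClient that the blocking helper relies on.
    interface MiniClient {
        boolean active();
        List<String> poll() throws InterruptedException;
    }

    // Mirrors the shape of the patched loop: keep polling only while the client is active,
    // and surface shutdown as an IOException rather than blocking forever.
    static String sendAndReceive(MiniClient client) throws IOException, InterruptedException {
        while (client.active()) {
            for (String response : client.poll())
                return response; // the real code matches correlation ids; the toy takes the first response
        }
        throw new IOException("Client was shutdown before response was read");
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean active = new AtomicBoolean(true);

        MiniClient client = new MiniClient() {
            public boolean active() { return active.get(); }
            public List<String> poll() throws InterruptedException {
                Thread.sleep(10);                // simulate a poll that never sees a response
                return Collections.emptyList();
            }
        };

        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            active.set(false);                   // simulate initiateClose() arriving from another thread
        }).start();

        try {
            sendAndReceive(client);
        } catch (IOException e) {
            System.out.println("shutdown surfaced as: " + e.getMessage());
        }
    }
}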