Posted to commits@kafka.apache.org by li...@apache.org on 2018/10/08 21:08:44 UTC

[kafka] branch 2.1 updated: MINOR: Fix LogDirFailureTest flake

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 3630512  MINOR: Fix LogDirFailureTest flake
3630512 is described below

commit 3630512417242855718ae35a3f1d0a9b2162dbea
Author: Gardner Vickers <ga...@vickers.me>
AuthorDate: Mon Oct 8 14:07:56 2018 -0700

    MINOR: Fix LogDirFailureTest flake
    
    Ensure that `TestUtils.waitUntilTrue(..)` is blocked on both send completed and a new leader being assigned
    
    Author: Gardner Vickers <ga...@vickers.me>
    
    Reviewers: Dhruvil Shah <dh...@confluent.io>, Dong Lin <li...@gmail.com>
    
    Closes #5695 from gardnervickers/log-dir-failure-test-fix
    
    (cherry picked from commit 6165b43744bf9df9d4bda577f11db75b704a6ac5)
    Signed-off-by: Dong Lin <li...@gmail.com>
---
 core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
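
Note: the change in this patch can be illustrated with a minimal, self-contained sketch. The sends inside the polling loop are fire-and-forget, so their results are never checked; only the final send is blocked on with a timeout. Everything below is an assumption made for illustration: `WaitThenBlockSketch`, `FakeProducer`, and this `waitUntilTrue` are hypothetical stand-ins, not Kafka's `TestUtils` or `KafkaProducer`, and the fake failover after three sends is invented to keep the example runnable without a broker.

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit}

object WaitThenBlockSketch {
  // Hypothetical polling helper, loosely modeled on the call shape used in the
  // test; it is NOT Kafka's TestUtils.waitUntilTrue.
  def waitUntilTrue(condition: () => Boolean, msg: String,
                    waitTimeMs: Long, pauseMs: Long = 10L): Unit = {
    val deadline = System.currentTimeMillis() + waitTimeMs
    while (!condition()) {
      if (System.currentTimeMillis() > deadline) throw new AssertionError(msg)
      Thread.sleep(pauseMs)
    }
  }

  // Hypothetical fake producer: the leader moves from oldLeader to newLeader
  // once failoverAfterSends sends have been attempted.
  final class FakeProducer(oldLeader: Int, newLeader: Int, failoverAfterSends: Int) {
    private var sends = 0

    def leaderId: Int = if (sends >= failoverAfterSends) newLeader else oldLeader

    // Returns a future that fails while the old leader is still in charge
    // and completes with the leader id once the new leader has taken over.
    def send(record: String): CompletableFuture[Int] = {
      sends += 1
      val result = new CompletableFuture[Int]()
      if (leaderId == newLeader) result.complete(leaderId)
      else result.completeExceptionally(new IllegalStateException("log dir offline"))
      result
    }
  }

  def run(): Int = {
    val producer = new FakeProducer(oldLeader = 0, newLeader = 1, failoverAfterSends = 3)

    // Step 1: poll until a new leader is visible. The send result is discarded
    // here, so a failed send goes unnoticed.
    waitUntilTrue(() => {
      producer.send("record")
      producer.leaderId != 0
    }, "Expected new leader for the partition", 6000L)

    // Step 2: block on one more send so that a failure or timeout surfaces
    // before any consumer-side assertion runs.
    producer.send("record").get(6000L, TimeUnit.MILLISECONDS)
  }

  def main(args: Array[String]): Unit =
    println(s"send acknowledged by leader ${run()}")
}
```

In this sketch the first step only proves that the leader changed; the blocking `get` in the second step is what proves that a record was accepted afterwards, which is the property the consumer check relies on.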

diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index aaf6477..4709282 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -178,13 +178,15 @@ class LogDirFailureTest extends IntegrationTestHarness {
         }
     }
 
-    // Wait for producer to update metadata for the partition
     TestUtils.waitUntilTrue(() => {
       // ProduceResponse may contain KafkaStorageException and trigger metadata update
       producer.send(record)
       producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id() != leaderServerId
     }, "Expected new leader for the partition", 6000L)
 
+    // Block on send to ensure that new leader accepts a message.
+    producer.send(record).get(6000L, TimeUnit.MILLISECONDS)
+
     // Consumer should receive some messages
     TestUtils.waitUntilTrue(() => {
       consumer.poll(0).count() > 0