You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:21 UTC

[24/37] git commit: kafka-1533; transient unit test failure in ProducerFailureHandlingTest; reviewed by Guozhang Wang; reviewed by Jun Rao

kafka-1533; transient unit test failure in ProducerFailureHandlingTest; reviewed by Guozhang Wang; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ff05e9b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ff05e9b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ff05e9b3

Branch: refs/heads/transactional_messaging
Commit: ff05e9b3616a222e29a42f6e8fdf41945a417f41
Parents: 014b700
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Tue Jul 22 14:14:19 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jul 22 14:14:19 2014 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java | 13 ++-
 .../clients/producer/internals/Metadata.java    |  7 ++
 .../kafka/api/ProducerFailureHandlingTest.scala | 88 ++++++++------------
 .../kafka/api/ProducerSendTest.scala            | 56 +++++--------
 .../integration/KafkaServerTestHarness.scala    |  2 +
 .../test/scala/unit/kafka/utils/TestUtils.scala |  2 +-
 6 files changed, 78 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ff05e9b3/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d8f9ce6..eea270a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -77,6 +77,9 @@ public class NetworkClient implements KafkaClient {
     /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
     private boolean metadataFetchInProgress;
 
+    /* the last timestamp when no broker node is available to connect */
+    private long lastNoNodeAvailableMs;
+
     public NetworkClient(Selectable selector,
                          Metadata metadata,
                          String clientId,
@@ -94,6 +97,7 @@ public class NetworkClient implements KafkaClient {
         this.correlation = 0;
         this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
         this.metadataFetchInProgress = false;
+        this.lastNoNodeAvailableMs = 0;
     }
 
     /**
@@ -162,7 +166,10 @@ public class NetworkClient implements KafkaClient {
         }
 
         // should we update our metadata?
-        long metadataTimeout = metadata.timeToNextUpdate(now);
+        long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
+        long timeToNextReconnectAttempt = this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now;
+        // if there is no node available to connect, back off refreshing metadata
+        long metadataTimeout = Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt);
         if (!this.metadataFetchInProgress && metadataTimeout == 0)
             maybeUpdateMetadata(sends, now);
 
@@ -354,6 +361,8 @@ public class NetworkClient implements KafkaClient {
         Node node = this.leastLoadedNode(now);
         if (node == null) {
             log.debug("Give up sending metadata request since no node is available");
+            // mark the timestamp for no node available to connect
+            this.lastNoNodeAvailableMs = now;
             return;
         }
 
@@ -367,7 +376,7 @@ public class NetworkClient implements KafkaClient {
             this.inFlightRequests.add(metadataRequest);
         } else if (connectionStates.canConnect(node.id(), now)) {
             // we don't have a connection to this node right now, make one
-            log.debug("Give up sending metadata request to node {} since it is either not connected or cannot have more in flight requests", node.id());
+            log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id());
             initiateConnect(node, now);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff05e9b3/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 4aa5b01..1d30f9e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -139,4 +139,11 @@ public final class Metadata {
     public synchronized long lastUpdate() {
         return this.lastRefreshMs;
     }
+
+    /**
+     * The metadata refresh backoff in ms
+     */
+    public long refreshBackoff() {
+        return refreshBackoffMs;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff05e9b3/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 15fd5bc..789e74c 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -21,26 +21,31 @@ import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 import org.junit.Assert._
 
-import java.util.{Random, Properties}
+import java.util.Random
 import java.lang.Integer
 import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
 
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{ShutdownableThread, Utils, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils}
+import kafka.integration.KafkaServerTestHarness
 import kafka.consumer.SimpleConsumer
 
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.clients.producer._
 
-class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness {
-  private val brokerId1 = 0
-  private val brokerId2 = 1
-  private val ports = TestUtils.choosePorts(2)
-  private val (port1, port2) = (ports(0), ports(1))
-  private var server1: KafkaServer = null
-  private var server2: KafkaServer = null
-  private var servers = List.empty[KafkaServer]
+class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarness {
+  private val producerBufferSize = 30000
+  private val serverMessageMaxBytes =  producerBufferSize/2
+
+  val numServers = 2
+  val configs =
+    for(props <- TestUtils.createBrokerConfigs(numServers, false))
+    yield new KafkaConfig(props) {
+      override val zkConnect = TestZKUtils.zookeeperConnect
+      override val autoCreateTopicsEnable = false
+      override val messageMaxBytes = serverMessageMaxBytes
+    }
+
 
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
@@ -50,32 +55,19 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
   private var producer3: KafkaProducer = null
   private var producer4: KafkaProducer = null
 
-  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
-  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
-  props1.put("auto.create.topics.enable", "false")
-  props2.put("auto.create.topics.enable", "false")
-  private val config1 = new KafkaConfig(props1)
-  private val config2 = new KafkaConfig(props2)
-  private val brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))
-
-  private val bufferSize = 2 * config1.messageMaxBytes
-
   private val topic1 = "topic-1"
   private val topic2 = "topic-2"
 
   override def setUp() {
     super.setUp()
-    server1 = TestUtils.createServer(config1)
-    server2 = TestUtils.createServer(config2)
-    servers = List(server1,server2)
 
     // TODO: we need to migrate to new consumers when 0.9 is final
-    consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "")
-    consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "")
+    consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
+    consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")
 
-    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = bufferSize);
-    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = bufferSize)
-    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = bufferSize)
+    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize);
+    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
+    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
   }
 
   override def tearDown() {
@@ -87,9 +79,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     if (producer3 != null) producer3.close
     if (producer4 != null) producer4.close
 
-    server1.shutdown; Utils.rm(server1.config.logDirs)
-    server2.shutdown; Utils.rm(server2.config.logDirs)
-
     super.tearDown()
   }
 
@@ -102,7 +91,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // send a too-large record
-    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1))
+    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
     assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L)
   }
 
@@ -115,7 +104,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // send a too-large record
-    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1))
+    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
     intercept[ExecutionException] {
       producer2.send(record).get
     }
@@ -149,7 +138,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // producer with incorrect broker list
-    producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = bufferSize)
+    producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
 
     // send a record with incorrect broker list
     val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
@@ -175,8 +164,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
 
     // stop IO threads and request handling, but leave networking operational
     // any requests should be accepted and queue up, but not handled
-    server1.requestHandlerPool.shutdown()
-    server2.requestHandlerPool.shutdown()
+    servers.foreach(server => server.requestHandlerPool.shutdown())
 
     producer1.send(record1).get(5000, TimeUnit.MILLISECONDS)
 
@@ -186,11 +174,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
 
     // TODO: expose producer configs after creating them
     // send enough messages to get buffer full
-    val msgSize = 10000
+    val tooManyRecords = 10
+    val msgSize = producerBufferSize / tooManyRecords
     val value = new Array[Byte](msgSize)
     new Random().nextBytes(value)
     val record2 = new ProducerRecord(topic1, null, "key".getBytes, value)
-    val tooManyRecords = bufferSize / ("key".getBytes.length + value.length)
 
     intercept[KafkaException] {
       for (i <- 1 to tooManyRecords)
@@ -269,17 +257,13 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
 
     // rolling bounce brokers
     for (i <- 0 until 2) {
-      server1.shutdown()
-      server1.awaitShutdown()
-      server1.startup
-
-      Thread.sleep(2000)
+      for (server <- servers) {
+        server.shutdown()
+        server.awaitShutdown()
+        server.startup
 
-      server2.shutdown()
-      server2.awaitShutdown()
-      server2.startup
-
-      Thread.sleep(2000)
+        Thread.sleep(2000)
+      }
 
       // Make sure the producer do not see any exception
       // in returned metadata due to broker failures
@@ -298,7 +282,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     // double check that the leader info has been propagated after consecutive bounces
     val leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic1, partition)
 
-    val fetchResponse = if(leader == server1.config.brokerId) {
+    val fetchResponse = if(leader == configs(0).brokerId) {
       consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
     } else {
       consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
@@ -317,7 +301,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     var sent = 0
     var failed = false
 
-    val producer = TestUtils.createNewProducer(brokerList, bufferSize = bufferSize, retries = 10)
+    val producer = TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10)
 
     override def doWork(): Unit = {
       val responses =

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff05e9b3/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 34a7db4..d407af9 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.api.test
 
-import java.util.Properties
 import java.lang.{Integer, IllegalArgumentException}
 
 import org.apache.kafka.clients.producer._
@@ -25,53 +24,41 @@ import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 import org.junit.Assert._
 
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{Utils, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestZKUtils, TestUtils}
 import kafka.consumer.SimpleConsumer
 import kafka.api.FetchRequestBuilder
 import kafka.message.Message
+import kafka.integration.KafkaServerTestHarness
 
 
-class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
-  private val brokerId1 = 0
-  private val brokerId2 = 1
-  private val ports = TestUtils.choosePorts(2)
-  private val (port1, port2) = (ports(0), ports(1))
-  private var server1: KafkaServer = null
-  private var server2: KafkaServer = null
-  private var servers = List.empty[KafkaServer]
+class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
+  val numServers = 2
+  val configs =
+    for(props <- TestUtils.createBrokerConfigs(numServers, false))
+    yield new KafkaConfig(props) {
+      override val zkConnect = TestZKUtils.zookeeperConnect
+      override val numPartitions = 4
+    }
 
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
 
-  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
-  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
-  props1.put("num.partitions", "4")
-  props2.put("num.partitions", "4")
-  private val config1 = new KafkaConfig(props1)
-  private val config2 = new KafkaConfig(props2)
-
   private val topic = "topic"
   private val numRecords = 100
 
   override def setUp() {
     super.setUp()
-    // set up 2 brokers with 4 partitions each
-    server1 = TestUtils.createServer(config1)
-    server2 = TestUtils.createServer(config2)
-    servers = List(server1,server2)
 
     // TODO: we need to migrate to new consumers when 0.9 is final
-    consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "")
-    consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "")
+    consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
+    consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")
   }
 
   override def tearDown() {
-    server1.shutdown
-    server2.shutdown
-    Utils.rm(server1.config.logDirs)
-    Utils.rm(server2.config.logDirs)
+    consumer1.close()
+    consumer2.close()
+
     super.tearDown()
   }
 
@@ -90,7 +77,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
    */
   @Test
   def testSendOffset() {
-    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    var producer = TestUtils.createNewProducer(brokerList)
 
     val callback = new CheckErrorCallback
 
@@ -146,7 +133,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
    */
   @Test
   def testClose() {
-    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    var producer = TestUtils.createNewProducer(brokerList)
 
     try {
       // create topic
@@ -182,7 +169,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
    */
   @Test
   def testSendToPartition() {
-    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    var producer = TestUtils.createNewProducer(brokerList)
 
     try {
       // create topic
@@ -209,7 +196,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
       }
 
       // make sure the fetched messages also respect the partitioning and ordering
-      val fetchResponse1 = if(leader1.get == server1.config.brokerId) {
+      val fetchResponse1 = if(leader1.get == configs(0).brokerId) {
         consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
       } else {
         consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
@@ -237,8 +224,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
    */
   @Test
   def testAutoCreateTopic() {
-    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
-                                            retries = 5)
+    var producer = TestUtils.createNewProducer(brokerList, retries = 5)
 
     try {
       // Send a message to auto-create the topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff05e9b3/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 194dd70..3cf7c9b 100644
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -30,11 +30,13 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
 
   val configs: List[KafkaConfig]
   var servers: List[KafkaServer] = null
+  var brokerList: String = null
 
   override def setUp() {
     super.setUp
     if(configs.size <= 0)
       throw new KafkaException("Must suply at least one server config.")
+    brokerList = TestUtils.getBrokerListStrFromConfigs(configs)
     servers = configs.map(TestUtils.createServer(_))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff05e9b3/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3faa884..4d01d25 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -385,7 +385,7 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
-    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000")
+    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
     producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
     return new KafkaProducer(producerProps)
   }