You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/07/22 23:14:20 UTC
git commit: kafka-1533;
transient unit test failure in ProducerFailureHandlingTest;
reviewed by Guozhang Wang; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 014b700f0 -> ff05e9b36
kafka-1533; transient unit test failure in ProducerFailureHandlingTest; reviewed by Guozhang Wang; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ff05e9b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ff05e9b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ff05e9b3
Branch: refs/heads/trunk
Commit: ff05e9b3616a222e29a42f6e8fdf41945a417f41
Parents: 014b700
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Tue Jul 22 14:14:19 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jul 22 14:14:19 2014 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/NetworkClient.java | 13 ++-
.../clients/producer/internals/Metadata.java | 7 ++
.../kafka/api/ProducerFailureHandlingTest.scala | 88 ++++++++------------
.../kafka/api/ProducerSendTest.scala | 56 +++++--------
.../integration/KafkaServerTestHarness.scala | 2 +
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
6 files changed, 78 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff05e9b3/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d8f9ce6..eea270a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -77,6 +77,9 @@ public class NetworkClient implements KafkaClient {
/* true iff there is a metadata request that has been sent and for which we have not yet received a response */
private boolean metadataFetchInProgress;
+ /* the last timestamp when no broker node is available to connect */
+ private long lastNoNodeAvailableMs;
+
public NetworkClient(Selectable selector,
Metadata metadata,
String clientId,
@@ -94,6 +97,7 @@ public class NetworkClient implements KafkaClient {
this.correlation = 0;
this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
this.metadataFetchInProgress = false;
+ this.lastNoNodeAvailableMs = 0;
}
/**
@@ -162,7 +166,10 @@ public class NetworkClient implements KafkaClient {
}
// should we update our metadata?
- long metadataTimeout = metadata.timeToNextUpdate(now);
+ long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
+ long timeToNextReconnectAttempt = this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now;
+ // if there is no node available to connect, back off refreshing metadata
+ long metadataTimeout = Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt);
if (!this.metadataFetchInProgress && metadataTimeout == 0)
maybeUpdateMetadata(sends, now);
@@ -354,6 +361,8 @@ public class NetworkClient implements KafkaClient {
Node node = this.leastLoadedNode(now);
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
+ // mark the timestamp for no node available to connect
+ this.lastNoNodeAvailableMs = now;
return;
}
@@ -367,7 +376,7 @@ public class NetworkClient implements KafkaClient {
this.inFlightRequests.add(metadataRequest);
} else if (connectionStates.canConnect(node.id(), now)) {
// we don't have a connection to this node right now, make one
- log.debug("Give up sending metadata request to node {} since it is either not connected or cannot have more in flight requests", node.id());
+ log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id());
initiateConnect(node, now);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff05e9b3/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 4aa5b01..1d30f9e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -139,4 +139,11 @@ public final class Metadata {
public synchronized long lastUpdate() {
return this.lastRefreshMs;
}
+
+ /**
+ * The metadata refresh backoff in ms
+ */
+ public long refreshBackoff() {
+ return refreshBackoffMs;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff05e9b3/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 15fd5bc..789e74c 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -21,26 +21,31 @@ import org.scalatest.junit.JUnit3Suite
import org.junit.Test
import org.junit.Assert._
-import java.util.{Random, Properties}
+import java.util.Random
import java.lang.Integer
import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{ShutdownableThread, Utils, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils}
+import kafka.integration.KafkaServerTestHarness
import kafka.consumer.SimpleConsumer
import org.apache.kafka.common.KafkaException
import org.apache.kafka.clients.producer._
-class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness {
- private val brokerId1 = 0
- private val brokerId2 = 1
- private val ports = TestUtils.choosePorts(2)
- private val (port1, port2) = (ports(0), ports(1))
- private var server1: KafkaServer = null
- private var server2: KafkaServer = null
- private var servers = List.empty[KafkaServer]
+class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarness {
+ private val producerBufferSize = 30000
+ private val serverMessageMaxBytes = producerBufferSize/2
+
+ val numServers = 2
+ val configs =
+ for(props <- TestUtils.createBrokerConfigs(numServers, false))
+ yield new KafkaConfig(props) {
+ override val zkConnect = TestZKUtils.zookeeperConnect
+ override val autoCreateTopicsEnable = false
+ override val messageMaxBytes = serverMessageMaxBytes
+ }
+
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null
@@ -50,32 +55,19 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
private var producer3: KafkaProducer = null
private var producer4: KafkaProducer = null
- private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
- private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
- props1.put("auto.create.topics.enable", "false")
- props2.put("auto.create.topics.enable", "false")
- private val config1 = new KafkaConfig(props1)
- private val config2 = new KafkaConfig(props2)
- private val brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))
-
- private val bufferSize = 2 * config1.messageMaxBytes
-
private val topic1 = "topic-1"
private val topic2 = "topic-2"
override def setUp() {
super.setUp()
- server1 = TestUtils.createServer(config1)
- server2 = TestUtils.createServer(config2)
- servers = List(server1,server2)
// TODO: we need to migrate to new consumers when 0.9 is final
- consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "")
- consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "")
+ consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
+ consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")
- producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = bufferSize);
- producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = bufferSize)
- producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = bufferSize)
+ producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize);
+ producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
+ producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
}
override def tearDown() {
@@ -87,9 +79,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
if (producer3 != null) producer3.close
if (producer4 != null) producer4.close
- server1.shutdown; Utils.rm(server1.config.logDirs)
- server2.shutdown; Utils.rm(server2.config.logDirs)
-
super.tearDown()
}
@@ -102,7 +91,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
// send a too-large record
- val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1))
+ val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L)
}
@@ -115,7 +104,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
// send a too-large record
- val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1))
+ val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
intercept[ExecutionException] {
producer2.send(record).get
}
@@ -149,7 +138,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
// producer with incorrect broker list
- producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = bufferSize)
+ producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
// send a record with incorrect broker list
val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
@@ -175,8 +164,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
// stop IO threads and request handling, but leave networking operational
// any requests should be accepted and queue up, but not handled
- server1.requestHandlerPool.shutdown()
- server2.requestHandlerPool.shutdown()
+ servers.foreach(server => server.requestHandlerPool.shutdown())
producer1.send(record1).get(5000, TimeUnit.MILLISECONDS)
@@ -186,11 +174,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
// TODO: expose producer configs after creating them
// send enough messages to get buffer full
- val msgSize = 10000
+ val tooManyRecords = 10
+ val msgSize = producerBufferSize / tooManyRecords
val value = new Array[Byte](msgSize)
new Random().nextBytes(value)
val record2 = new ProducerRecord(topic1, null, "key".getBytes, value)
- val tooManyRecords = bufferSize / ("key".getBytes.length + value.length)
intercept[KafkaException] {
for (i <- 1 to tooManyRecords)
@@ -269,17 +257,13 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
// rolling bounce brokers
for (i <- 0 until 2) {
- server1.shutdown()
- server1.awaitShutdown()
- server1.startup
-
- Thread.sleep(2000)
+ for (server <- servers) {
+ server.shutdown()
+ server.awaitShutdown()
+ server.startup
- server2.shutdown()
- server2.awaitShutdown()
- server2.startup
-
- Thread.sleep(2000)
+ Thread.sleep(2000)
+ }
// Make sure the producer do not see any exception
// in returned metadata due to broker failures
@@ -298,7 +282,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
// double check that the leader info has been propagated after consecutive bounces
val leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic1, partition)
- val fetchResponse = if(leader == server1.config.brokerId) {
+ val fetchResponse = if(leader == configs(0).brokerId) {
consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
} else {
consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
@@ -317,7 +301,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
var sent = 0
var failed = false
- val producer = TestUtils.createNewProducer(brokerList, bufferSize = bufferSize, retries = 10)
+ val producer = TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10)
override def doWork(): Unit = {
val responses =
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff05e9b3/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 34a7db4..d407af9 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -17,7 +17,6 @@
package kafka.api.test
-import java.util.Properties
import java.lang.{Integer, IllegalArgumentException}
import org.apache.kafka.clients.producer._
@@ -25,53 +24,41 @@ import org.scalatest.junit.JUnit3Suite
import org.junit.Test
import org.junit.Assert._
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{Utils, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestZKUtils, TestUtils}
import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequestBuilder
import kafka.message.Message
+import kafka.integration.KafkaServerTestHarness
-class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
- private val brokerId1 = 0
- private val brokerId2 = 1
- private val ports = TestUtils.choosePorts(2)
- private val (port1, port2) = (ports(0), ports(1))
- private var server1: KafkaServer = null
- private var server2: KafkaServer = null
- private var servers = List.empty[KafkaServer]
+class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
+ val numServers = 2
+ val configs =
+ for(props <- TestUtils.createBrokerConfigs(numServers, false))
+ yield new KafkaConfig(props) {
+ override val zkConnect = TestZKUtils.zookeeperConnect
+ override val numPartitions = 4
+ }
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null
- private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
- private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
- props1.put("num.partitions", "4")
- props2.put("num.partitions", "4")
- private val config1 = new KafkaConfig(props1)
- private val config2 = new KafkaConfig(props2)
-
private val topic = "topic"
private val numRecords = 100
override def setUp() {
super.setUp()
- // set up 2 brokers with 4 partitions each
- server1 = TestUtils.createServer(config1)
- server2 = TestUtils.createServer(config2)
- servers = List(server1,server2)
// TODO: we need to migrate to new consumers when 0.9 is final
- consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "")
- consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "")
+ consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
+ consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")
}
override def tearDown() {
- server1.shutdown
- server2.shutdown
- Utils.rm(server1.config.logDirs)
- Utils.rm(server2.config.logDirs)
+ consumer1.close()
+ consumer2.close()
+
super.tearDown()
}
@@ -90,7 +77,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
*/
@Test
def testSendOffset() {
- var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+ var producer = TestUtils.createNewProducer(brokerList)
val callback = new CheckErrorCallback
@@ -146,7 +133,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
*/
@Test
def testClose() {
- var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+ var producer = TestUtils.createNewProducer(brokerList)
try {
// create topic
@@ -182,7 +169,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
*/
@Test
def testSendToPartition() {
- var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+ var producer = TestUtils.createNewProducer(brokerList)
try {
// create topic
@@ -209,7 +196,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
}
// make sure the fetched messages also respect the partitioning and ordering
- val fetchResponse1 = if(leader1.get == server1.config.brokerId) {
+ val fetchResponse1 = if(leader1.get == configs(0).brokerId) {
consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
} else {
consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
@@ -237,8 +224,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
*/
@Test
def testAutoCreateTopic() {
- var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
- retries = 5)
+ var producer = TestUtils.createNewProducer(brokerList, retries = 5)
try {
// Send a message to auto-create the topic
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff05e9b3/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 194dd70..3cf7c9b 100644
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -30,11 +30,13 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
val configs: List[KafkaConfig]
var servers: List[KafkaServer] = null
+ var brokerList: String = null
override def setUp() {
super.setUp
if(configs.size <= 0)
throw new KafkaException("Must suply at least one server config.")
+ brokerList = TestUtils.getBrokerListStrFromConfigs(configs)
servers = configs.map(TestUtils.createServer(_))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff05e9b3/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3faa884..4d01d25 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -385,7 +385,7 @@ object TestUtils extends Logging {
producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
- producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000")
+ producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
return new KafkaProducer(producerProps)
}