You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2013/03/09 00:09:39 UTC
[1/2] KAFKA-554 Dynamic per-topic configuration. This patch adds a
mechanism for storing per-topic configurations in zookeeper and dynamically
making config changes across the cluster. Reviewed by Neha and Jun.
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 8e027b2..b73e5d4 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -19,6 +19,9 @@ package kafka.admin
import junit.framework.Assert._
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
+import java.util.Properties
+import kafka.utils._
+import kafka.log._
import kafka.zk.ZooKeeperTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{Logging, ZkUtils, TestUtils}
@@ -32,28 +35,17 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val brokerList = List(0, 1, 2, 3, 4)
// test 0 replication factor
- try {
+ intercept[AdminOperationException] {
AdminUtils.assignReplicasToBrokers(brokerList, 10, 0)
- fail("shouldn't allow replication factor 0")
- }
- catch {
- case e: AdministrationException => // this is good
- case e2 => throw e2
}
// test wrong replication factor
- try {
+ intercept[AdminOperationException] {
AdminUtils.assignReplicasToBrokers(brokerList, 10, 6)
- fail("shouldn't allow replication factor larger than # of brokers")
- }
- catch {
- case e: AdministrationException => // this is good
- case e2 => throw e2
}
// correct assignment
- {
- val expectedAssignment = Map(
+ val expectedAssignment = Map(
0 -> List(0, 1, 2),
1 -> List(1, 2, 3),
2 -> List(2, 3, 4),
@@ -63,65 +55,34 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
6 -> List(1, 3, 4),
7 -> List(2, 4, 0),
8 -> List(3, 0, 1),
- 9 -> List(4, 1, 2)
- )
+ 9 -> List(4, 1, 2))
- val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
- val e = (expectedAssignment.toList == actualAssignment.toList)
- assertTrue(expectedAssignment.toList == actualAssignment.toList)
- }
+ val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
+ val e = (expectedAssignment.toList == actualAssignment.toList)
+ assertTrue(expectedAssignment.toList == actualAssignment.toList)
}
@Test
def testManualReplicaAssignment() {
- val brokerList = Set(0, 1, 2, 3, 4)
-
- // duplicated brokers
- try {
- val replicationAssignmentStr = "0,0,1:1,2,3"
- CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
- fail("replication assginment shouldn't have duplicated brokers")
- }
- catch {
- case e: AdministrationException => // this is good
- case e2 => throw e2
- }
+ val brokers = List(0, 1, 2, 3, 4)
+ TestUtils.createBrokersInZk(zkClient, brokers)
- // non-exist brokers
- try {
- val replicationAssignmentStr = "0,1,2:1,2,7"
- CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
- fail("replication assginment shouldn't contain non-exist brokers")
- }
- catch {
- case e: AdministrationException => // this is good
- case e2 => throw e2
+ // duplicate brokers
+ intercept[IllegalArgumentException] {
+ AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,0)))
}
// inconsistent replication factor
- try {
- val replicationAssignmentStr = "0,1,2:1,2"
- CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
- fail("all partitions should have the same replication factor")
- }
- catch {
- case e: AdministrationException => // this is good
- case e2 => throw e2
+ intercept[IllegalArgumentException] {
+ AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,1), 1->Seq(0)))
}
// good assignment
- {
- val replicationAssignmentStr = "0:1:2,1:2:3"
- val expectedReplicationAssignment = Map(
- 0 -> List(0, 1, 2),
- 1 -> List(1, 2, 3)
- )
- val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
- assertEquals(expectedReplicationAssignment.size, actualReplicationAssignment.size)
- for( (part, replicas) <- expectedReplicationAssignment ) {
- assertEquals(replicas, actualReplicationAssignment(part))
- }
- }
+ val assignment = Map(0 -> List(0, 1, 2),
+ 1 -> List(1, 2, 3))
+ AdminUtils.createTopicWithAssignment(zkClient, "test", assignment)
+ val found = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq("test"))
+ assertEquals(assignment, found("test"))
}
@Test
@@ -157,7 +118,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val topic = "test"
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// create leaders for all partitions
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas)
@@ -166,12 +127,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
for(i <- 0 until actualReplicaList.size)
assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
- try {
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
- fail("shouldn't be able to create a topic already exists")
- } catch {
- case e: TopicExistsException => // this is good
- case e2 => throw e2
+ intercept[TopicExistsException] {
+ // shouldn't be able to create a topic that already exists
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
}
}
@@ -179,15 +137,13 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
def testGetTopicMetadata() {
val expectedReplicaAssignment = Map(
0 -> List(0, 1, 2),
- 1 -> List(1, 2, 3)
- )
+ 1 -> List(1, 2, 3))
val leaderForPartitionMap = Map(
0 -> 0,
- 1 -> 1
- )
+ 1 -> 1)
val topic = "auto-topic"
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// create leaders for all partitions
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
@@ -215,7 +171,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
val newReplicas = Seq(0, 2, 3)
val partitionToBeReassigned = 0
@@ -240,7 +196,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
val newReplicas = Seq(1, 2, 3)
val partitionToBeReassigned = 0
@@ -266,7 +222,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
val newReplicas = Seq(2, 3)
val partitionToBeReassigned = 0
@@ -307,7 +263,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val expectedReplicaAssignment = Map(0 -> List(0, 1))
val topic = "test"
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
// put the partition in the reassigned path as well
// reassign partition 0
val newReplicas = Seq(0, 1)
@@ -346,7 +302,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
// broker 2 should be the leader since it was started first
val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get
@@ -367,7 +323,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
// create the topic
- AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment)
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
// broker 2 should be the leader since it was started first
@@ -404,6 +360,50 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
servers.foreach(_.shutdown())
}
}
+
+ /**
+ * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic
+ * then changes the config and checks that the new values take effect.
+ */
+ @Test
+ def testTopicConfigChange() {
+ val partitions = 3
+ val topic = "my-topic"
+ val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0)))
+
+ def makeConfig(messageSize: Int, retentionMs: Long) = {
+ var props = new Properties()
+ props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
+ props.setProperty(LogConfig.RententionMsProp, retentionMs.toString)
+ props
+ }
+
+ def checkConfig(messageSize: Int, retentionMs: Long) {
+ TestUtils.retry(10000) {
+ for(part <- 0 until partitions) {
+ val logOpt = server.logManager.getLog(TopicAndPartition(topic, part))
+ assertTrue(logOpt.isDefined)
+ assertEquals(retentionMs, logOpt.get.config.retentionMs)
+ assertEquals(messageSize, logOpt.get.config.maxMessageSize)
+ }
+ }
+ }
+
+ try {
+ // create a topic with a few config overrides and check that they are applied
+ val maxMessageSize = 1024
+ val retentionMs = 1000*1000
+ AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs))
+ checkConfig(maxMessageSize, retentionMs)
+
+ // now double the config values for the topic and check that it is applied
+ AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
+ checkConfig(2*maxMessageSize, 2 * retentionMs)
+ } finally {
+ server.shutdown()
+ server.config.logDirs.map(Utils.rm(_))
+ }
+ }
private def checkIfReassignPartitionPathExists(): Boolean = {
ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 8ae30ea..fec17aa 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -20,6 +20,7 @@ package kafka.consumer
import java.util.concurrent._
import java.util.concurrent.atomic._
+import java.util.Properties
import scala.collection._
import junit.framework.Assert._
@@ -27,7 +28,7 @@ import kafka.message._
import kafka.server._
import kafka.utils.TestUtils._
import kafka.utils._
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import org.junit.Test
import kafka.serializer._
import kafka.cluster.{Broker, Cluster}
@@ -60,7 +61,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
override def setUp() {
super.setUp
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)), new Properties)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index f7ee914..c70a435 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -26,7 +26,7 @@ import org.scalatest.junit.JUnit3Suite
import org.apache.log4j.{Level, Logger}
import kafka.message._
import kafka.serializer._
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import org.I0Itec.zkclient.ZkClient
import kafka.utils._
import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
@@ -298,7 +298,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer)
// create topic topic1 with 1 partition on broker 0
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
// send some messages to each broker
val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 4c646f0..c046a39 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -32,8 +32,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
val topic = "test_topic"
val group = "default_group"
val testConsumer = "consumer"
- val BrokerPort = 9892
- val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, BrokerPort)))
+ val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
val NumMessages = 10
val LargeOffset = 10000
val SmallOffset = -1
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 5a57bd1..845b966 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -30,7 +30,7 @@ import kafka.serializer._
import kafka.producer.{KeyedMessage, Producer}
import kafka.utils.TestUtils._
import kafka.utils.TestUtils
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -55,7 +55,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
override def setUp() {
super.setUp
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
+ AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)))
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
fetcher.stopAllConnections()
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 2fc08d3..1c6a01b 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -27,7 +27,7 @@ import org.I0Itec.zkclient.ZkClient
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import scala.collection._
-import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.admin.AdminUtils
import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
import kafka.utils.{TestUtils, Utils}
@@ -42,19 +42,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val configs = List(config)
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
- override def setUp() {
- super.setUp
- // temporarily set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.FATAL)
- }
-
- override def tearDown() {
- // restore set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.ERROR)
-
- super.tearDown
- }
-
def testFetchRequestCanProperlySerialize() {
val request = new FetchRequestBuilder()
.clientId("test-client")
@@ -299,7 +286,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
def testConsumerEmptyTopic() {
val newTopic = "new-topic"
- CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
+ AdminUtils.createTopic(zkClient, newTopic, 1, 1)
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.fetchTopicMetadataFromZk(newTopic, zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500)
@@ -327,10 +314,10 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
}
// wait until the messages are published
- TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test1", 0).get.logEndOffset == 2 }, 1000)
- TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test2", 0).get.logEndOffset == 2 }, 1000)
- TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test3", 0).get.logEndOffset == 2 }, 1000)
- TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test4", 0).get.logEndOffset == 2 }, 1000)
+ TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test1", 0)).get.logEndOffset == 2 }, 1000)
+ TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test2", 0)).get.logEndOffset == 2 }, 1000)
+ TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test3", 0)).get.logEndOffset == 2 }, 1000)
+ TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(TopicAndPartition("test4", 0)).get.logEndOffset == 2 }, 1000)
val replicaId = servers.head.config.brokerId
val hwWaitMs = config.replicaHighWatermarkCheckpointIntervalMs
@@ -354,7 +341,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
*/
def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
for( topic <- topics ) {
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString)
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 5169aea..be94254 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -19,7 +19,7 @@ package kafka.integration
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import java.nio.ByteBuffer
import junit.framework.Assert._
import org.easymock.EasyMock
@@ -48,7 +48,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
def testTopicMetadataRequest {
// create topic
val topic = "test"
- CreateTopicCommand.createTopic(zkClient, topic, 1)
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
// create a topic metadata request
val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
@@ -64,7 +64,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
def testBasicTopicMetadata {
// create topic
val topic = "test"
- CreateTopicCommand.createTopic(zkClient, topic, 1)
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
// set up leader for topic partition 0
val leaderForPartitionMap = Map(
0 -> configs.head.brokerId
@@ -83,7 +83,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
def testGetAllTopicMetadata {
// create topic
val topic = "test"
- CreateTopicCommand.createTopic(zkClient, topic, 1)
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
// set up leader for topic partition 0
val leaderForPartitionMap = Map(
0 -> configs.head.brokerId
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index fad3baa..6916df4 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -58,7 +58,7 @@ class LogManagerTest extends JUnit3Suite {
*/
@Test
def testCreateLog() {
- val log = logManager.getOrCreateLog(name, 0)
+ val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
val logFile = new File(logDir, name + "-0")
assertTrue(logFile.exists)
log.append(TestUtils.singleMessageSet("test".getBytes()))
@@ -69,7 +69,7 @@ class LogManagerTest extends JUnit3Suite {
*/
@Test
def testGetNonExistentLog() {
- val log = logManager.getLog(name, 0)
+ val log = logManager.getLog(TopicAndPartition(name, 0))
assertEquals("No log should be found.", None, log)
val logFile = new File(logDir, name + "-0")
assertTrue(!logFile.exists)
@@ -80,7 +80,7 @@ class LogManagerTest extends JUnit3Suite {
*/
@Test
def testCleanupExpiredSegments() {
- val log = logManager.getOrCreateLog(name, 0)
+ val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
var offset = 0L
for(i <- 0 until 200) {
var set = TestUtils.singleMessageSet("test".getBytes())
@@ -120,7 +120,7 @@ class LogManagerTest extends JUnit3Suite {
logManager.startup
// create a log
- val log = logManager.getOrCreateLog(name, 0)
+ val log = logManager.createLog(TopicAndPartition(name, 0), config)
var offset = 0L
// add a bunch of messages that should be larger than the retentionSize
@@ -158,7 +158,7 @@ class LogManagerTest extends JUnit3Suite {
val config = logConfig.copy(flushMs = 1000)
logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time)
logManager.startup
- val log = logManager.getOrCreateLog(name, 0)
+ val log = logManager.createLog(TopicAndPartition(name, 0), config)
val lastFlush = log.lastFlushTime
for(i <- 0 until 200) {
var set = TestUtils.singleMessageSet("test".getBytes())
@@ -182,7 +182,7 @@ class LogManagerTest extends JUnit3Suite {
// verify that logs are always assigned to the least loaded partition
for(partition <- 0 until 20) {
- logManager.getOrCreateLog("test", partition)
+ logManager.createLog(TopicAndPartition("test", partition), logConfig)
assertEquals("We should have created the right number of logs", partition + 1, logManager.allLogs.size)
val counts = logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size)
assertTrue("Load should balance evenly", counts.max <= counts.min + 1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 04acef5..e4b057e 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -27,7 +27,7 @@ import org.junit.Assert._
import org.junit.Test
import kafka.utils._
import java.util
-import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.admin.AdminUtils
import util.Properties
import kafka.api.FetchRequestBuilder
import kafka.common.{KafkaException, ErrorMapping, FailedToSendMessageException}
@@ -77,17 +77,15 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
// restore set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.ERROR)
server1.shutdown
- server1.awaitShutdown()
server2.shutdown
- server2.awaitShutdown()
Utils.rm(server1.config.logDirs)
Utils.rm(server2.config.logDirs)
super.tearDown()
}
-
+ @Test
def testUpdateBrokerPartitionInfo() {
- CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
+ AdminUtils.createTopic(zkClient, "new-topic", 1, 2)
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
@@ -152,7 +150,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val producerConfig2 = new ProducerConfig(props2)
// create topic with 1 partition and await leadership
- CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
+ AdminUtils.createTopic(zkClient, "new-topic", 1, 2)
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
@@ -203,7 +201,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
// create topic
- CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
+ AdminUtils.createTopicWithAssignment(zkClient, "new-topic", Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)))
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
@@ -213,13 +211,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val config = new ProducerConfig(props)
val producer = new Producer[String, String](config)
- try {
- // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
- // on broker 0
- producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
- } catch {
- case e => fail("Unexpected exception: " + e)
- }
+ // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only on broker 0
+ producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
// kill the broker
server1.shutdown
@@ -264,7 +257,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val producer = new Producer[String, String](config)
// create topics in ZK
- CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
+ AdminUtils.createTopicWithAssignment(zkClient, "new-topic", Map(0->Seq(0,1)))
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index b5ee31d..bbf0406 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -19,7 +19,7 @@ package kafka.producer
import java.net.SocketTimeoutException
import junit.framework.Assert
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import kafka.integration.KafkaServerTestHarness
import kafka.message._
import kafka.server.KafkaConfig
@@ -92,7 +92,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
val producer = new SyncProducer(new SyncProducerConfig(props))
- CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
+ AdminUtils.createTopic(zkClient, "test", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))
@@ -135,9 +135,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
}
// #2 - test that we get correct offsets when partition is owned by broker
- CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
+ AdminUtils.createTopic(zkClient, "topic1", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic1", 0, 500)
- CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
+ AdminUtils.createTopic(zkClient, "topic3", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic3", 0, 500)
val response2 = producer.send(request)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 8a3e33b..9963502 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -62,7 +62,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
assertEquals(0L, fooPartition0Hw)
val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1)
// create leader and follower replicas
- val log0 = logManagers(0).getOrCreateLog(topic, 0)
+ val log0 = logManagers(0).createLog(TopicAndPartition(topic, 0), LogConfig())
val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
partition0.addReplicaIfNotExists(leaderReplicaPartition0)
val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime)
@@ -101,7 +101,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
assertEquals(0L, topic1Partition0Hw)
val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1)
// create leader log
- val topic1Log0 = logManagers(0).getOrCreateLog(topic1, 0)
+ val topic1Log0 = logManagers(0).createLog(TopicAndPartition(topic1, 0), LogConfig())
// create a local replica for topic1
val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0))
topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
@@ -117,7 +117,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
// add another partition and set highwatermark
val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1)
// create leader log
- val topic2Log0 = logManagers(0).getOrCreateLog(topic2, 0)
+ val topic2Log0 = logManagers(0).createLog(TopicAndPartition(topic2, 0), LogConfig())
// create a local replica for topic2
val leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0))
topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 129bc56..176718e 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -19,7 +19,7 @@ package kafka.server
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import kafka.utils.TestUtils._
import junit.framework.Assert._
import kafka.utils.{ZkUtils, Utils, TestUtils}
@@ -61,7 +61,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val partitionId = 0
// create topic with 1 partition, 2 replicas, one on each broker
- CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+ AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(0, 1)))
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@ -108,7 +108,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val partitionId = 0
// create topic with 1 partition, 2 replicas, one on each broker
- CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+ AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(0, 1)))
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index f857171..6801f4e 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -26,7 +26,7 @@ import org.junit.{After, Before, Test}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
import kafka.utils.TestUtils._
import kafka.common.{ErrorMapping, TopicAndPartition}
@@ -82,10 +82,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
val logManager = server.getLogManager
- val log = logManager.getOrCreateLog(topic, part)
+ val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
val message = new Message(Integer.toString(42).getBytes())
for(i <- 0 until 20)
@@ -120,7 +120,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
val topic = topicPartition.split("-").head
// setup brokers in zookeeper as owners of partitions for this test
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
var offsetChanged = false
@@ -145,10 +145,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
- CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+ AdminUtils.createTopic(zkClient, topic, 3, 1)
val logManager = server.getLogManager
- val log = logManager.getOrCreateLog(topic, part)
+ val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
val message = new Message(Integer.toString(42).getBytes())
for(i <- 0 until 20)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -174,10 +174,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
- CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+ AdminUtils.createTopic(zkClient, topic, 3, 1)
val logManager = server.getLogManager
- val log = logManager.getOrCreateLog(topic, part)
+ val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
val message = new Message(Integer.toString(42).getBytes())
for(i <- 0 until 20)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 7430485..d2650e3 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -19,7 +19,7 @@ package kafka.server
import org.scalatest.junit.JUnit3Suite
import org.junit.Assert._
import java.io.File
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import kafka.utils.TestUtils._
import kafka.utils.IntEncoder
import kafka.utils.{Utils, TestUtils}
@@ -53,7 +53,14 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
producerProps.put("request.required.acks", "-1")
-
+
+ override def tearDown() {
+ super.tearDown()
+ for(server <- servers) {
+ server.shutdown()
+ Utils.rm(server.config.logDirs(0))
+ }
+ }
def testHWCheckpointNoFailuresSingleLogSegment {
// start both servers
@@ -64,7 +71,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producer = new Producer[Int, String](new ProducerConfig(producerProps))
// create topic with 1 partition, 2 replicas, one on each broker
- CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+ AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1)))
// wait until leader is elected
var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@ -86,7 +93,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(numMessages, leaderHW)
val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(numMessages, followerHW)
- servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDirs(0))})
}
def testHWCheckpointWithFailuresSingleLogSegment {
@@ -98,7 +104,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producer = new Producer[Int, String](new ProducerConfig(producerProps))
// create topic with 1 partition, 2 replicas, one on each broker
- CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+ AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1)))
// wait until leader is elected
var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@ -148,7 +154,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producer.close()
assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
- servers.foreach(server => Utils.rm(server.config.logDirs))
}
def testHWCheckpointNoFailuresMultipleLogSegments {
@@ -163,7 +168,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producer = new Producer[Int, String](new ProducerConfig(producerProps))
// create topic with 1 partition, 2 replicas, one on each broker
- CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+ AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1)))
// wait until leader is elected
var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@ -182,7 +187,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(hw, leaderHW)
val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(hw, followerHW)
- servers.foreach(server => Utils.rm(server.config.logDirs))
}
def testHWCheckpointWithFailuresMultipleLogSegments {
@@ -197,7 +201,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producer = new Producer[Int, String](new ProducerConfig(producerProps))
// create topic with 1 partition, 2 replicas, one on each broker
- CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+ AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(server1.config.brokerId, server2.config.brokerId)))
// wait until leader is elected
var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@ -241,7 +245,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producer.close()
assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
- servers.foreach(server => Utils.rm(server.config.logDirs))
}
private def sendMessages(n: Int = 1) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 6989c95..c0475d0 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -26,7 +26,6 @@ import org.junit.{After, Before, Test}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
-import kafka.admin.CreateTopicCommand
import kafka.api.{OffsetCommitRequest, OffsetFetchRequest}
import kafka.utils.TestUtils._
import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index d0e3590..dd85c71 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -22,9 +22,10 @@ import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import kafka.producer.KeyedMessage
import kafka.serializer.StringEncoder
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import kafka.utils.TestUtils
import junit.framework.Assert._
+import kafka.common._
class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(2)
@@ -50,7 +51,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
// create a topic and partition and await leadership
for (topic <- List(topic1,topic2)) {
- CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
+ AdminUtils.createTopic(zkClient, topic, 1, 2)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
}
@@ -65,9 +66,10 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
def logsMatch(): Boolean = {
var result = true
for (topic <- List(topic1, topic2)) {
- val expectedOffset = brokers.head.getLogManager().getLog(topic, partition).get.logEndOffset
+ val topicAndPart = TopicAndPartition(topic, partition)
+ val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset
result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total &&
- (expectedOffset == item.getLogManager().getLog(topic, partition).get.logEndOffset) }
+ (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset) }
}
result
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 3728f8c..c5f39cb 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -26,7 +26,7 @@ import kafka.zk.ZooKeeperTestHarness
import kafka.producer._
import kafka.utils.IntEncoder
import kafka.utils.TestUtils._
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
import kafka.api.FetchRequestBuilder
import kafka.utils.{TestUtils, Utils}
@@ -49,7 +49,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
// create topic
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+ AdminUtils.createTopic(zkClient, topic, 1, 1)
// send some messages
producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 571e2df..1c6f615 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -67,7 +67,7 @@ class SimpleFetchTest extends JUnit3Suite {
EasyMock.replay(log)
val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
- EasyMock.expect(logManager.getLog(topic, partitionId)).andReturn(Some(log)).anyTimes()
+ EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes()
EasyMock.replay(logManager)
val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
@@ -133,7 +133,7 @@ class SimpleFetchTest extends JUnit3Suite {
EasyMock.replay(log)
val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
- EasyMock.expect(logManager.getLog(topic, 0)).andReturn(Some(log)).anyTimes()
+ EasyMock.expect(logManager.getLog(TopicAndPartition(topic, 0))).andReturn(Some(log)).anyTimes()
EasyMock.replay(logManager)
val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/utils/JsonTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
new file mode 100644
index 0000000..4482dab
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
@@ -0,0 +1,27 @@
+package kafka.utils
+
+import junit.framework.Assert._
+import org.junit.{Test, After, Before}
+
+class JsonTest {
+
+ @Test
+ def testJsonEncoding() {
+ assertEquals("null", Json.encode(null))
+ assertEquals("1", Json.encode(1))
+ assertEquals("1", Json.encode(1L))
+ assertEquals("1", Json.encode(1.toByte))
+ assertEquals("1", Json.encode(1.toShort))
+ assertEquals("1.0", Json.encode(1.0))
+ assertEquals("\"str\"", Json.encode("str"))
+ assertEquals("true", Json.encode(true))
+ assertEquals("false", Json.encode(false))
+ assertEquals("[]", Json.encode(Seq()))
+ assertEquals("[1,2,3]", Json.encode(Seq(1,2,3)))
+ assertEquals("[1,\"2\",[3]]", Json.encode(Seq(1,"2",Seq(3))))
+ assertEquals("{}", Json.encode(Map()))
+ assertEquals("{\"a\":1,\"b\":2}", Json.encode(Map("a" -> 1, "b" -> 2)))
+ assertEquals("{\"a\":[1,2],\"c\":[3,4]}", Json.encode(Map("a" -> Seq(1,2), "c" -> Seq(3,4))))
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index ec27ef9..b364ac2 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -76,7 +76,9 @@ class SchedulerTest {
@Test
def testNonPeriodicTask() {
scheduler.schedule("test", counter1.getAndIncrement, delay = 0)
- retry(30000, () => assertEquals(counter1.get, 1))
+ retry(30000) {
+ assertEquals(counter1.get, 1)
+ }
Thread.sleep(5)
assertEquals("Should only run once", 1, counter1.get)
}
@@ -84,6 +86,8 @@ class SchedulerTest {
@Test
def testPeriodicTask() {
scheduler.schedule("test", counter1.getAndIncrement, delay = 0, period = 5)
- retry(30000, () => assertTrue("Should count to 20", counter1.get >= 20))
+ retry(30000){
+ assertTrue("Should count to 20", counter1.get >= 20)
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1ed12e4/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 98901c2..40bfacb 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,6 +23,7 @@ import java.nio._
import java.nio.channels._
import java.util.Random
import java.util.Properties
+import junit.framework.AssertionFailedError
import junit.framework.Assert._
import kafka.server._
import kafka.producer._
@@ -122,7 +123,7 @@ object TestUtils extends Logging {
/**
* Create a test config for the given node id
*/
- def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+ def createBrokerConfig(nodeId: Int, port: Int = choosePort()): Properties = {
val props = new Properties
props.put("broker.id", nodeId.toString)
props.put("host.name", "localhost")
@@ -448,18 +449,20 @@ object TestUtils extends Logging {
* Execute the given block. If it throws an assert error, retry. Repeat
* until no error is thrown or the time limit ellapses
*/
- def retry(maxWaitMs: Long, block: () => Unit) {
+ def retry(maxWaitMs: Long)(block: => Unit) {
var wait = 1L
val startTime = System.currentTimeMillis()
while(true) {
try {
- block()
+ block
return
} catch {
- case e: AssertionError =>
- if(System.currentTimeMillis - startTime > maxWaitMs) {
+ case e: AssertionFailedError =>
+ val ellapsed = System.currentTimeMillis - startTime
+ if(ellapsed > maxWaitMs) {
throw e
} else {
+ info("Attempt failed, sleeping for " + wait + ", and then retrying.")
Thread.sleep(wait)
wait += math.min(wait, 1000)
}