You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/13 08:01:31 UTC
[kafka] branch 2.0 updated: MINOR: Remove unnecessary old consumer
usage in tests and other clean-ups (#5199)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 5c71948 MINOR: Remove unnecessary old consumer usage in tests and other clean-ups (#5199)
5c71948 is described below
commit 5c71948f5c369cdb0ffea9137f83eda9ffdda6e3
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Wed Jun 13 00:51:11 2018 -0700
MINOR: Remove unnecessary old consumer usage in tests and other clean-ups (#5199)
- Update some tests to use the Java consumer.
- Remove ignored `ProducerBounceTest.testBrokerFailure`. This test
is flaky and it has been superseded by `TransactionBounceTest`.
- Use non-blocking poll for consumption methods in `TestUtils`.
This is a step on the road to remove the old consumers.
---
core/src/main/scala/kafka/log/LogConfig.scala | 6 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 15 +++--
core/src/main/scala/kafka/tools/JmxTool.scala | 2 +-
.../kafka/api/AdminClientIntegrationTest.scala | 3 +-
.../integration/kafka/api/ProducerBounceTest.scala | 70 +++-------------------
.../scala/other/kafka/TestLinearWriteSpeed.scala | 2 +-
.../kafka/admin/DeleteConsumerGroupsTest.scala | 5 +-
.../integration/UncleanLeaderElectionTest.scala | 52 +++++++++-------
.../scala/unit/kafka/server/KafkaConfigTest.scala | 3 +-
.../unit/kafka/server/ServerShutdownTest.scala | 36 +++++------
.../test/scala/unit/kafka/utils/TestUtils.scala | 14 +++--
12 files changed, 82 insertions(+), 128 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 158209a..c827121 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -21,12 +21,12 @@ import java.util.{Collections, Locale, Properties}
import scala.collection.JavaConverters._
import kafka.api.ApiVersion
-import kafka.message.{BrokerCompressionCodec, Message}
+import kafka.message.BrokerCompressionCodec
import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
import kafka.utils.Implicits._
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, TopicConfig}
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record.{LegacyRecord, TimestampType}
import org.apache.kafka.common.utils.Utils
import scala.collection.{Map, mutable}
@@ -212,7 +212,7 @@ object LogConfig {
import org.apache.kafka.common.config.ConfigDef.ValidString._
new LogConfigDef()
- .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(Message.MinMessageOverhead), MEDIUM,
+ .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), MEDIUM,
SegmentSizeDoc, KafkaConfig.LogSegmentBytesProp)
.define(SegmentMsProp, LONG, Defaults.SegmentMs, atLeast(1), MEDIUM, SegmentMsDoc,
KafkaConfig.LogRollTimeMillisProp)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ae7845b..cdd0d72 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1318,7 +1318,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDeleteGroupsRequest(request: RequestChannel.Request): Unit = {
val deleteGroupsRequest = request.body[DeleteGroupsRequest]
- var groups = deleteGroupsRequest.groups.asScala.toSet
+ val groups = deleteGroupsRequest.groups.asScala.toSet
val (authorizedGroups, unauthorizedGroups) = groups.partition { group =>
authorize(request.session, Delete, Resource(Group, group, LITERAL))
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 19bb807..ecbb790 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -22,10 +22,9 @@ import java.util.{Collections, Properties}
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
import kafka.cluster.EndPoint
-import kafka.consumer.ConsumerConfig
import kafka.coordinator.group.OffsetConfig
import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
-import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
+import kafka.message.{BrokerCompressionCodec, CompressionCodec}
import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import org.apache.kafka.clients.CommonClientConfigs
@@ -35,7 +34,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record.{LegacyRecord, Records, TimestampType}
import org.apache.kafka.common.security.auth.SecurityProtocol
import scala.collection.JavaConverters._
@@ -52,7 +51,7 @@ object Defaults {
val BrokerIdGenerationEnable = true
val MaxReservedBrokerId = 1000
val BrokerId = -1
- val MessageMaxBytes = 1000000 + MessageSet.LogOverhead
+ val MessageMaxBytes = 1000000 + Records.LOG_OVERHEAD
val NumNetworkThreads = 3
val NumIoThreads = 8
val BackgroundThreads = 10
@@ -122,9 +121,9 @@ object Defaults {
val ControllerMessageQueueSize = Int.MaxValue
val DefaultReplicationFactor = 1
val ReplicaLagTimeMaxMs = 10000L
- val ReplicaSocketTimeoutMs = ConsumerConfig.SocketTimeout
- val ReplicaSocketReceiveBufferBytes = ConsumerConfig.SocketBufferSize
- val ReplicaFetchMaxBytes = ConsumerConfig.FetchSize
+ val ReplicaSocketTimeoutMs = 30 * 1000
+ val ReplicaSocketReceiveBufferBytes = 64 * 1024
+ val ReplicaFetchMaxBytes = 1024 * 1024
val ReplicaFetchWaitMaxMs = 500
val ReplicaFetchMinBytes = 1
val ReplicaFetchResponseMaxBytes = 10 * 1024 * 1024
@@ -820,7 +819,7 @@ object KafkaConfig {
.define(NumPartitionsProp, INT, Defaults.NumPartitions, atLeast(1), MEDIUM, NumPartitionsDoc)
.define(LogDirProp, STRING, Defaults.LogDir, HIGH, LogDirDoc)
.define(LogDirsProp, STRING, null, HIGH, LogDirsDoc)
- .define(LogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(Message.MinMessageOverhead), HIGH, LogSegmentBytesDoc)
+ .define(LogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), HIGH, LogSegmentBytesDoc)
.define(LogRollTimeMillisProp, LONG, null, HIGH, LogRollTimeMillisDoc)
.define(LogRollTimeHoursProp, INT, Defaults.LogRollHours, atLeast(1), HIGH, LogRollTimeHoursDoc)
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 27e4631..c5303a9 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -99,7 +99,7 @@ object JmxTool extends Logging {
val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt))
val interval = options.valueOf(reportingIntervalOpt).intValue
- var oneTime = interval < 0 || options.has(oneTimeOpt)
+ val oneTime = interval < 0 || options.has(oneTimeOpt)
val attributesWhitelistExists = options.has(attributesOpt)
val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",").filterNot(_.equals(""))) else None
val dateFormatExists = options.has(dateFormatOpt)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 986fa4a..50ed7ae 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -389,7 +389,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val finalNumMessages = Await.result(producerFuture, Duration(20, TimeUnit.SECONDS))
// Verify that all messages that are produced can be consumed
- val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, finalNumMessages, securityProtocol, trustStoreFile)
+ val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, finalNumMessages,
+ securityProtocol = securityProtocol, trustStoreFile = trustStoreFile)
consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) =>
assertEquals(s"xxxxxxxxxxxxxxxxxxxx-$index", new String(consumerRecord.value))
}
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index e3514cd..a11afd3 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -16,16 +16,18 @@ package kafka.api
import java.util.Properties
import java.util.concurrent.Future
-import kafka.consumer.SimpleConsumer
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{ShutdownableThread, TestUtils}
import kafka.utils.Implicits._
import org.apache.kafka.clients.producer._
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.serialization.StringDeserializer
import org.junit.Assert._
import org.junit.{Ignore, Test}
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
class ProducerBounceTest extends KafkaServerTestHarness {
@@ -35,14 +37,14 @@ class ProducerBounceTest extends KafkaServerTestHarness {
val numServers = 4
val overridingProps = new Properties()
- overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
+ overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString)
// Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
- overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
- overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
- overridingProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
- overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+ overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+ overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, "true")
+ overridingProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "false")
+ overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
// This is the one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient
// failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing
// brokers too quickly when they get new, random ports. If we're not careful, the client can end up in a situation
@@ -59,62 +61,6 @@ class ProducerBounceTest extends KafkaServerTestHarness {
private val topic1 = "topic-1"
- /**
- * With replication, producer should able to find new leader after it detects broker failure
- */
- @Ignore // To be re-enabled once we can make it less flaky (KAFKA-2837)
- @Test
- def testBrokerFailure() {
- val numPartitions = 3
- val topicConfig = new Properties()
- topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
- createTopic(topic1, numPartitions, numServers, topicConfig)
-
- val scheduler = new ProducerScheduler()
- scheduler.start()
-
- // rolling bounce brokers
-
- for (_ <- 0 until numServers) {
- for (server <- servers) {
- info("Shutting down server : %s".format(server.config.brokerId))
- server.shutdown()
- server.awaitShutdown()
- info("Server %s shut down. Starting it up again.".format(server.config.brokerId))
- server.startup()
- info("Restarted server: %s".format(server.config.brokerId))
- }
-
- // Make sure the producer do not see any exception in returned metadata due to broker failures
- assertFalse(scheduler.failed)
-
- // Make sure the leader still exists after bouncing brokers
- (0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition))
- }
-
- scheduler.shutdown()
-
- // Make sure the producer do not see any exception
- // when draining the left messages on shutdown
- assertFalse(scheduler.failed)
-
- // double check that the leader info has been propagated after consecutive bounces
- val newLeaders = (0 until numPartitions).map(i => TestUtils.waitUntilMetadataIsPropagated(servers, topic1, i))
- val fetchResponses = newLeaders.zipWithIndex.map { case (leader, partition) =>
- // Consumers must be instantiated after all the restarts since they use random ports each time they start up
- val consumer = new SimpleConsumer("localhost", boundPort(servers(leader)), 30000, 1024 * 1024, "")
- val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
- consumer.close
- response
- }
- val messages = fetchResponses.flatMap(r => r.iterator.toList.map(_.message))
- val uniqueMessages = messages.toSet
- val uniqueMessageSize = uniqueMessages.size
- info(s"number of unique messages sent: ${uniqueMessageSize}")
- assertEquals(s"Found ${messages.size - uniqueMessageSize} duplicate messages.", uniqueMessageSize, messages.size)
- assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, messages.size)
- }
-
private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) {
val numRecords = 1000
var sent = 0
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 16325ee..b385a2a 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -102,7 +102,7 @@ object TestLinearWriteSpeed {
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt))
val rand = new Random
rand.nextBytes(buffer.array)
- val numMessages = bufferSize / (messageSize + MessageSet.LogOverhead)
+ val numMessages = bufferSize / (messageSize + Records.LOG_OVERHEAD)
val createTime = System.currentTimeMillis
val messageSet = {
val compressionType = CompressionType.forId(compressionCodec.codec)
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index ef3b17c..aae48d1 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -14,16 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package unit.kafka.admin
+package kafka.admin
import joptsimple.OptionException
-import kafka.admin.ConsumerGroupCommandTest
import kafka.utils.TestUtils
import org.apache.kafka.common.protocol.Errors
import org.junit.Assert._
import org.junit.Test
-class DeleteConsumerGroupTest extends ConsumerGroupCommandTest {
+class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
@Test(expected = classOf[OptionException])
def testDeleteWithTopicOption() {
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index a15ddb8..d5c0a55 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -21,17 +21,21 @@ import org.apache.kafka.common.config.ConfigException
import org.junit.{After, Before, Ignore, Test}
import scala.util.Random
+import scala.collection.JavaConverters._
import org.apache.log4j.{Level, Logger}
import java.util.Properties
import java.util.concurrent.ExecutionException
-import kafka.consumer.{Consumer, ConsumerConfig}
-import kafka.serializer.StringDecoder
import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.CoreUtils
+import kafka.utils.{CoreUtils, TestUtils}
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.serialization.StringDeserializer
import org.junit.Assert._
class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@@ -177,19 +181,19 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
produceMessage(servers, topic, "first")
waitUntilMetadataIsPropagated(servers, topic, partitionId)
- assertEquals(List("first"), consumeAllMessages(topic))
+ assertEquals(List("first"), consumeAllMessages(topic, 1))
// shutdown follower server
servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
produceMessage(servers, topic, "second")
- assertEquals(List("first", "second"), consumeAllMessages(topic))
+ assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
//remove any previous unclean election metric
- servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+ servers.map(_.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
// shutdown leader and then restart follower
- servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
+ servers.filter(_.config.brokerId == leaderId).map(shutdownServer)
val followerServer = servers.find(_.config.brokerId == followerId).get
followerServer.startup()
@@ -200,7 +204,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
produceMessage(servers, topic, "third")
// second message was lost due to unclean election
- assertEquals(List("first", "third"), consumeAllMessages(topic))
+ assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
}
def verifyUncleanLeaderElectionDisabled(): Unit = {
@@ -215,13 +219,13 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
produceMessage(servers, topic, "first")
waitUntilMetadataIsPropagated(servers, topic, partitionId)
- assertEquals(List("first"), consumeAllMessages(topic))
+ assertEquals(List("first"), consumeAllMessages(topic, 1))
// shutdown follower server
servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
produceMessage(servers, topic, "second")
- assertEquals(List("first", "second"), consumeAllMessages(topic))
+ assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
//remove any previous unclean election metric
servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
@@ -243,7 +247,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected
}
- assertEquals(List.empty[String], consumeAllMessages(topic))
+ assertEquals(List.empty[String], consumeAllMessages(topic, 0))
// restart leader temporarily to send a successfully replicated message
servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
@@ -257,7 +261,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
// verify messages can be consumed from ISR follower that was just promoted to leader
- assertEquals(List("first", "second", "third"), consumeAllMessages(topic))
+ assertEquals(List("first", "second", "third"), consumeAllMessages(topic, 3))
}
private def shutdownServer(server: KafkaServer) = {
@@ -265,16 +269,18 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
server.awaitShutdown()
}
- private def consumeAllMessages(topic: String) : List[String] = {
- // use a fresh consumer group every time so that we don't need to mess with disabling auto-commit or
- // resetting the ZK offset
- val consumerProps = createConsumerProperties(zkConnect, "group" + random.nextLong, "id", 1000)
- val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
- val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
-
- val messages = getMessages(messageStream)
- consumerConnector.shutdown
-
- messages
+ private def consumeAllMessages(topic: String, numMessages: Int): Seq[String] = {
+ val brokerList = TestUtils.bootstrapServers(servers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+ val props = new Properties
+ // Don't rely on coordinator as it may be down when this method is called
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ val consumer = TestUtils.createNewConsumer(brokerList, "group" + random.nextLong,
+ securityProtocol = SecurityProtocol.PLAINTEXT, valueDeserializer = new StringDeserializer, props = Some(props))
+ try {
+ val tp = new TopicPartition(topic, partitionId)
+ consumer.assign(Seq(tp).asJava)
+ consumer.seek(tp, 0)
+ TestUtils.consumeRecords(consumer, numMessages).map(_.value)
+ } finally consumer.close()
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index afb297d..0ee8d81 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -26,6 +26,7 @@ import kafka.utils.{CoreUtils, TestUtils}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.record.Records
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.Assert._
import org.junit.Test
@@ -590,7 +591,7 @@ class KafkaConfigTest {
case KafkaConfig.NumPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogDirsProp => // ignore string
case KafkaConfig.LogDirProp => // ignore string
- case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", Message.MinMessageOverhead - 1)
+ case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", Records.LOG_OVERHEAD - 1)
case KafkaConfig.LogRollTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogRollTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 596c353..78c0c33 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -17,20 +17,20 @@
package kafka.server
import kafka.zk.ZooKeeperTestHarness
-import kafka.consumer.SimpleConsumer
import kafka.utils.{CoreUtils, TestUtils}
import kafka.utils.TestUtils._
-import kafka.api.FetchRequestBuilder
-import kafka.message.ByteBufferMessageSet
import java.io.File
import kafka.log.LogManager
+import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.errors.KafkaStorageException
-import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
import org.junit.{Before, Test}
import org.junit.Assert._
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
class ServerShutdownTest extends ZooKeeperTestHarness {
@@ -58,6 +58,14 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
valueSerializer = new StringSerializer
)
+ def createConsumer(server: KafkaServer): KafkaConsumer[Integer, String] =
+ TestUtils.createNewConsumer(
+ TestUtils.getBrokerListStrFromServers(Seq(server)),
+ securityProtocol = SecurityProtocol.PLAINTEXT,
+ keyDeserializer = new IntegerDeserializer,
+ valueDeserializer = new StringDeserializer
+ )
+
var server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
server.startup()
var producer = createProducer(server)
@@ -85,25 +93,17 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0)
producer = createProducer(server)
- val consumer = new SimpleConsumer(host, TestUtils.boundPort(server), 1000000, 64*1024, "")
+ val consumer = createConsumer(server)
+ consumer.subscribe(Seq(topic).asJava)
- var fetchedMessage: ByteBufferMessageSet = null
- while (fetchedMessage == null || fetchedMessage.validBytes == 0) {
- val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build())
- fetchedMessage = fetched.messageSet(topic, 0)
- }
- assertEquals(sent1, fetchedMessage.map(m => TestUtils.readString(m.message.payload)))
- val newOffset = fetchedMessage.last.nextOffset
+ val consumerRecords = TestUtils.consumeRecords(consumer, sent1.size)
+ assertEquals(sent1, consumerRecords.map(_.value))
// send some more messages
sent2.map(value => producer.send(new ProducerRecord(topic, 0, value))).foreach(_.get)
- fetchedMessage = null
- while (fetchedMessage == null || fetchedMessage.validBytes == 0) {
- val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
- fetchedMessage = fetched.messageSet(topic, 0)
- }
- assertEquals(sent2, fetchedMessage.map(m => TestUtils.readString(m.message.payload)))
+ val consumerRecords2 = TestUtils.consumeRecords(consumer, sent2.size)
+ assertEquals(sent2, consumerRecords2.map(_.value))
consumer.close()
producer.close()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f50ef3a..8b89504 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -22,6 +22,7 @@ import java.nio._
import java.nio.channels._
import java.nio.charset.{Charset, StandardCharsets}
import java.security.cert.X509Certificate
+import java.time.Duration
import java.util.{Collections, Properties}
import java.util.concurrent.{Callable, Executors, TimeUnit}
import javax.net.ssl.X509TrustManager
@@ -1258,10 +1259,11 @@ object TestUtils extends Logging {
def consumeTopicRecords[K, V](servers: Seq[KafkaServer],
topic: String,
numMessages: Int,
+ groupId: String = "group",
securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
trustStoreFile: Option[File] = None,
waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
- val consumer = createNewConsumer(TestUtils.getBrokerListStrFromServers(servers, securityProtocol),
+ val consumer = createNewConsumer(TestUtils.getBrokerListStrFromServers(servers, securityProtocol), groupId = groupId,
securityProtocol = securityProtocol, trustStoreFile = trustStoreFile)
try {
consumer.subscribe(Collections.singleton(topic))
@@ -1273,7 +1275,7 @@ object TestUtils extends Logging {
waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[K, V]] = {
val records = new ArrayBuffer[ConsumerRecord[K, V]]()
waitUntilTrue(() => {
- records ++= consumer.poll(50).asScala
+ records ++= consumer.poll(Duration.ofMillis(50)).asScala
records.size >= numMessages
}, s"Consumed ${records.size} records until timeout instead of the expected $numMessages records", waitTime)
assertEquals("Consumed more records than expected", numMessages, records.size)
@@ -1288,11 +1290,11 @@ object TestUtils extends Logging {
*
* @return All the records consumed by the consumer within the specified duration.
*/
- def consumeRecordsFor[K, V](consumer: KafkaConsumer[K, V], duration: Long): Seq[ConsumerRecord[K, V]] = {
+ def consumeRecordsFor[K, V](consumer: KafkaConsumer[K, V], duration: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[K, V]] = {
val startTime = System.currentTimeMillis()
val records = new ArrayBuffer[ConsumerRecord[K, V]]()
waitUntilTrue(() => {
- records ++= consumer.poll(50).asScala
+ records ++= consumer.poll(Duration.ofMillis(50)).asScala
System.currentTimeMillis() - startTime > duration
}, s"The timeout $duration was greater than the maximum wait time.")
records
@@ -1375,14 +1377,14 @@ object TestUtils extends Logging {
def pollUntilAtLeastNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
TestUtils.waitUntilTrue(() => {
- records ++= consumer.poll(50).asScala
+ records ++= consumer.poll(Duration.ofMillis(50)).asScala
records.size >= numRecords
}, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
records
}
def resetToCommittedPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) = {
- consumer.assignment.asScala.foreach { case(topicPartition) =>
+ consumer.assignment.asScala.foreach { topicPartition =>
val offset = consumer.committed(topicPartition)
if (offset != null)
consumer.seek(topicPartition, offset.offset)
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.