Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/13 08:01:31 UTC

[kafka] branch 2.0 updated: MINOR: Remove unnecessary old consumer usage in tests and other clean-ups (#5199)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 5c71948  MINOR: Remove unnecessary old consumer usage in tests and other clean-ups (#5199)
5c71948 is described below

commit 5c71948f5c369cdb0ffea9137f83eda9ffdda6e3
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Wed Jun 13 00:51:11 2018 -0700

    MINOR: Remove unnecessary old consumer usage in tests and other clean-ups (#5199)
    
    - Update some tests to use the Java consumer.
    - Remove the ignored `ProducerBounceTest.testBrokerFailure`. This test
      is flaky and has been superseded by `TransactionsBounceTest`.
    - Use non-blocking poll for the consumption methods in `TestUtils`
      (a sketch of the pattern follows the diffstat below).
    
    This is a step on the road to removing the old consumers.
---
 core/src/main/scala/kafka/log/LogConfig.scala      |  6 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 15 +++--
 core/src/main/scala/kafka/tools/JmxTool.scala      |  2 +-
 .../kafka/api/AdminClientIntegrationTest.scala     |  3 +-
 .../integration/kafka/api/ProducerBounceTest.scala | 70 +++-------------------
 .../scala/other/kafka/TestLinearWriteSpeed.scala   |  2 +-
 .../kafka/admin/DeleteConsumerGroupsTest.scala     |  5 +-
 .../integration/UncleanLeaderElectionTest.scala    | 52 +++++++++-------
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  3 +-
 .../unit/kafka/server/ServerShutdownTest.scala     | 36 +++++------
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 14 +++--
 12 files changed, 82 insertions(+), 128 deletions(-)
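
For context, the `TestUtils` consumption helpers changed at the end of this
diff follow the pattern sketched below: create a plain Java consumer and
accumulate records with short `poll(Duration)` calls (the KIP-266 overload)
until enough have arrived. This is an illustrative sketch only, not the
committed code; the names `NonBlockingPollSketch`, `newConsumer` and
`consumeAtLeast`, and the 50 ms poll / deadline values, are assumptions made
for the example.

    import java.time.Duration
    import java.util.Properties

    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
    import org.apache.kafka.common.serialization.StringDeserializer

    import scala.collection.JavaConverters._
    import scala.collection.mutable.ArrayBuffer

    object NonBlockingPollSketch {

      // Build a plain Java consumer; `bootstrapServers` and `groupId` are
      // placeholders for whatever the test harness provides.
      def newConsumer(bootstrapServers: String, groupId: String): KafkaConsumer[String, String] = {
        val props = new Properties
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
      }

      // Accumulate records via short poll(Duration) calls until `numRecords`
      // have arrived or `waitMs` elapses, rather than issuing one long
      // blocking poll.
      def consumeAtLeast(consumer: KafkaConsumer[String, String],
                         numRecords: Int,
                         waitMs: Long): Seq[ConsumerRecord[String, String]] = {
        val records = new ArrayBuffer[ConsumerRecord[String, String]]
        val deadline = System.currentTimeMillis + waitMs
        while (records.size < numRecords && System.currentTimeMillis < deadline)
          records ++= consumer.poll(Duration.ofMillis(50)).asScala
        records
      }
    }

The real helpers (`consumeRecords`, `pollUntilAtLeastNumRecords`) wrap this
loop in `waitUntilTrue`, which additionally fails the test if the expected
count is not reached; see the `TestUtils.scala` hunk at the end of the diff.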

diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 158209a..c827121 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -21,12 +21,12 @@ import java.util.{Collections, Locale, Properties}
 
 import scala.collection.JavaConverters._
 import kafka.api.ApiVersion
-import kafka.message.{BrokerCompressionCodec, Message}
+import kafka.message.BrokerCompressionCodec
 import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
 import kafka.utils.Implicits._
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, TopicConfig}
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record.{LegacyRecord, TimestampType}
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.{Map, mutable}
@@ -212,7 +212,7 @@ object LogConfig {
     import org.apache.kafka.common.config.ConfigDef.ValidString._
 
     new LogConfigDef()
-      .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(Message.MinMessageOverhead), MEDIUM,
+      .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), MEDIUM,
         SegmentSizeDoc, KafkaConfig.LogSegmentBytesProp)
       .define(SegmentMsProp, LONG, Defaults.SegmentMs, atLeast(1), MEDIUM, SegmentMsDoc,
         KafkaConfig.LogRollTimeMillisProp)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ae7845b..cdd0d72 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1318,7 +1318,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleDeleteGroupsRequest(request: RequestChannel.Request): Unit = {
     val deleteGroupsRequest = request.body[DeleteGroupsRequest]
-    var groups = deleteGroupsRequest.groups.asScala.toSet
+    val groups = deleteGroupsRequest.groups.asScala.toSet
 
     val (authorizedGroups, unauthorizedGroups) = groups.partition { group =>
       authorize(request.session, Delete, Resource(Group, group, LITERAL))
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 19bb807..ecbb790 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -22,10 +22,9 @@ import java.util.{Collections, Properties}
 
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
 import kafka.cluster.EndPoint
-import kafka.consumer.ConsumerConfig
 import kafka.coordinator.group.OffsetConfig
 import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
-import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
+import kafka.message.{BrokerCompressionCodec, CompressionCodec}
 import kafka.utils.CoreUtils
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.CommonClientConfigs
@@ -35,7 +34,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
 import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record.{LegacyRecord, Records, TimestampType}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
 import scala.collection.JavaConverters._
@@ -52,7 +51,7 @@ object Defaults {
   val BrokerIdGenerationEnable = true
   val MaxReservedBrokerId = 1000
   val BrokerId = -1
-  val MessageMaxBytes = 1000000 + MessageSet.LogOverhead
+  val MessageMaxBytes = 1000000 + Records.LOG_OVERHEAD
   val NumNetworkThreads = 3
   val NumIoThreads = 8
   val BackgroundThreads = 10
@@ -122,9 +121,9 @@ object Defaults {
   val ControllerMessageQueueSize = Int.MaxValue
   val DefaultReplicationFactor = 1
   val ReplicaLagTimeMaxMs = 10000L
-  val ReplicaSocketTimeoutMs = ConsumerConfig.SocketTimeout
-  val ReplicaSocketReceiveBufferBytes = ConsumerConfig.SocketBufferSize
-  val ReplicaFetchMaxBytes = ConsumerConfig.FetchSize
+  val ReplicaSocketTimeoutMs = 30 * 1000
+  val ReplicaSocketReceiveBufferBytes = 64 * 1024
+  val ReplicaFetchMaxBytes = 1024 * 1024
   val ReplicaFetchWaitMaxMs = 500
   val ReplicaFetchMinBytes = 1
   val ReplicaFetchResponseMaxBytes = 10 * 1024 * 1024
@@ -820,7 +819,7 @@ object KafkaConfig {
       .define(NumPartitionsProp, INT, Defaults.NumPartitions, atLeast(1), MEDIUM, NumPartitionsDoc)
       .define(LogDirProp, STRING, Defaults.LogDir, HIGH, LogDirDoc)
       .define(LogDirsProp, STRING, null, HIGH, LogDirsDoc)
-      .define(LogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(Message.MinMessageOverhead), HIGH, LogSegmentBytesDoc)
+      .define(LogSegmentBytesProp, INT, Defaults.LogSegmentBytes, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), HIGH, LogSegmentBytesDoc)
 
       .define(LogRollTimeMillisProp, LONG, null, HIGH, LogRollTimeMillisDoc)
       .define(LogRollTimeHoursProp, INT, Defaults.LogRollHours, atLeast(1), HIGH, LogRollTimeHoursDoc)
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 27e4631..c5303a9 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -99,7 +99,7 @@ object JmxTool extends Logging {
 
     val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt))
     val interval = options.valueOf(reportingIntervalOpt).intValue
-    var oneTime = interval < 0 || options.has(oneTimeOpt)
+    val oneTime = interval < 0 || options.has(oneTimeOpt)
     val attributesWhitelistExists = options.has(attributesOpt)
     val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",").filterNot(_.equals(""))) else None
     val dateFormatExists = options.has(dateFormatOpt)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 986fa4a..50ed7ae 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -389,7 +389,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val finalNumMessages = Await.result(producerFuture, Duration(20, TimeUnit.SECONDS))
 
     // Verify that all messages that are produced can be consumed
-    val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, finalNumMessages, securityProtocol, trustStoreFile)
+    val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, finalNumMessages,
+      securityProtocol = securityProtocol, trustStoreFile = trustStoreFile)
     consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) =>
       assertEquals(s"xxxxxxxxxxxxxxxxxxxx-$index", new String(consumerRecord.value))
     }
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index e3514cd..a11afd3 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -16,16 +16,18 @@ package kafka.api
 import java.util.Properties
 import java.util.concurrent.Future
 
-import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{ShutdownableThread, TestUtils}
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.junit.Assert._
 import org.junit.{Ignore, Test}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 class ProducerBounceTest extends KafkaServerTestHarness {
@@ -35,14 +37,14 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   val numServers = 4
 
   val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
+  overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
   overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString)
   // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
   // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
-  overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
-  overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
-  overridingProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
-  overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+  overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, "true")
+  overridingProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "false")
+  overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
   // This is the one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient
   // failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing
   // brokers too quickly when they get new, random ports. If we're not careful, the client can end up in a situation
@@ -59,62 +61,6 @@ class ProducerBounceTest extends KafkaServerTestHarness {
 
   private val topic1 = "topic-1"
 
-  /**
-   * With replication, producer should able to find new leader after it detects broker failure
-   */
-  @Ignore // To be re-enabled once we can make it less flaky (KAFKA-2837)
-  @Test
-  def testBrokerFailure() {
-    val numPartitions = 3
-    val topicConfig = new Properties()
-    topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
-    createTopic(topic1, numPartitions, numServers, topicConfig)
-
-    val scheduler = new ProducerScheduler()
-    scheduler.start()
-
-    // rolling bounce brokers
-
-    for (_ <- 0 until numServers) {
-      for (server <- servers) {
-        info("Shutting down server : %s".format(server.config.brokerId))
-        server.shutdown()
-        server.awaitShutdown()
-        info("Server %s shut down. Starting it up again.".format(server.config.brokerId))
-        server.startup()
-        info("Restarted server: %s".format(server.config.brokerId))
-      }
-
-      // Make sure the producer do not see any exception in returned metadata due to broker failures
-      assertFalse(scheduler.failed)
-
-      // Make sure the leader still exists after bouncing brokers
-      (0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition))
-    }
-
-    scheduler.shutdown()
-
-    // Make sure the producer do not see any exception
-    // when draining the left messages on shutdown
-    assertFalse(scheduler.failed)
-
-    // double check that the leader info has been propagated after consecutive bounces
-    val newLeaders = (0 until numPartitions).map(i => TestUtils.waitUntilMetadataIsPropagated(servers, topic1, i))
-    val fetchResponses = newLeaders.zipWithIndex.map { case (leader, partition) =>
-      // Consumers must be instantiated after all the restarts since they use random ports each time they start up
-      val consumer = new SimpleConsumer("localhost", boundPort(servers(leader)), 30000, 1024 * 1024, "")
-      val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
-      consumer.close
-      response
-    }
-    val messages = fetchResponses.flatMap(r => r.iterator.toList.map(_.message))
-    val uniqueMessages = messages.toSet
-    val uniqueMessageSize = uniqueMessages.size
-    info(s"number of unique messages sent: ${uniqueMessageSize}")
-    assertEquals(s"Found ${messages.size - uniqueMessageSize} duplicate messages.", uniqueMessageSize, messages.size)
-    assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, messages.size)
-  }
-
   private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) {
     val numRecords = 1000
     var sent = 0
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 16325ee..b385a2a 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -102,7 +102,7 @@ object TestLinearWriteSpeed {
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt))
     val rand = new Random
     rand.nextBytes(buffer.array)
-    val numMessages = bufferSize / (messageSize + MessageSet.LogOverhead)
+    val numMessages = bufferSize / (messageSize + Records.LOG_OVERHEAD)
     val createTime = System.currentTimeMillis
     val messageSet = {
       val compressionType = CompressionType.forId(compressionCodec.codec)
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index ef3b17c..aae48d1 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -14,16 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package unit.kafka.admin
+package kafka.admin
 
 import joptsimple.OptionException
-import kafka.admin.ConsumerGroupCommandTest
 import kafka.utils.TestUtils
 import org.apache.kafka.common.protocol.Errors
 import org.junit.Assert._
 import org.junit.Test
 
-class DeleteConsumerGroupTest extends ConsumerGroupCommandTest {
+class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
   @Test(expected = classOf[OptionException])
   def testDeleteWithTopicOption() {
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index a15ddb8..d5c0a55 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -21,17 +21,21 @@ import org.apache.kafka.common.config.ConfigException
 import org.junit.{After, Before, Ignore, Test}
 
 import scala.util.Random
+import scala.collection.JavaConverters._
 import org.apache.log4j.{Level, Logger}
 import java.util.Properties
 import java.util.concurrent.ExecutionException
 
-import kafka.consumer.{Consumer, ConsumerConfig}
-import kafka.serializer.StringDecoder
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.CoreUtils
+import kafka.utils.{CoreUtils, TestUtils}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.junit.Assert._
 
 class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@@ -177,19 +181,19 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
     produceMessage(servers, topic, "first")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
-    assertEquals(List("first"), consumeAllMessages(topic))
+    assertEquals(List("first"), consumeAllMessages(topic, 1))
 
     // shutdown follower server
     servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
 
     produceMessage(servers, topic, "second")
-    assertEquals(List("first", "second"), consumeAllMessages(topic))
+    assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
 
     //remove any previous unclean election metric
-    servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+    servers.map(_.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
 
     // shutdown leader and then restart follower
-    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
+    servers.filter(_.config.brokerId == leaderId).map(shutdownServer)
     val followerServer = servers.find(_.config.brokerId == followerId).get
     followerServer.startup()
 
@@ -200,7 +204,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     produceMessage(servers, topic, "third")
 
     // second message was lost due to unclean election
-    assertEquals(List("first", "third"), consumeAllMessages(topic))
+    assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
   }
 
   def verifyUncleanLeaderElectionDisabled(): Unit = {
@@ -215,13 +219,13 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
     produceMessage(servers, topic, "first")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
-    assertEquals(List("first"), consumeAllMessages(topic))
+    assertEquals(List("first"), consumeAllMessages(topic, 1))
 
     // shutdown follower server
     servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
 
     produceMessage(servers, topic, "second")
-    assertEquals(List("first", "second"), consumeAllMessages(topic))
+    assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
 
     //remove any previous unclean election metric
     servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
@@ -243,7 +247,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
       case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected
     }
 
-    assertEquals(List.empty[String], consumeAllMessages(topic))
+    assertEquals(List.empty[String], consumeAllMessages(topic, 0))
 
     // restart leader temporarily to send a successfully replicated message
     servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
@@ -257,7 +261,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
 
     // verify messages can be consumed from ISR follower that was just promoted to leader
-    assertEquals(List("first", "second", "third"), consumeAllMessages(topic))
+    assertEquals(List("first", "second", "third"), consumeAllMessages(topic, 3))
   }
 
   private def shutdownServer(server: KafkaServer) = {
@@ -265,16 +269,18 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     server.awaitShutdown()
   }
 
-  private def consumeAllMessages(topic: String) : List[String] = {
-    // use a fresh consumer group every time so that we don't need to mess with disabling auto-commit or
-    // resetting the ZK offset
-    val consumerProps = createConsumerProperties(zkConnect, "group" + random.nextLong, "id", 1000)
-    val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
-    val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
-
-    val messages = getMessages(messageStream)
-    consumerConnector.shutdown
-
-    messages
+  private def consumeAllMessages(topic: String, numMessages: Int): Seq[String] = {
+    val brokerList = TestUtils.bootstrapServers(servers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+    val props = new Properties
+    // Don't rely on coordinator as it may be down when this method is called
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    val consumer = TestUtils.createNewConsumer(brokerList, "group" + random.nextLong,
+      securityProtocol = SecurityProtocol.PLAINTEXT, valueDeserializer = new StringDeserializer, props = Some(props))
+    try {
+      val tp = new TopicPartition(topic, partitionId)
+      consumer.assign(Seq(tp).asJava)
+      consumer.seek(tp, 0)
+      TestUtils.consumeRecords(consumer, numMessages).map(_.value)
+    } finally consumer.close()
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index afb297d..0ee8d81 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -26,6 +26,7 @@ import kafka.utils.{CoreUtils, TestUtils}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.record.Records
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.Assert._
 import org.junit.Test
@@ -590,7 +591,7 @@ class KafkaConfigTest {
         case KafkaConfig.NumPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.LogDirsProp => // ignore string
         case KafkaConfig.LogDirProp => // ignore string
-        case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", Message.MinMessageOverhead - 1)
+        case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", Records.LOG_OVERHEAD - 1)
 
         case KafkaConfig.LogRollTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.LogRollTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 596c353..78c0c33 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -17,20 +17,20 @@
 package kafka.server
 
 import kafka.zk.ZooKeeperTestHarness
-import kafka.consumer.SimpleConsumer
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.utils.TestUtils._
-import kafka.api.FetchRequestBuilder
-import kafka.message.ByteBufferMessageSet
 import java.io.File
 
 import kafka.log.LogManager
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.errors.KafkaStorageException
-import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
 import org.junit.{Before, Test}
 import org.junit.Assert._
 
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 class ServerShutdownTest extends ZooKeeperTestHarness {
@@ -58,6 +58,14 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
         valueSerializer = new StringSerializer
       )
 
+    def createConsumer(server: KafkaServer): KafkaConsumer[Integer, String] =
+      TestUtils.createNewConsumer(
+        TestUtils.getBrokerListStrFromServers(Seq(server)),
+        securityProtocol = SecurityProtocol.PLAINTEXT,
+        keyDeserializer = new IntegerDeserializer,
+        valueDeserializer = new StringDeserializer
+      )
+
     var server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
     server.startup()
     var producer = createProducer(server)
@@ -85,25 +93,17 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0)
 
     producer = createProducer(server)
-    val consumer = new SimpleConsumer(host, TestUtils.boundPort(server), 1000000, 64*1024, "")
+    val consumer = createConsumer(server)
+    consumer.subscribe(Seq(topic).asJava)
 
-    var fetchedMessage: ByteBufferMessageSet = null
-    while (fetchedMessage == null || fetchedMessage.validBytes == 0) {
-      val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build())
-      fetchedMessage = fetched.messageSet(topic, 0)
-    }
-    assertEquals(sent1, fetchedMessage.map(m => TestUtils.readString(m.message.payload)))
-    val newOffset = fetchedMessage.last.nextOffset
+    val consumerRecords = TestUtils.consumeRecords(consumer, sent1.size)
+    assertEquals(sent1, consumerRecords.map(_.value))
 
     // send some more messages
     sent2.map(value => producer.send(new ProducerRecord(topic, 0, value))).foreach(_.get)
 
-    fetchedMessage = null
-    while (fetchedMessage == null || fetchedMessage.validBytes == 0) {
-      val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
-      fetchedMessage = fetched.messageSet(topic, 0)
-    }
-    assertEquals(sent2, fetchedMessage.map(m => TestUtils.readString(m.message.payload)))
+    val consumerRecords2 = TestUtils.consumeRecords(consumer, sent2.size)
+    assertEquals(sent2, consumerRecords2.map(_.value))
 
     consumer.close()
     producer.close()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f50ef3a..8b89504 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -22,6 +22,7 @@ import java.nio._
 import java.nio.channels._
 import java.nio.charset.{Charset, StandardCharsets}
 import java.security.cert.X509Certificate
+import java.time.Duration
 import java.util.{Collections, Properties}
 import java.util.concurrent.{Callable, Executors, TimeUnit}
 import javax.net.ssl.X509TrustManager
@@ -1258,10 +1259,11 @@ object TestUtils extends Logging {
   def consumeTopicRecords[K, V](servers: Seq[KafkaServer],
                                 topic: String,
                                 numMessages: Int,
+                                groupId: String = "group",
                                 securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
                                 trustStoreFile: Option[File] = None,
                                 waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
-    val consumer = createNewConsumer(TestUtils.getBrokerListStrFromServers(servers, securityProtocol),
+    val consumer = createNewConsumer(TestUtils.getBrokerListStrFromServers(servers, securityProtocol), groupId = groupId,
       securityProtocol = securityProtocol, trustStoreFile = trustStoreFile)
     try {
       consumer.subscribe(Collections.singleton(topic))
@@ -1273,7 +1275,7 @@ object TestUtils extends Logging {
                            waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[K, V]] = {
     val records = new ArrayBuffer[ConsumerRecord[K, V]]()
     waitUntilTrue(() => {
-      records ++= consumer.poll(50).asScala
+      records ++= consumer.poll(Duration.ofMillis(50)).asScala
       records.size >= numMessages
     }, s"Consumed ${records.size} records until timeout instead of the expected $numMessages records", waitTime)
     assertEquals("Consumed more records than expected", numMessages, records.size)
@@ -1288,11 +1290,11 @@ object TestUtils extends Logging {
     *
     * @return All the records consumed by the consumer within the specified duration.
     */
-  def consumeRecordsFor[K, V](consumer: KafkaConsumer[K, V], duration: Long): Seq[ConsumerRecord[K, V]] = {
+  def consumeRecordsFor[K, V](consumer: KafkaConsumer[K, V], duration: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[K, V]] = {
     val startTime = System.currentTimeMillis()
     val records = new ArrayBuffer[ConsumerRecord[K, V]]()
     waitUntilTrue(() => {
-      records ++= consumer.poll(50).asScala
+      records ++= consumer.poll(Duration.ofMillis(50)).asScala
       System.currentTimeMillis() - startTime > duration
     }, s"The timeout $duration was greater than the maximum wait time.")
     records
@@ -1375,14 +1377,14 @@ object TestUtils extends Logging {
   def pollUntilAtLeastNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
     val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
     TestUtils.waitUntilTrue(() => {
-      records ++= consumer.poll(50).asScala
+      records ++= consumer.poll(Duration.ofMillis(50)).asScala
       records.size >= numRecords
     }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
     records
   }
 
   def resetToCommittedPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) = {
-    consumer.assignment.asScala.foreach { case(topicPartition) =>
+    consumer.assignment.asScala.foreach { topicPartition =>
       val offset = consumer.committed(topicPartition)
       if (offset != null)
         consumer.seek(topicPartition, offset.offset)
