You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/05/25 00:33:31 UTC
[kafka] branch trunk updated: KAFKA-6921;
Remove old Scala producer and related code
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7132a85 KAFKA-6921; Remove old Scala producer and related code
7132a85 is described below
commit 7132a85fc394bc0627fe1763c17cb523d8a8ff37
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu May 24 17:32:49 2018 -0700
KAFKA-6921; Remove old Scala producer and related code
* Removed Scala producers, request classes, kafka.tools.ProducerPerformance, encoders,
tests.
* Updated ConsoleProducer to remove Scala producer support (removed `BaseProducer`
and several options that are not used by the Java producer).
* Updated a few Scala consumer tests to use the new producer (including a minor
refactor of `produceMessages` methods in `TestUtils`).
* Updated `ClientUtils.fetchTopicMetadata` to use `SimpleConsumer` instead of
`SyncProducer`.
* Removed `TestKafkaAppender` as it looks useless and it defined an `Encoder`.
* Minor import clean-ups
No new tests added since behaviour should remain the same after these changes.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Manikumar Reddy O <ma...@gmail.com>, Dong Lin <li...@gmail.com>
Closes #5045 from ijuma/kafka-6921-remove-old-producer
---
.../src/main/scala/kafka/api/ProducerRequest.scala | 148 ------
.../main/scala/kafka/api/ProducerResponse.scala | 110 -----
core/src/main/scala/kafka/client/ClientUtils.scala | 48 +-
.../scala/kafka/javaapi/producer/Producer.scala | 52 ---
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 10 -
.../main/scala/kafka/producer/BaseProducer.scala | 74 ---
.../scala/kafka/producer/BrokerPartitionInfo.scala | 105 -----
.../kafka/producer/ByteArrayPartitioner.scala | 30 --
.../scala/kafka/producer/DefaultPartitioner.scala | 30 --
.../main/scala/kafka/producer/KeyedMessage.scala | 44 --
.../main/scala/kafka/producer/Partitioner.scala | 35 --
core/src/main/scala/kafka/producer/Producer.scala | 142 ------
.../kafka/producer/ProducerClosedException.scala | 22 -
.../main/scala/kafka/producer/ProducerConfig.scala | 121 -----
.../main/scala/kafka/producer/ProducerPool.scala | 90 ----
.../kafka/producer/ProducerRequestStats.scala | 69 ---
.../main/scala/kafka/producer/ProducerStats.scala | 46 --
.../scala/kafka/producer/ProducerTopicStats.scala | 69 ---
.../main/scala/kafka/producer/SyncProducer.scala | 169 -------
.../scala/kafka/producer/SyncProducerConfig.scala | 72 ---
.../kafka/producer/async/AsyncProducerConfig.scala | 49 --
.../kafka/producer/async/DefaultEventHandler.scala | 359 ---------------
.../scala/kafka/producer/async/EventHandler.scala | 37 --
.../async/IllegalQueueStateException.scala | 26 --
.../producer/async/MissingConfigException.scala | 24 -
.../kafka/producer/async/ProducerSendThread.scala | 114 -----
core/src/main/scala/kafka/serializer/Encoder.scala | 79 ----
.../main/scala/kafka/tools/ConsoleProducer.scala | 94 +---
.../scala/kafka/tools/ProducerPerformance.scala | 308 -------------
.../kafka/api/CustomQuotaCallbackTest.scala | 1 -
.../server/DynamicBrokerReconfigurationTest.scala | 7 -
.../test/scala/other/kafka/TestKafkaAppender.scala | 50 --
.../scala/unit/kafka/admin/DeleteTopicTest.scala | 2 +-
.../admin/ReassignPartitionsClusterTest.scala | 20 +-
.../kafka/admin/ResetConsumerGroupOffsetTest.scala | 65 +--
.../api/RequestResponseSerializationTest.scala | 36 +-
.../test/scala/unit/kafka/common/ConfigTest.scala | 32 --
.../consumer/ZookeeperConsumerConnectorTest.scala | 41 +-
.../kafka/integration/AutoOffsetResetTest.scala | 17 +-
.../scala/unit/kafka/integration/FetcherTest.scala | 4 +-
.../unit/kafka/integration/PrimitiveApiTest.scala | 276 -----------
.../integration/ProducerConsumerTestHarness.scala | 48 --
.../integration/UncleanLeaderElectionTest.scala | 6 -
.../consumer/ZookeeperConsumerConnectorTest.scala | 44 +-
.../scala/unit/kafka/metrics/MetricsTest.scala | 41 +-
.../unit/kafka/producer/AsyncProducerTest.scala | 505 ---------------------
.../scala/unit/kafka/producer/ProducerTest.scala | 348 --------------
.../unit/kafka/producer/SyncProducerTest.scala | 253 -----------
.../kafka/server/AbstractFetcherThreadTest.scala | 1 -
.../kafka/server/DescribeLogDirsRequestTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 2 +-
.../server/ReplicaAlterLogDirsThreadTest.scala | 8 +-
.../kafka/server/ReplicaFetcherThreadTest.scala | 2 -
.../unit/kafka/tools/ConsoleProducerTest.scala | 10 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 160 +------
docs/upgrade.html | 5 +
.../apache/kafka/streams/scala/TopologyTest.scala | 7 +-
.../services/performance/producer_performance.py | 1 -
58 files changed, 168 insertions(+), 4402 deletions(-)
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
deleted file mode 100644
index 9cdb14b..0000000
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio._
-
-import kafka.api.ApiUtils._
-import kafka.common._
-import kafka.message._
-import org.apache.kafka.common.protocol.ApiKeys
-
-@deprecated("This object has been deprecated and will be removed in a future release.", "1.0.0")
-object ProducerRequest {
- val CurrentVersion = 2.shortValue
-
- def readFrom(buffer: ByteBuffer): ProducerRequest = {
- val versionId: Short = buffer.getShort
- val correlationId: Int = buffer.getInt
- val clientId: String = readShortString(buffer)
- val requiredAcks: Short = buffer.getShort
- val ackTimeoutMs: Int = buffer.getInt
- //build the topic structure
- val topicCount = buffer.getInt
- val partitionDataPairs = (1 to topicCount).flatMap(_ => {
- // process topic
- val topic = readShortString(buffer)
- val partitionCount = buffer.getInt
- (1 to partitionCount).map(_ => {
- val partition = buffer.getInt
- val messageSetSize = buffer.getInt
- val messageSetBuffer = new Array[Byte](messageSetSize)
- buffer.get(messageSetBuffer,0,messageSetSize)
- (TopicAndPartition(topic, partition), new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
- })
- })
-
- ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, collection.mutable.Map(partitionDataPairs:_*))
- }
-}
-
-@deprecated("This object has been deprecated and will be removed in a future release.", "1.0.0")
-case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
- correlationId: Int,
- clientId: String,
- requiredAcks: Short,
- ackTimeoutMs: Int,
- data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
- extends RequestOrResponse(Some(ApiKeys.PRODUCE.id)) {
-
- /**
- * Partitions the data into a map of maps (one for each topic).
- */
- private lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
- val topicPartitionMessageSizeMap = data.map(r => r._1 -> r._2.sizeInBytes).toMap
-
- def this(correlationId: Int,
- clientId: String,
- requiredAcks: Short,
- ackTimeoutMs: Int,
- data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) =
- this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
-
- def writeTo(buffer: ByteBuffer) {
- buffer.putShort(versionId)
- buffer.putInt(correlationId)
- writeShortString(buffer, clientId)
- buffer.putShort(requiredAcks)
- buffer.putInt(ackTimeoutMs)
-
- //save the topic structure
- buffer.putInt(dataGroupedByTopic.size) //the number of topics
- dataGroupedByTopic.foreach {
- case (topic, topicAndPartitionData) =>
- writeShortString(buffer, topic) //write the topic
- buffer.putInt(topicAndPartitionData.size) //the number of partitions
- topicAndPartitionData.foreach(partitionAndData => {
- val partition = partitionAndData._1.partition
- val partitionMessageData = partitionAndData._2
- val bytes = partitionMessageData.buffer
- buffer.putInt(partition)
- buffer.putInt(bytes.limit())
- buffer.put(bytes)
- bytes.rewind
- })
- }
- }
-
- def sizeInBytes: Int = {
- 2 + /* versionId */
- 4 + /* correlationId */
- shortStringLength(clientId) + /* client id */
- 2 + /* requiredAcks */
- 4 + /* ackTimeoutMs */
- 4 + /* number of topics */
- dataGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
- foldedTopics +
- shortStringLength(currTopic._1) +
- 4 + /* the number of partitions */
- {
- currTopic._2.foldLeft(0)((foldedPartitions, currPartition) => {
- foldedPartitions +
- 4 + /* partition id */
- 4 + /* byte-length of serialized messages */
- currPartition._2.sizeInBytes
- })
- }
- })
- }
-
- def numPartitions = data.size
-
- override def toString: String = {
- describe(true)
- }
-
- override def describe(details: Boolean): String = {
- val producerRequest = new StringBuilder
- producerRequest.append("Name: " + this.getClass.getSimpleName)
- producerRequest.append("; Version: " + versionId)
- producerRequest.append("; CorrelationId: " + correlationId)
- producerRequest.append("; ClientId: " + clientId)
- producerRequest.append("; RequiredAcks: " + requiredAcks)
- producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
- if(details)
- producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.mkString(","))
- producerRequest.toString()
- }
-
- def emptyData(){
- data.clear()
- }
-}
-
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
deleted file mode 100644
index 2d3c9cc..0000000
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.message.Message
-import org.apache.kafka.common.protocol.Errors
-
-import scala.collection.Map
-import kafka.common.TopicAndPartition
-import kafka.api.ApiUtils._
-
-@deprecated("This object has been deprecated and will be removed in a future release.", "1.0.0")
-object ProducerResponse {
- // readFrom assumes that the response is written using V2 format
- def readFrom(buffer: ByteBuffer): ProducerResponse = {
- val correlationId = buffer.getInt
- val topicCount = buffer.getInt
- val statusPairs = (1 to topicCount).flatMap(_ => {
- val topic = readShortString(buffer)
- val partitionCount = buffer.getInt
- (1 to partitionCount).map(_ => {
- val partition = buffer.getInt
- val error = Errors.forCode(buffer.getShort)
- val offset = buffer.getLong
- val timestamp = buffer.getLong
- (TopicAndPartition(topic, partition), ProducerResponseStatus(error, offset, timestamp))
- })
- })
-
- val throttleTime = buffer.getInt
- ProducerResponse(correlationId, Map(statusPairs:_*), ProducerRequest.CurrentVersion, throttleTime)
- }
-}
-
-case class ProducerResponseStatus(var error: Errors, offset: Long, timestamp: Long = Message.NoTimestamp)
-
-@deprecated("This object has been deprecated and will be removed in a future release.", "1.0.0")
-case class ProducerResponse(correlationId: Int,
- status: Map[TopicAndPartition, ProducerResponseStatus],
- requestVersion: Int = 0,
- throttleTime: Int = 0)
- extends RequestOrResponse() {
-
- /**
- * Partitions the status map into a map of maps (one for each topic).
- */
- private lazy val statusGroupedByTopic = status.groupBy(_._1.topic)
-
- def hasError = status.values.exists(_.error != Errors.NONE)
-
- val sizeInBytes = {
- val throttleTimeSize = if (requestVersion > 0) 4 else 0
- val groupedStatus = statusGroupedByTopic
- 4 + /* correlation id */
- 4 + /* topic count */
- groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
- foldedTopics +
- shortStringLength(currTopic._1) +
- 4 + /* partition count for this topic */
- currTopic._2.size * {
- 4 + /* partition id */
- 2 + /* error code */
- 8 + /* offset */
- 8 /* timestamp */
- }
- }) +
- throttleTimeSize
- }
-
- def writeTo(buffer: ByteBuffer) {
- val groupedStatus = statusGroupedByTopic
- buffer.putInt(correlationId)
- buffer.putInt(groupedStatus.size) // topic count
-
- groupedStatus.foreach(topicStatus => {
- val (topic, errorsAndOffsets) = topicStatus
- writeShortString(buffer, topic)
- buffer.putInt(errorsAndOffsets.size) // partition count
- errorsAndOffsets.foreach {
- case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset, timestamp))) =>
- buffer.putInt(partition)
- buffer.putShort(error.code)
- buffer.putLong(nextOffset)
- buffer.putLong(timestamp)
- }
- })
- // Throttle time is only supported on V1 style requests
- if (requestVersion > 0)
- buffer.putInt(throttleTime)
- }
-
- override def describe(details: Boolean):String = { toString }
-}
-
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 5573256..53f3895 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -21,16 +21,15 @@ import org.apache.kafka.common.protocol.Errors
import scala.collection._
import kafka.cluster._
import kafka.api._
-import kafka.producer._
import kafka.common.{BrokerEndPointNotAvailableException, KafkaException}
import kafka.utils.{CoreUtils, Logging}
-import java.util.Properties
import util.Random
import kafka.network.BlockingChannel
import kafka.utils.ZkUtils
import java.io.IOException
+import kafka.consumer.SimpleConsumer
import org.apache.kafka.common.security.auth.SecurityProtocol
/**
@@ -39,28 +38,32 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
object ClientUtils extends Logging {
- /**
- * Used by the producer to send a metadata request since it has access to the ProducerConfig
+ /**
+ * Send a metadata request
* @param topics The topics for which the metadata needs to be fetched
- * @param brokers The brokers in the cluster as configured on the producer through metadata.broker.list
- * @param producerConfig The producer's config
+ * @param brokers The brokers in the cluster as configured on the client
+ * @param clientId The client's identifier
* @return topic metadata response
*/
- @deprecated("This method has been deprecated and will be removed in a future release.", "0.10.0.0")
- def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
+ def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], clientId: String, timeoutMs: Int,
+ correlationId: Int = 0): TopicMetadataResponse = {
var fetchMetaDataSucceeded: Boolean = false
var i: Int = 0
- val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
+ val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, clientId,
+ topics.toSeq)
var topicMetadataResponse: TopicMetadataResponse = null
var t: Throwable = null
// shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the
// same broker
val shuffledBrokers = Random.shuffle(brokers)
while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
- val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
- info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
+ val broker = shuffledBrokers(i)
+ val consumer = new SimpleConsumer(broker.host, broker.port, timeoutMs, BlockingChannel.UseDefaultBufferSize,
+ clientId)
+ info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i),
+ correlationId, topics.size, topics))
try {
- topicMetadataResponse = producer.send(topicMetadataRequest)
+ topicMetadataResponse = consumer.send(topicMetadataRequest)
fetchMetaDataSucceeded = true
}
catch {
@@ -70,10 +73,10 @@ object ClientUtils extends Logging {
t = e
} finally {
i = i + 1
- producer.close()
+ consumer.close()
}
}
- if(!fetchMetaDataSucceeded) {
+ if (!fetchMetaDataSucceeded) {
throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t)
} else {
debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
@@ -82,23 +85,6 @@ object ClientUtils extends Logging {
}
/**
- * Used by a non-producer client to send a metadata request
- * @param topics The topics for which the metadata needs to be fetched
- * @param brokers The brokers in the cluster as configured on the client
- * @param clientId The client's identifier
- * @return topic metadata response
- */
- def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], clientId: String, timeoutMs: Int,
- correlationId: Int = 0): TopicMetadataResponse = {
- val props = new Properties()
- props.put("metadata.broker.list", brokers.map(_.connectionString).mkString(","))
- props.put("client.id", clientId)
- props.put("request.timeout.ms", timeoutMs.toString)
- val producerConfig = new ProducerConfig(props)
- fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
- }
-
- /**
* Parse a list of broker urls in the form host1:port1, host2:port2, ...
*/
def parseBrokerList(brokerListStr: String): Seq[BrokerEndPoint] = {
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
deleted file mode 100644
index b0b40b9..0000000
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.javaapi.producer
-
-import kafka.producer.ProducerConfig
-import kafka.producer.KeyedMessage
-import scala.collection.mutable
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
-{
- def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config))
- /**
- * Sends the data to a single topic, partitioned by key, using either the
- * synchronous or the asynchronous producer
- * @param message the producer data object that encapsulates the topic, key and message data
- */
- def send(message: KeyedMessage[K,V]) {
- underlying.send(message)
- }
-
- /**
- * Use this API to send data to multiple topics
- * @param messages list of producer data objects that encapsulate the topic, key and message data
- */
- def send(messages: java.util.List[KeyedMessage[K,V]]) {
- import collection.JavaConversions._
- underlying.send((messages: mutable.Buffer[KeyedMessage[K,V]]).toSeq: _*)
- }
-
- /**
- * Close API to close the producer pool connections to all Kafka brokers. Also closes
- * the zookeeper client connection if one exists
- */
- def close() = underlying.close()
-}
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 75eaafd..f95d0ad 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.{Gauge, MetricName}
import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsRegistry}
-import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry}
import kafka.utils.Logging
import org.apache.kafka.common.utils.Sanitizer
@@ -179,18 +178,9 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
def removeAllConsumerMetrics(clientId: String) {
FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)
- ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
removeAllMetricsInList(KafkaMetricsGroup.consumerMetricNameList, clientId)
}
- @deprecated("This method has been deprecated and will be removed in a future release.", "0.10.0.0")
- def removeAllProducerMetrics(clientId: String) {
- ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
- ProducerTopicStatsRegistry.removeProducerTopicStats(clientId)
- ProducerStatsRegistry.removeProducerStats(clientId)
- removeAllMetricsInList(KafkaMetricsGroup.producerMetricNameList, clientId)
- }
-
private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) {
metricNameList.foreach(metric => {
val pattern = (".*clientId=" + clientId + ".*").r
diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala
deleted file mode 100644
index 83d9aa7..0000000
--- a/core/src/main/scala/kafka/producer/BaseProducer.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-import java.util.Properties
-
-// A base producer used whenever we need to have options for both old and new producers;
-// this class will be removed once we fully rolled out 0.9
-@deprecated("This trait has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-trait BaseProducer {
- def send(topic: String, key: Array[Byte], value: Array[Byte])
- def close()
-}
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-class NewShinyProducer(producerProps: Properties) extends BaseProducer {
- import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
- import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-
- // decide whether to send synchronously based on producer properties
- val sync = producerProps.getProperty("producer.type", "async").equals("sync")
-
- val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
-
- override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
- val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, key, value)
- if(sync) {
- this.producer.send(record).get()
- } else {
- this.producer.send(record,
- new ErrorLoggingCallback(topic, key, value, false))
- }
- }
-
- override def close() {
- this.producer.close()
- }
-}
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-class OldProducer(producerProps: Properties) extends BaseProducer {
-
- // default to byte array partitioner
- if (producerProps.getProperty("partitioner.class") == null)
- producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName)
- val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
-
- override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
- this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, value))
- }
-
- override def close() {
- this.producer.close()
- }
-}
-
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
deleted file mode 100644
index e77a50c..0000000
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.producer
-
-import org.apache.kafka.common.protocol.Errors
-
-import collection.mutable.HashMap
-import kafka.api.TopicMetadata
-import kafka.common.KafkaException
-import kafka.utils.Logging
-import kafka.client.ClientUtils
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class BrokerPartitionInfo(producerConfig: ProducerConfig,
- producerPool: ProducerPool,
- topicPartitionInfo: HashMap[String, TopicMetadata])
- extends Logging {
- val brokerList = producerConfig.brokerList
- val brokers = ClientUtils.parseBrokerList(brokerList)
-
- /**
- * Return a sequence of (brokerId, numPartitions).
- * @param topic the topic for which this information is to be returned
- * @return a sequence of (brokerId, numPartitions). Returns a zero-length
- * sequence if no brokers are available.
- */
- def getBrokerPartitionInfo(topic: String, correlationId: Int): Seq[PartitionAndLeader] = {
- debug("Getting broker partition info for topic %s".format(topic))
- // check if the cache has metadata for this topic
- val topicMetadata = topicPartitionInfo.get(topic)
- val metadata: TopicMetadata =
- topicMetadata match {
- case Some(m) => m
- case None =>
- // refresh the topic metadata cache
- updateInfo(Set(topic), correlationId)
- val topicMetadata = topicPartitionInfo.get(topic)
- topicMetadata match {
- case Some(m) => m
- case None => throw new KafkaException("Failed to fetch topic metadata for topic: " + topic)
- }
- }
- val partitionMetadata = metadata.partitionsMetadata
- if(partitionMetadata.isEmpty) {
- if(metadata.error != Errors.NONE) {
- throw new KafkaException(metadata.error.exception)
- } else {
- throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
- }
- }
- partitionMetadata.map { m =>
- m.leader match {
- case Some(leader) =>
- debug("Partition [%s,%d] has leader %d".format(topic, m.partitionId, leader.id))
- new PartitionAndLeader(topic, m.partitionId, Some(leader.id))
- case None =>
- debug("Partition [%s,%d] does not have a leader yet".format(topic, m.partitionId))
- new PartitionAndLeader(topic, m.partitionId, None)
- }
- }.sortWith((s, t) => s.partitionId < t.partitionId)
- }
-
- /**
- * It updates the cache by issuing a get topic metadata request to a random broker.
- * @param topics the topics for which the metadata is to be fetched
- */
- def updateInfo(topics: Set[String], correlationId: Int) {
- var topicsMetadata: Seq[TopicMetadata] = Nil
- val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
- topicsMetadata = topicMetadataResponse.topicsMetadata
- // throw partition specific exception
- topicsMetadata.foreach(tmd =>{
- trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
- if(tmd.error == Errors.NONE) {
- topicPartitionInfo.put(tmd.topic, tmd)
- } else
- warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, tmd.error.exception.getClass))
- tmd.partitionsMetadata.foreach(pmd =>{
- if (pmd.error != Errors.NONE && pmd.error == Errors.LEADER_NOT_AVAILABLE) {
- warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
- pmd.error.exception.getClass))
- } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
- })
- })
- producerPool.updateProducer(topicsMetadata)
- }
-
-}
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])
diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
deleted file mode 100755
index 7848456..0000000
--- a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-
-import kafka.utils._
-import org.apache.kafka.common.utils.Utils
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
- "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
-class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner {
- def partition(key: Any, numPartitions: Int): Int = {
- Utils.abs(java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]])) % numPartitions
- }
-}
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
deleted file mode 100755
index f793811..0000000
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer
-
-
-import kafka.utils._
-import org.apache.kafka.common.utils.Utils
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
- "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
-class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
- def partition(key: Any, numPartitions: Int): Int = {
- Utils.abs(key.hashCode) % numPartitions
- }
-}
diff --git a/core/src/main/scala/kafka/producer/KeyedMessage.scala b/core/src/main/scala/kafka/producer/KeyedMessage.scala
deleted file mode 100644
index 84ea232..0000000
--- a/core/src/main/scala/kafka/producer/KeyedMessage.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer
-
-/**
- * A topic, key, and value.
- * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
- */
-@deprecated("This class has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.ProducerRecord instead.", "0.10.0.0")
-case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {
- if(topic == null)
- throw new IllegalArgumentException("Topic cannot be null.")
-
- def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
-
- def this(topic: String, key: K, message: V) = this(topic, key, key, message)
-
- def partitionKey = {
- if(partKey != null)
- partKey
- else if(hasKey)
- key
- else
- null
- }
-
- def hasKey = key != null
-}
diff --git a/core/src/main/scala/kafka/producer/Partitioner.scala b/core/src/main/scala/kafka/producer/Partitioner.scala
deleted file mode 100644
index 5d24692..0000000
--- a/core/src/main/scala/kafka/producer/Partitioner.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.producer
-
-/**
- * A partitioner controls the mapping between user-provided keys and kafka partitions. Users can implement a custom
- * partitioner to change this mapping.
- *
- * Implementations will be constructed via reflection and are required to have a constructor that takes a single
- * VerifiableProperties instance--this allows passing configuration properties into the partitioner implementation.
- */
-@deprecated("This trait has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.Partitioner instead.", "0.10.0.0")
-trait Partitioner {
- /**
- * Uses the key to calculate a partition bucket id for routing
- * the data to the appropriate broker partition
- * @return an integer between 0 and numPartitions-1
- */
- def partition(key: Any, numPartitions: Int): Int
-}
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
deleted file mode 100755
index d6cf4c8..0000000
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.producer
-
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-
-import kafka.common.{AppInfo, QueueFullException}
-import kafka.metrics._
-import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread}
-import kafka.serializer.Encoder
-import kafka.utils._
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-class Producer[K,V](val config: ProducerConfig,
- private val eventHandler: EventHandler[K,V]) // only for unit testing
- extends Logging {
-
- private val hasShutdown = new AtomicBoolean(false)
- private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
-
- private var sync: Boolean = true
- private var producerSendThread: ProducerSendThread[K,V] = null
- private val lock = new Object()
-
- config.producerType match {
- case "sync" =>
- case "async" =>
- sync = false
- producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
- queue,
- eventHandler,
- config.queueBufferingMaxMs,
- config.batchNumMessages,
- config.clientId)
- producerSendThread.start()
- }
-
- private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
-
- KafkaMetricsReporter.startReporters(config.props)
- AppInfo.registerInfo()
-
- def this(config: ProducerConfig) =
- this(config,
- new DefaultEventHandler[K,V](config,
- CoreUtils.createObject[Partitioner](config.partitionerClass, config.props),
- CoreUtils.createObject[Encoder[V]](config.serializerClass, config.props),
- CoreUtils.createObject[Encoder[K]](config.keySerializerClass, config.props),
- new ProducerPool(config)))
-
- /**
- * Sends the data, partitioned by key to the topic using either the
- * synchronous or the asynchronous producer
- * @param messages the producer data object that encapsulates the topic, key and message data
- */
- def send(messages: KeyedMessage[K,V]*) {
- lock synchronized {
- if (hasShutdown.get)
- throw new ProducerClosedException
- recordStats(messages)
- if (sync)
- eventHandler.handle(messages)
- else
- asyncSend(messages)
- }
- }
-
-
- private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
- for (message <- messages) {
- producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
- producerTopicStats.getProducerAllTopicsStats.messageRate.mark()
- }
- }
-
- private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
- for (message <- messages) {
- val added = config.queueEnqueueTimeoutMs match {
- case 0 =>
- queue.offer(message)
- case _ =>
- try {
- if (config.queueEnqueueTimeoutMs < 0) {
- queue.put(message)
- true
- } else {
- queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
- }
- }
- catch {
- case _: InterruptedException =>
- false
- }
- }
- if(!added) {
- producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
- producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
- throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
- }else {
- trace("Added to send queue an event: " + message.toString)
- trace("Remaining queue size: " + queue.remainingCapacity)
- }
- }
- }
-
- /**
- * Close API to close the producer pool connections to all Kafka brokers. Also closes
- * the zookeeper client connection if one exists
- */
- def close() = {
- lock synchronized {
- val canShutdown = hasShutdown.compareAndSet(false, true)
- if(canShutdown) {
- info("Shutting down producer")
- val startTime = System.nanoTime()
- KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
- if (producerSendThread != null)
- producerSendThread.shutdown
- eventHandler.close()
- info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
- }
- }
- }
-}
-
-
diff --git a/core/src/main/scala/kafka/producer/ProducerClosedException.scala b/core/src/main/scala/kafka/producer/ProducerClosedException.scala
deleted file mode 100644
index 4f2f731..0000000
--- a/core/src/main/scala/kafka/producer/ProducerClosedException.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerClosedException() extends RuntimeException("producer already closed") {
-}
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
deleted file mode 100755
index c2715d0..0000000
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer
-
-import async.AsyncProducerConfig
-import java.util.Properties
-import kafka.utils.{CoreUtils, VerifiableProperties}
-import kafka.message.NoCompressionCodec
-import kafka.common.{InvalidConfigException, Config}
-
-@deprecated("This object has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
-object ProducerConfig extends Config {
- def validate(config: ProducerConfig) {
- validateClientId(config.clientId)
- validateBatchSize(config.batchNumMessages, config.queueBufferingMaxMessages)
- validateProducerType(config.producerType)
- }
-
- def validateClientId(clientId: String) {
- validateChars("client.id", clientId)
- }
-
- def validateBatchSize(batchSize: Int, queueSize: Int) {
- if (batchSize > queueSize)
- throw new InvalidConfigException("Batch size = " + batchSize + " can't be larger than queue size = " + queueSize)
- }
-
- def validateProducerType(producerType: String) {
- producerType match {
- case "sync" =>
- case "async"=>
- case _ => throw new InvalidConfigException("Invalid value " + producerType + " for producer.type, valid values are sync/async")
- }
- }
-}
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
-class ProducerConfig private (val props: VerifiableProperties)
- extends AsyncProducerConfig with SyncProducerConfigShared {
- import ProducerConfig._
-
- def this(originalProps: Properties) {
- this(new VerifiableProperties(originalProps))
- props.verify()
- }
-
- /** This is for bootstrapping and the producer will only use it for getting metadata
- * (topics, partitions and replicas). The socket connections for sending the actual data
- * will be established based on the broker information returned in the metadata. The
- * format is host1:port1,host2:port2, and the list can be a subset of brokers or
- * a VIP pointing to a subset of brokers.
- */
- val brokerList = props.getString("metadata.broker.list")
-
- /** the partitioner class for partitioning events amongst sub-topics */
- val partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner")
-
- /** this parameter specifies whether the messages are sent asynchronously *
- * or not. Valid values are - async for asynchronous send *
- * sync for synchronous send */
- val producerType = props.getString("producer.type", "sync")
-
- /**
- * This parameter allows you to specify the compression codec for all data generated *
- * by this producer. The default is NoCompressionCodec
- */
- val compressionCodec = props.getCompressionCodec("compression.codec", NoCompressionCodec)
-
- /** This parameter allows you to set whether compression should be turned *
- * on for particular topics
- *
- * If the compression codec is anything other than NoCompressionCodec,
- *
- * Enable compression only for specified topics if any
- *
- * If the list of compressed topics is empty, then enable the specified compression codec for all topics
- *
- * If the compression codec is NoCompressionCodec, compression is disabled for all topics
- */
- val compressedTopics = CoreUtils.parseCsvList(props.getString("compressed.topics", null))
-
- /** The leader may be unavailable transiently, which can fail the sending of a message.
- * This property specifies the number of retries when such failures occur.
- */
- val messageSendMaxRetries = props.getInt("message.send.max.retries", 3)
-
- /** Before each retry, the producer refreshes the metadata of relevant topics. Since leader
- * election takes a bit of time, this property specifies the amount of time that the producer
- * waits before refreshing the metadata.
- */
- val retryBackoffMs = props.getInt("retry.backoff.ms", 100)
-
- /**
- * The producer generally refreshes the topic metadata from brokers when there is a failure
- * (partition missing, leader not available...). It will also poll regularly (default: every 10min
- * so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure.
- * If you set this to zero, the metadata will get refreshed after each message sent (not recommended)
- * Important note: the refresh happen only AFTER the message is sent, so if the producer never sends
- * a message the metadata is never refreshed
- */
- val topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000)
-
- validate(this)
-}
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
deleted file mode 100644
index 6d4e4b7..0000000
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-import java.util.Properties
-
-import kafka.api.TopicMetadata
-import kafka.cluster.BrokerEndPoint
-import kafka.common.UnavailableProducerException
-import kafka.utils.Logging
-import kafka.utils.Implicits._
-
-import scala.collection.mutable.HashMap
-
-@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
-object ProducerPool {
- /**
- * Used in ProducerPool to initiate a SyncProducer connection with a broker.
- */
- def createSyncProducer(config: ProducerConfig, broker: BrokerEndPoint): SyncProducer = {
- val props = new Properties()
- props.put("host", broker.host)
- props.put("port", broker.port.toString)
- props ++= config.props.props
- new SyncProducer(new SyncProducerConfig(props))
- }
-}
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerPool(val config: ProducerConfig) extends Logging {
- private val syncProducers = new HashMap[Int, SyncProducer]
- private val lock = new Object()
-
- def updateProducer(topicMetadata: Seq[TopicMetadata]) {
- val newBrokers = new collection.mutable.HashSet[BrokerEndPoint]
- topicMetadata.foreach(tmd => {
- tmd.partitionsMetadata.foreach(pmd => {
- if(pmd.leader.isDefined) {
- newBrokers += pmd.leader.get
- }
- })
- })
- lock synchronized {
- newBrokers.foreach(b => {
- if(syncProducers.contains(b.id)){
- syncProducers(b.id).close()
- syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
- } else
- syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
- })
- }
- }
-
- def getProducer(brokerId: Int) : SyncProducer = {
- lock.synchronized {
- val producer = syncProducers.get(brokerId)
- producer match {
- case Some(p) => p
- case None => throw new UnavailableProducerException("Sync producer for broker id %d does not exist".format(brokerId))
- }
- }
- }
-
- /**
- * Closes all the producers in the pool
- */
- def close() = {
- lock.synchronized {
- info("Closing all sync producers")
- val iter = syncProducers.values.iterator
- while(iter.hasNext)
- iter.next.close
- }
- }
-}
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
deleted file mode 100644
index 92bbbcf..0000000
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.producer
-
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
-import java.util.concurrent.TimeUnit
-import kafka.utils.Pool
-import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
- val tags = metricId match {
- case ClientIdAndBroker(clientId, brokerHost, brokerPort) => Map("clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort.toString)
- case ClientIdAllBrokers(clientId) => Map("clientId" -> clientId)
- }
-
- val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
- val requestSizeHist = newHistogram("ProducerRequestSize", biased = true, tags)
- val throttleTimeStats = newTimer("ProducerRequestThrottleRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags)
-}
-
-/**
- * Tracks metrics of requests made by a given producer client to all brokers.
- * @param clientId ClientId of the given producer
- */
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerRequestStats(clientId: String) {
- private val valueFactory = (k: ClientIdBroker) => new ProducerRequestMetrics(k)
- private val stats = new Pool[ClientIdBroker, ProducerRequestMetrics](Some(valueFactory))
- private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAllBrokers(clientId))
-
- def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
-
- def getProducerRequestStats(brokerHost: String, brokerPort: Int): ProducerRequestMetrics = {
- stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerHost, brokerPort))
- }
-}
-
-/**
- * Stores the request stats information of each producer client in a (clientId -> ProducerRequestStats) map.
- */
-@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
-object ProducerRequestStatsRegistry {
- private val valueFactory = (k: String) => new ProducerRequestStats(k)
- private val globalStats = new Pool[String, ProducerRequestStats](Some(valueFactory))
-
- def getProducerRequestStats(clientId: String) = {
- globalStats.getAndMaybePut(clientId)
- }
-
- def removeProducerRequestStats(clientId: String) {
- globalStats.remove(clientId)
- }
-}
-
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
deleted file mode 100644
index 9466f26..0000000
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.producer
-
-import kafka.metrics.KafkaMetricsGroup
-import java.util.concurrent.TimeUnit
-import kafka.utils.Pool
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerStats(clientId: String) extends KafkaMetricsGroup {
- val tags: Map[String, String] = Map("clientId" -> clientId)
- val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, tags)
- val resendRate = newMeter("ResendsPerSec", "resends", TimeUnit.SECONDS, tags)
- val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS, tags)
-}
-
-/**
- * Stores metrics of serialization and message sending activity of each producer client in a (clientId -> ProducerStats) map.
- */
-@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
-object ProducerStatsRegistry {
- private val valueFactory = (k: String) => new ProducerStats(k)
- private val statsRegistry = new Pool[String, ProducerStats](Some(valueFactory))
-
- def getProducerStats(clientId: String) = {
- statsRegistry.getAndMaybePut(clientId)
- }
-
- def removeProducerStats(clientId: String) {
- statsRegistry.remove(clientId)
- }
-}
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
deleted file mode 100644
index 7bb9610..0000000
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.producer
-
-import kafka.metrics.KafkaMetricsGroup
-import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
-import kafka.utils.{Pool, threadsafe}
-import java.util.concurrent.TimeUnit
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-@threadsafe
-class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
- val tags = metricId match {
- case ClientIdAndTopic(clientId, topic) => Map("clientId" -> clientId, "topic" -> topic)
- case ClientIdAllTopics(clientId) => Map("clientId" -> clientId)
- }
-
- val messageRate = newMeter("MessagesPerSec", "messages", TimeUnit.SECONDS, tags)
- val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
- val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS, tags)
-}
-
-/**
- * Tracks metrics for each topic the given producer client has produced data to.
- * @param clientId The clientId of the given producer client.
- */
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerTopicStats(clientId: String) {
- private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k)
- private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory))
- private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
-
- def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats
-
- def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
- stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
- }
-}
-
-/**
- * Stores the topic stats information of each producer client in a (clientId -> ProducerTopicStats) map.
- */
-@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
-object ProducerTopicStatsRegistry {
- private val valueFactory = (k: String) => new ProducerTopicStats(k)
- private val globalStats = new Pool[String, ProducerTopicStats](Some(valueFactory))
-
- def getProducerTopicStats(clientId: String) = {
- globalStats.getAndMaybePut(clientId)
- }
-
- def removeProducerTopicStats(clientId: String) {
- globalStats.remove(clientId)
- }
-}
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
deleted file mode 100644
index b132293..0000000
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-import java.util.Random
-import java.util.concurrent.TimeUnit
-
-import kafka.api._
-import kafka.network.{RequestOrResponseSend, BlockingChannel}
-import kafka.utils._
-import org.apache.kafka.common.network.NetworkReceive
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.utils.Utils._
-
-@deprecated("This object has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-object SyncProducer {
- val RequestKey: Short = 0
- val randomGenerator = new Random
-}
-
-/*
- * Send a message set.
- */
-@threadsafe
-@deprecated("This class has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
-class SyncProducer(val config: SyncProducerConfig) extends Logging {
-
- private val lock = new Object()
- @volatile private var shutdown: Boolean = false
- private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
- config.sendBufferBytes, config.requestTimeoutMs)
- val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
-
- trace("Instantiating Scala Sync Producer with properties: %s".format(config.props))
-
- private def verifyRequest(request: RequestOrResponse) = {
- /**
- * This seems a little convoluted, but the idea is to turn on verification simply changing log4j settings
- * Also, when verification is turned on, care should be taken to see that the logs don't fill up with unnecessary
- * data. So, leaving the rest of the logging at TRACE, while errors should be logged at ERROR level
- */
- if (isDebugEnabled) {
- val buffer = new RequestOrResponseSend("", request).buffer
- trace("verifying sendbuffer of size " + buffer.limit())
- val requestTypeId = buffer.getShort()
- if(requestTypeId == ApiKeys.PRODUCE.id) {
- val request = ProducerRequest.readFrom(buffer)
- trace(request.toString)
- }
- }
- }
-
- /**
- * Common functionality for the public send methods
- */
- private def doSend(request: RequestOrResponse, readResponse: Boolean = true): NetworkReceive = {
- lock synchronized {
- verifyRequest(request)
- getOrMakeConnection()
-
- var response: NetworkReceive = null
- try {
- blockingChannel.send(request)
- if(readResponse)
- response = blockingChannel.receive()
- else
- trace("Skipping reading response")
- } catch {
- case e: java.io.IOException =>
- // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
- disconnect()
- throw e
- case e: Throwable => throw e
- }
- response
- }
- }
-
- /**
- * Send a message. If the producerRequest had required.request.acks=0, then the
- * returned response object is null
- */
- def send(producerRequest: ProducerRequest): ProducerResponse = {
- val requestSize = producerRequest.sizeInBytes
- producerRequestStats.getProducerRequestStats(config.host, config.port).requestSizeHist.update(requestSize)
- producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize)
-
- var response: NetworkReceive = null
- val specificTimer = producerRequestStats.getProducerRequestStats(config.host, config.port).requestTimer
- val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer
- aggregateTimer.time {
- specificTimer.time {
- response = doSend(producerRequest, producerRequest.requiredAcks != 0)
- }
- }
- if(producerRequest.requiredAcks != 0) {
- val producerResponse = ProducerResponse.readFrom(response.payload)
- producerRequestStats.getProducerRequestStats(config.host, config.port).throttleTimeStats.update(producerResponse.throttleTime, TimeUnit.MILLISECONDS)
- producerRequestStats.getProducerRequestAllBrokersStats.throttleTimeStats.update(producerResponse.throttleTime, TimeUnit.MILLISECONDS)
- producerResponse
- }
- else
- null
- }
-
- def send(request: TopicMetadataRequest): TopicMetadataResponse = {
- val response = doSend(request)
- TopicMetadataResponse.readFrom(response.payload)
- }
-
- def close() = {
- lock synchronized {
- disconnect()
- shutdown = true
- }
- }
-
- /**
- * Disconnect from current channel, closing connection.
- * Side effect: channel field is set to null on successful disconnect
- */
- private def disconnect() {
- try {
- info("Disconnecting from " + formatAddress(config.host, config.port))
- blockingChannel.disconnect()
- } catch {
- case e: Exception => error("Error on disconnect: ", e)
- }
- }
-
- private def connect(): BlockingChannel = {
- if (!blockingChannel.isConnected && !shutdown) {
- try {
- blockingChannel.connect()
- info("Connected to " + formatAddress(config.host, config.port) + " for producing")
- } catch {
- case e: Exception => {
- disconnect()
- error("Producer connection to " + formatAddress(config.host, config.port) + " unsuccessful", e)
- throw e
- }
- }
- }
- blockingChannel
- }
-
- private def getOrMakeConnection() {
- if(!blockingChannel.isConnected) {
- connect()
- }
- }
-}
diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
deleted file mode 100644
index 207779c..0000000
--- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer
-
-import java.util.Properties
-import kafka.utils.VerifiableProperties
-
-@deprecated("This class has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
-class SyncProducerConfig private (val props: VerifiableProperties) extends SyncProducerConfigShared {
- def this(originalProps: Properties) {
- this(new VerifiableProperties(originalProps))
- // no need to verify the property since SyncProducerConfig is supposed to be used internally
- }
-
- /** the broker to which the producer sends events */
- val host = props.getString("host")
-
- /** the port on which the broker is running */
- val port = props.getInt("port")
-}
-
-@deprecated("This trait has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
-trait SyncProducerConfigShared {
- val props: VerifiableProperties
-
- val sendBufferBytes = props.getInt("send.buffer.bytes", 100*1024)
-
- /* the client application sending the producer requests */
- val clientId = props.getString("client.id", SyncProducerConfig.DefaultClientId)
-
- /*
- * The number of acknowledgments the producer requires the leader to have received before considering a request complete.
- * This controls the durability of the messages sent by the producer.
- *
- * request.required.acks = 0 - means the producer will not wait for any acknowledgement from the leader.
- * request.required.acks = 1 - means the leader will write the message to its local log and immediately acknowledge
- * request.required.acks = -1 - means the leader will wait for acknowledgement from all in-sync replicas before acknowledging the write
- */
-
- val requestRequiredAcks = props.getShortInRange("request.required.acks", SyncProducerConfig.DefaultRequiredAcks,(-1,1))
-
- /*
- * The ack timeout of the producer requests. Value must be non-negative and non-zero
- */
- val requestTimeoutMs = props.getIntInRange("request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
- (1, Integer.MAX_VALUE))
-}
-
-@deprecated("This object has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
-object SyncProducerConfig {
- val DefaultClientId = ""
- val DefaultRequiredAcks : Short = 0
- val DefaultAckTimeoutMs = 10000
-}
diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
deleted file mode 100644
index cc3a79d..0000000
--- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.producer.async
-
-import kafka.utils.VerifiableProperties
-
-@deprecated("This trait has been deprecated and will be removed in a future release. " +
- "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
-trait AsyncProducerConfig {
- val props: VerifiableProperties
-
- /* maximum time, in milliseconds, for buffering data on the producer queue */
- val queueBufferingMaxMs = props.getInt("queue.buffering.max.ms", 5000)
-
- /** the maximum size of the blocking queue for buffering on the producer */
- val queueBufferingMaxMessages = props.getInt("queue.buffering.max.messages", 10000)
-
- /**
- * Timeout for event enqueue:
- * 0: events will be enqueued immediately or dropped if the queue is full
- * -ve: enqueue will block indefinitely if the queue is full
- * +ve: enqueue will block up to this many milliseconds if the queue is full
- */
- val queueEnqueueTimeoutMs = props.getInt("queue.enqueue.timeout.ms", -1)
-
- /** the number of messages batched at the producer */
- val batchNumMessages = props.getInt("batch.num.messages", 200)
-
- /** the serializer class for values */
- val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder")
-
- /** the serializer class for keys (defaults to the same as for values) */
- val keySerializerClass = props.getString("key.serializer.class", serializerClass)
-
-}
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
deleted file mode 100755
index 8c7465f..0000000
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ /dev/null
@@ -1,359 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer.async
-
-import kafka.common._
-import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
-import kafka.producer._
-import kafka.serializer.Encoder
-import kafka.utils._
-import org.apache.kafka.common.errors.{LeaderNotAvailableException, UnknownTopicOrPartitionException}
-import org.apache.kafka.common.protocol.Errors
-
-import scala.util.Random
-import scala.collection.{Map, Seq}
-import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
-import java.util.concurrent.atomic._
-
-import kafka.api.{ProducerRequest, TopicMetadata}
-import org.apache.kafka.common.utils.{Time, Utils}
-import org.slf4j.event.Level
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class DefaultEventHandler[K,V](config: ProducerConfig,
- private val partitioner: Partitioner,
- private val encoder: Encoder[V],
- private val keyEncoder: Encoder[K],
- private val producerPool: ProducerPool,
- private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
- private val time: Time = Time.SYSTEM)
- extends EventHandler[K,V] with Logging {
-
- val isSync = ("sync" == config.producerType)
-
- val correlationId = new AtomicInteger(0)
- val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
-
- private val topicMetadataRefreshInterval = config.topicMetadataRefreshIntervalMs
- private var lastTopicMetadataRefreshTime = 0L
- private val topicMetadataToRefresh = Set.empty[String]
- private val sendPartitionPerTopicCache = HashMap.empty[String, Int]
-
- private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
- private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
-
- def handle(events: Seq[KeyedMessage[K,V]]) {
- val serializedData = serialize(events)
- serializedData.foreach {
- keyed =>
- val dataSize = keyed.message.payloadSize
- producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
- producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
- }
- var outstandingProduceRequests = serializedData
- var remainingRetries = config.messageSendMaxRetries + 1
- val correlationIdStart = correlationId.get()
- debug("Handling %d events".format(events.size))
- while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {
- topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
- if (topicMetadataRefreshInterval >= 0 &&
- Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
- CoreUtils.swallow(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement), this, Level.ERROR)
- sendPartitionPerTopicCache.clear()
- topicMetadataToRefresh.clear
- lastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds
- }
- outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
- if (outstandingProduceRequests.nonEmpty) {
- info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
- // back off and update the topic metadata cache before attempting another send operation
- Thread.sleep(config.retryBackoffMs)
- // get topics of the outstanding produce requests and refresh metadata for those
- CoreUtils.swallow(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement), this, Level.ERROR)
- sendPartitionPerTopicCache.clear()
- remainingRetries -= 1
- producerStats.resendRate.mark()
- }
- }
- if(outstandingProduceRequests.nonEmpty) {
- producerStats.failedSendRate.mark()
- val correlationIdEnd = correlationId.get()
- error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
- .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
- correlationIdStart, correlationIdEnd-1))
- throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
- }
- }
-
- private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
- val partitionedDataOpt = partitionAndCollate(messages)
- partitionedDataOpt match {
- case Some(partitionedData) =>
- val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
- for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
- if (isTraceEnabled) {
- messagesPerBrokerMap.foreach(partitionAndEvent =>
- trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
- }
- val messageSetPerBrokerOpt = groupMessagesToSet(messagesPerBrokerMap)
- messageSetPerBrokerOpt match {
- case Some(messageSetPerBroker) =>
- val failedTopicPartitions = send(brokerid, messageSetPerBroker)
- failedTopicPartitions.foreach(topicPartition => {
- messagesPerBrokerMap.get(topicPartition).foreach(failedProduceRequests.appendAll)
- })
- case None => // failed to group messages
- messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m))
- }
- }
- failedProduceRequests
- case None => // failed to collate messages
- messages
- }
- }
-
- def serialize(events: Seq[KeyedMessage[K,V]]): Seq[KeyedMessage[K,Message]] = {
- val serializedMessages = new ArrayBuffer[KeyedMessage[K,Message]](events.size)
- events.foreach{e =>
- try {
- if(e.hasKey)
- serializedMessages += new KeyedMessage[K,Message](
- topic = e.topic,
- key = e.key,
- partKey = e.partKey,
- message = new Message(key = keyEncoder.toBytes(e.key),
- bytes = encoder.toBytes(e.message),
- timestamp = time.milliseconds,
- magicValue = Message.MagicValue_V1))
- else
- serializedMessages += new KeyedMessage[K,Message](
- topic = e.topic,
- key = e.key,
- partKey = e.partKey,
- message = new Message(bytes = encoder.toBytes(e.message),
- timestamp = time.milliseconds,
- magicValue = Message.MagicValue_V1))
- } catch {
- case t: Throwable =>
- producerStats.serializationErrorRate.mark()
- if (isSync) {
- throw t
- } else {
- // currently, if in async mode, we just log the serialization error. We need to revisit
- // this when doing kafka-496
- error("Error serializing message for topic %s".format(e.topic), t)
- }
- }
- }
- serializedMessages
- }
-
- def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
- val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
- try {
- for (message <- messages) {
- val topicPartitionsList = getPartitionListForTopic(message)
- val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
- val brokerPartition = topicPartitionsList(partitionIndex)
-
- // postpone the failure until the send operation, so that requests for other brokers are handled correctly
- val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)
-
- var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
- ret.get(leaderBrokerId) match {
- case Some(element) =>
- dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
- case None =>
- dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
- ret.put(leaderBrokerId, dataPerBroker)
- }
-
- val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
- var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
- dataPerBroker.get(topicAndPartition) match {
- case Some(element) =>
- dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
- case None =>
- dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
- dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
- }
- dataPerTopicPartition.append(message)
- }
- Some(ret)
- }catch { // Swallow recoverable exceptions and return None so that they can be retried.
- case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
- case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
- case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
- }
- }
-
- private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
- val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
- debug("Broker partitions registered for topic: %s are %s"
- .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
- val totalNumPartitions = topicPartitionsList.length
- if(totalNumPartitions == 0)
- throw new NoBrokersForPartitionException("Partition key = " + m.key)
- topicPartitionsList
- }
-
- /**
- * Retrieves the partition id and throws an UnknownTopicOrPartitionException if
- * the value of partition is not between 0 and numPartitions-1
- * @param topic The topic
- * @param key the partition key
- * @param topicPartitionList the list of available partitions
- * @return the partition id
- */
- private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
- val numPartitions = topicPartitionList.size
- if(numPartitions <= 0)
- throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
- val partition =
- if(key == null) {
- // If the key is null, we don't really need a partitioner
- // So we look up in the send partition cache for the topic to decide the target partition
- val id = sendPartitionPerTopicCache.get(topic)
- id match {
- case Some(partitionId) =>
- // directly return the partitionId without checking availability of the leader,
- // since we want to postpone the failure until the send operation anyways
- partitionId
- case None =>
- val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
- if (availablePartitions.isEmpty)
- throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
- val index = Utils.abs(Random.nextInt) % availablePartitions.size
- val partitionId = availablePartitions(index).partitionId
- sendPartitionPerTopicCache.put(topic, partitionId)
- partitionId
- }
- } else
- partitioner.partition(key, numPartitions)
- if(partition < 0 || partition >= numPartitions)
- throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
- "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
- trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
- partition
- }
-
- /**
- * Constructs and sends the produce request based on a map from (topic, partition) -> messages
- *
- * @param brokerId the broker that will receive the request
- * @param messagesPerTopic the messages as a map from (topic, partition) -> messages
- * @return the set (topic, partitions) messages which incurred an error sending or processing
- */
- private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
- if(brokerId < 0) {
- warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.keys.mkString(",")))
- messagesPerTopic.keys.toSeq
- } else if(messagesPerTopic.nonEmpty) {
- val currentCorrelationId = correlationId.getAndIncrement
- val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
- config.requestTimeoutMs, messagesPerTopic)
- var failedTopicPartitions = Seq.empty[TopicAndPartition]
- try {
- val syncProducer = producerPool.getProducer(brokerId)
- debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
- .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
- val response = syncProducer.send(producerRequest)
- debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
- .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
- if(response != null) {
- if (response.status.size != producerRequest.data.size)
- throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
- if (isTraceEnabled) {
- val successfullySentData = response.status.filter(_._2.error == Errors.NONE)
- successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
- trace("Successfully sent message: %s".format(if(message.message.isNull) null else message.message.toString()))))
- }
- val failedPartitionsAndStatus = response.status.filter(_._2.error != Errors.NONE).toSeq
- failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
- if(failedTopicPartitions.nonEmpty) {
- val errorString = failedPartitionsAndStatus
- .sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
- (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
- .map{
- case(topicAndPartition, status) =>
- topicAndPartition.toString + ": " + status.error.exceptionName
- }.mkString(",")
- warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
- }
- failedTopicPartitions
- } else {
- Seq.empty[TopicAndPartition]
- }
- } catch {
- case t: Throwable =>
- warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
- .format(currentCorrelationId, brokerId, messagesPerTopic.keys.mkString(",")), t)
- messagesPerTopic.keys.toSeq
- }
- } else {
- List.empty
- }
- }
-
- private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]) = {
- /** enforce the compressed.topics config here.
- * If the compression codec is anything other than NoCompressionCodec,
- * Enable compression only for specified topics if any
- * If the list of compressed topics is empty, then enable the specified compression codec for all topics
- * If the compression codec is NoCompressionCodec, compression is disabled for all topics
- */
- try {
- val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
- val rawMessages = messages.map(_.message)
- (topicAndPartition,
- config.compressionCodec match {
- case NoCompressionCodec =>
- debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
- new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
- case _ =>
- config.compressedTopics.size match {
- case 0 =>
- debug("Sending %d messages with compression codec %d to %s"
- .format(messages.size, config.compressionCodec.codec, topicAndPartition))
- new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
- case _ =>
- if (config.compressedTopics.contains(topicAndPartition.topic)) {
- debug("Sending %d messages with compression codec %d to %s"
- .format(messages.size, config.compressionCodec.codec, topicAndPartition))
- new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
- }
- else {
- debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
- .format(messages.size, topicAndPartition, config.compressedTopics.toString))
- new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
- }
- }
- }
- )
- }
- Some(messagesPerTopicPartition)
- } catch {
- case t: Throwable => error("Failed to group messages", t); None
- }
- }
-
- def close() {
- if (producerPool != null)
- producerPool.close
- }
-}
diff --git a/core/src/main/scala/kafka/producer/async/EventHandler.scala b/core/src/main/scala/kafka/producer/async/EventHandler.scala
deleted file mode 100644
index 44fb1eb..0000000
--- a/core/src/main/scala/kafka/producer/async/EventHandler.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.producer.async
-
-import kafka.producer.KeyedMessage
-
-/**
- * Handler that dispatches the batched data from the queue.
- */
-@deprecated("This trait has been deprecated and will be removed in a future release.", "0.10.0.0")
-trait EventHandler[K,V] {
-
- /**
- * Callback to dispatch the batched data and send it to a Kafka server
- * @param events the data sent to the producer
- */
- def handle(events: Seq[KeyedMessage[K,V]])
-
- /**
- * Cleans up and shuts down the event handler
- */
- def close(): Unit
-}
diff --git a/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala b/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala
deleted file mode 100644
index 7779715..0000000
--- a/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer.async
-
-/**
- * Indicates that the given config parameter has invalid value
- */
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class IllegalQueueStateException(message: String) extends RuntimeException(message) {
- def this() = this(null)
-}
diff --git a/core/src/main/scala/kafka/producer/async/MissingConfigException.scala b/core/src/main/scala/kafka/producer/async/MissingConfigException.scala
deleted file mode 100644
index a42678b..0000000
--- a/core/src/main/scala/kafka/producer/async/MissingConfigException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer.async
-
-/* Indicates any missing configuration parameter */
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class MissingConfigException(message: String) extends RuntimeException(message) {
- def this() = this(null)
-}
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
deleted file mode 100644
index 0377093..0000000
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer.async
-
-import kafka.utils.Logging
-import java.util.concurrent.{BlockingQueue, CountDownLatch, TimeUnit}
-
-import collection.mutable.ArrayBuffer
-import kafka.producer.KeyedMessage
-import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
-import org.apache.kafka.common.utils.Time
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
-class ProducerSendThread[K,V](val threadName: String,
- val queue: BlockingQueue[KeyedMessage[K,V]],
- val handler: EventHandler[K,V],
- val queueTime: Long,
- val batchSize: Int,
- val clientId: String) extends Thread(threadName) with Logging with KafkaMetricsGroup {
-
- private val shutdownLatch = new CountDownLatch(1)
- private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
-
- newGauge("ProducerQueueSize",
- new Gauge[Int] {
- def value = queue.size
- },
- Map("clientId" -> clientId))
-
- override def run {
- try {
- processEvents
- }catch {
- case e: Throwable => error("Error in sending events: ", e)
- }finally {
- shutdownLatch.countDown
- }
- }
-
- def shutdown(): Unit = {
- info("Begin shutting down ProducerSendThread")
- queue.put(shutdownCommand)
- shutdownLatch.await
- info("Shutdown ProducerSendThread complete")
- }
-
- private def processEvents() {
- var lastSend = Time.SYSTEM.milliseconds
- var events = new ArrayBuffer[KeyedMessage[K,V]]
- var full: Boolean = false
-
- // drain the queue until you get a shutdown command
- Iterator.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - Time.SYSTEM.milliseconds), TimeUnit.MILLISECONDS))
- .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
- currentQueueItem =>
- val elapsed = Time.SYSTEM.milliseconds - lastSend
- // check if the queue time is reached. This happens when the poll method above returns after a timeout and
- // returns a null object
- val expired = currentQueueItem == null
- if(currentQueueItem != null) {
- trace("Dequeued item for topic %s, partition key: %s, data: %s"
- .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
- events += currentQueueItem
- }
-
- // check if the batch size is reached
- full = events.size >= batchSize
-
- if(full || expired) {
- if(expired)
- debug(elapsed + " ms elapsed. Queue time reached. Sending..")
- if(full)
- debug("Batch full. Sending..")
- // if either queue time has reached or batch size has reached, dispatch to event handler
- tryToHandle(events)
- lastSend = Time.SYSTEM.milliseconds
- events = new ArrayBuffer[KeyedMessage[K,V]]
- }
- }
- // send the last batch of events
- tryToHandle(events)
- if(queue.size > 0)
- throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
- .format(queue.size))
- }
-
- def tryToHandle(events: Seq[KeyedMessage[K,V]]) {
- val size = events.size
- try {
- debug("Handling " + size + " events")
- if(size > 0)
- handler.handle(events)
- }catch {
- case e: Throwable => error("Error in handling batch of " + size + " events", e)
- }
- }
-
-}
diff --git a/core/src/main/scala/kafka/serializer/Encoder.scala b/core/src/main/scala/kafka/serializer/Encoder.scala
deleted file mode 100644
index b1277e1..0000000
--- a/core/src/main/scala/kafka/serializer/Encoder.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.serializer
-
-import java.nio.ByteBuffer
-
-import kafka.utils.VerifiableProperties
-
-/**
- * An encoder is a method of turning objects into byte arrays.
- * An implementation is required to provide a constructor that
- * takes a VerifiableProperties instance.
- */
-trait Encoder[T] {
- def toBytes(t: T): Array[Byte]
-}
-
-/**
- * The default implementation is a no-op, it just returns the same array it takes in
- */
-class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] {
- override def toBytes(value: Array[Byte]): Array[Byte] = value
-}
-
-class NullEncoder[T](props: VerifiableProperties = null) extends Encoder[T] {
- override def toBytes(value: T): Array[Byte] = null
-}
-
-/**
- * The string encoder takes an optional parameter serializer.encoding which controls
- * the character set used in encoding the string into bytes.
- */
-class StringEncoder(props: VerifiableProperties = null) extends Encoder[String] {
- val encoding =
- if(props == null)
- "UTF8"
- else
- props.getString("serializer.encoding", "UTF8")
-
- override def toBytes(s: String): Array[Byte] =
- if(s == null)
- null
- else
- s.getBytes(encoding)
-}
-
-/**
- * The long encoder translates longs into bytes.
- */
-class LongEncoder(props: VerifiableProperties = null) extends Encoder[Long] {
- override def toBytes(l: Long): Array[Byte] =
- ByteBuffer.allocate(8).putLong(l).array()
-}
-
-/**
- * The integer encoder translates integers into bytes.
- */
-class IntegerEncoder(props: VerifiableProperties = null) extends Encoder[Integer] {
- override def toBytes(i: Integer): Array[Byte] =
- if(i == null)
- null
- else
- ByteBuffer.allocate(4).putInt(i).array()
-}
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 39bb0ff..e5b72a3 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -19,35 +19,29 @@ package kafka.tools
import kafka.common._
import kafka.message._
-import kafka.serializer._
import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
import kafka.utils.Implicits._
-import kafka.producer.{NewShinyProducer, OldProducer}
import java.util.Properties
import java.io._
import java.nio.charset.StandardCharsets
import joptsimple._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
object ConsoleProducer {
- def main(args: Array[String]) {
+ def main(args: Array[String]): Unit = {
try {
val config = new ProducerConfig(args)
val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
reader.init(System.in, getReaderProps(config))
- val producer =
- if(config.useOldProducer) {
- new OldProducer(getOldProducerProps(config))
- } else {
- new NewShinyProducer(getNewProducerProps(config))
- }
+ val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
@@ -55,12 +49,12 @@ object ConsoleProducer {
}
})
- var message: ProducerRecord[Array[Byte], Array[Byte]] = null
+ var record: ProducerRecord[Array[Byte], Array[Byte]] = null
do {
- message = reader.readMessage()
- if (message != null)
- producer.send(message.topic, message.key, message.value)
- } while (message != null)
+ record = reader.readMessage()
+ if (record != null)
+ send(producer, record, config.sync)
+ } while (record != null)
} catch {
case e: joptsimple.OptionException =>
System.err.println(e.getMessage)
@@ -72,47 +66,28 @@ object ConsoleProducer {
Exit.exit(0)
}
+ private def send(producer: KafkaProducer[Array[Byte], Array[Byte]],
+ record: ProducerRecord[Array[Byte], Array[Byte]], sync: Boolean): Unit = {
+ if (sync)
+ producer.send(record).get()
+ else
+ producer.send(record, new ErrorLoggingCallback(record.topic, record.key, record.value, false))
+ }
+
def getReaderProps(config: ProducerConfig): Properties = {
val props = new Properties
- props.put("topic",config.topic)
+ props.put("topic", config.topic)
props ++= config.cmdLineProps
props
}
- def getOldProducerProps(config: ProducerConfig): Properties = {
- val props = producerProps(config)
-
- props.put("metadata.broker.list", config.brokerList)
- props.put("compression.codec", config.compressionCodec)
- props.put("producer.type", if(config.sync) "sync" else "async")
- props.put("batch.num.messages", config.batchSize.toString)
- props.put("message.send.max.retries", config.messageSendMaxRetries.toString)
- props.put("retry.backoff.ms", config.retryBackoffMs.toString)
- props.put("queue.buffering.max.ms", config.sendTimeout.toString)
- props.put("queue.buffering.max.messages", config.queueSize.toString)
- props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString)
- props.put("request.required.acks", config.requestRequiredAcks)
- props.put("request.timeout.ms", config.requestTimeoutMs.toString)
- props.put("key.serializer.class", config.keyEncoderClass)
- props.put("serializer.class", config.valueEncoderClass)
- props.put("send.buffer.bytes", config.socketBuffer.toString)
- props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString)
- props.put("client.id", "console-producer")
-
- props
- }
-
- private def producerProps(config: ProducerConfig): Properties = {
+ def producerProps(config: ProducerConfig): Properties = {
val props =
if (config.options.has(config.producerConfigOpt))
Utils.loadProps(config.options.valueOf(config.producerConfigOpt))
else new Properties
- props ++= config.extraProducerProps
- props
- }
- def getNewProducerProps(config: ProducerConfig): Properties = {
- val props = producerProps(config)
+ props ++= config.extraProducerProps
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
@@ -168,17 +143,6 @@ object ConsoleProducer {
.describedAs("timeout_ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000)
- val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " +
- " messages will queue awaiting sufficient batch size.")
- .withRequiredArg
- .describedAs("queue_size")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(10000)
- val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue")
- .withRequiredArg
- .describedAs("queue enqueuetimeout ms")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(Int.MaxValue)
val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
.withRequiredArg
.describedAs("request required acks")
@@ -214,16 +178,6 @@ object ConsoleProducer {
.describedAs("memory in bytes per partition")
.ofType(classOf[java.lang.Long])
.defaultsTo(16 * 1024L)
- val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
- .withRequiredArg
- .describedAs("encoder_class")
- .ofType(classOf[java.lang.String])
- .defaultsTo(classOf[DefaultEncoder].getName)
- val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
- .withRequiredArg
- .describedAs("encoder_class")
- .ofType(classOf[java.lang.String])
- .defaultsTo(classOf[DefaultEncoder].getName)
val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
"By default each line is read as a separate message.")
.withRequiredArg
@@ -248,14 +202,12 @@ object ConsoleProducer {
.withRequiredArg
.describedAs("config file")
.ofType(classOf[String])
- val useOldProducerOpt = parser.accepts("old-producer", "Use the old producer implementation.")
val options = parser.parse(args : _*)
- if(args.length == 0)
+ if (args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.")
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt)
- val useOldProducer = options.has(useOldProducerOpt)
val topic = options.valueOf(topicOpt)
val brokerList = options.valueOf(brokerListOpt)
ToolsUtils.validatePortOrDie(parser,brokerList)
@@ -268,14 +220,10 @@ object ConsoleProducer {
else NoCompressionCodec.name
val batchSize = options.valueOf(batchSizeOpt)
val sendTimeout = options.valueOf(sendTimeoutOpt)
- val queueSize = options.valueOf(queueSizeOpt)
- val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)
val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
val messageSendMaxRetries = options.valueOf(messageSendMaxRetriesOpt)
val retryBackoffMs = options.valueOf(retryBackoffMsOpt)
- val keyEncoderClass = options.valueOf(keyEncoderOpt)
- val valueEncoderClass = options.valueOf(valueEncoderOpt)
val readerClass = options.valueOf(messageReaderOpt)
val socketBuffer = options.valueOf(socketBufferSizeOpt)
val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala)
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
deleted file mode 100644
index f96200d..0000000
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import kafka.metrics.KafkaMetricsReporter
-import kafka.producer.{NewShinyProducer, OldProducer}
-import kafka.utils.{CommandLineUtils, Exit, Logging, ToolsUtils, VerifiableProperties}
-import kafka.utils.Implicits._
-import kafka.message.CompressionCodec
-import kafka.serializer._
-import java.util.concurrent.{CountDownLatch, Executors}
-import java.util.concurrent.atomic.AtomicLong
-import java.util._
-import java.text.SimpleDateFormat
-import java.math.BigInteger
-import java.nio.charset.StandardCharsets
-
-import org.apache.kafka.common.utils.Utils
-
-/**
- * Load test for the producer
- */
-@deprecated("This class will be replaced by org.apache.kafka.tools.ProducerPerformance after the old producer client is removed", "0.9.0.0")
-object ProducerPerformance extends Logging {
-
- def main(args: Array[String]) {
- val config = new ProducerPerfConfig(args)
- if (!config.isFixedSize)
- logger.info("WARN: Throughput will be slower due to changing message size per request")
-
- val totalBytesSent = new AtomicLong(0)
- val totalMessagesSent = new AtomicLong(0)
- val executor = Executors.newFixedThreadPool(config.numThreads)
- val allDone = new CountDownLatch(config.numThreads)
- val startMs = System.currentTimeMillis
- val rand = new java.util.Random
-
- if (!config.hideHeader)
- println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
- "total.data.sent.in.nMsg, nMsg.sec")
-
- for (i <- 0 until config.numThreads) {
- executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
- }
-
- allDone.await()
- val endMs = System.currentTimeMillis
- val elapsedSecs = (endMs - startMs) / 1000.0
- val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024)
- println("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f".format(
- config.dateFormat.format(startMs), config.dateFormat.format(endMs),
- config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,
- totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))
- Exit.exit(0)
- }
-
- class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
- val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info the list of broker host and port for bootstrap.")
- .withRequiredArg
- .describedAs("hostname:port,..,hostname:port")
- .ofType(classOf[String])
- val producerConfigOpt = parser.accepts("producer.config", "Producer config properties file.")
- .withRequiredArg
- .describedAs("config file")
- .ofType(classOf[String])
- val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to")
- .withRequiredArg
- .describedAs("topic1,topic2..")
- .ofType(classOf[String])
- val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The producer request timeout in ms")
- .withRequiredArg()
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(3000)
- val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number")
- .withRequiredArg()
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(3)
- val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds")
- .withRequiredArg()
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(100)
- val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
- "to complete")
- .withRequiredArg()
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(-1)
- val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
- val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.")
- val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
- .withRequiredArg
- .describedAs("number of threads")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1)
- val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " +
- "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
- "in the form of 'Message:000...1:xxx...'")
- .withRequiredArg()
- .describedAs("initial message id")
- .ofType(classOf[java.lang.Integer])
- val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends")
- .withRequiredArg()
- .describedAs("message send time gap")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(0)
- val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
- val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
- "set, the csv metrics will be outputted here")
- .withRequiredArg
- .describedAs("metrics directory")
- .ofType(classOf[java.lang.String])
- val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
- val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
- .withRequiredArg
- .describedAs("size")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(100)
- val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.")
- .withRequiredArg
- .describedAs("size")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(200)
- val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
- .withRequiredArg
- .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(0)
-
- val options = parser.parse(args: _*)
- CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt)
-
- val topicsStr = options.valueOf(topicsOpt)
- val topics = topicsStr.split(",")
- val numMessages = options.valueOf(numMessagesOpt).longValue
- val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
- val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
- val hideHeader = options.has(hideHeaderOpt)
- val brokerList = options.valueOf(brokerListOpt)
- ToolsUtils.validatePortOrDie(parser,brokerList)
- val messageSize = options.valueOf(messageSizeOpt).intValue
- var isFixedSize = !options.has(varyMessageSizeOpt)
- var isSync = options.has(syncOpt)
- var batchSize = options.valueOf(batchSizeOpt).intValue
- var numThreads = options.valueOf(numThreadsOpt).intValue
- val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
- val seqIdMode = options.has(initialMessageIdOpt)
- var initialMessageId: Int = 0
- if (seqIdMode)
- initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
- val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue()
- val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
- val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue()
- val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue()
- val useNewProducer = options.has(useNewProducerOpt)
-
- val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
-
- val producerProps = if (options.has(producerConfigOpt))
- Utils.loadProps(options.valueOf(producerConfigOpt))
- else
- new Properties()
-
- if (csvMetricsReporterEnabled) {
- val props = new Properties()
- props.put("kafka.metrics.polling.interval.secs", "1")
- props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
- if (options.has(metricsDirectoryOpt))
- props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
- else
- props.put("kafka.csv.metrics.dir", "kafka_metrics")
- props.put("kafka.csv.metrics.reporter.enabled", "true")
- val verifiableProps = new VerifiableProperties(props)
- KafkaMetricsReporter.startReporters(verifiableProps)
- }
-
- val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
- }
-
- class ProducerThread(val threadId: Int,
- val config: ProducerPerfConfig,
- val totalBytesSent: AtomicLong,
- val totalMessagesSent: AtomicLong,
- val allDone: CountDownLatch,
- val rand: Random) extends Runnable {
- val seqIdNumDigit = 10 // no. of digits for max int value
-
- val messagesPerThread = config.numMessages / config.numThreads
- debug("Messages per thread = " + messagesPerThread)
- val props = new Properties()
- val producer =
- if (config.useNewProducer) {
- import org.apache.kafka.clients.producer.ProducerConfig
- props ++= config.producerProps
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
- props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance")
- props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString)
- props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
- props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
- props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
- new NewShinyProducer(props)
- } else {
- props ++= config.producerProps
- props.put("metadata.broker.list", config.brokerList)
- props.put("compression.codec", config.compressionCodec.codec.toString)
- props.put("send.buffer.bytes", (64 * 1024).toString)
- if (!config.isSync) {
- props.put("producer.type", "async")
- props.put("batch.num.messages", config.batchSize.toString)
- props.put("queue.enqueue.timeout.ms", "-1")
- }
- props.put("client.id", "producer-performance")
- props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
- props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
- props.put("message.send.max.retries", config.producerNumRetries.toString)
- props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
- props.put("serializer.class", classOf[DefaultEncoder].getName)
- props.put("key.serializer.class", classOf[NullEncoder[Long]].getName)
- new OldProducer(props)
- }
-
- // generate the sequential message ID
- private val SEP = ":" // message field separator
- private val messageIdLabel = "MessageID"
- private val threadIdLabel = "ThreadID"
- private val topicLabel = "Topic"
- private var leftPaddedSeqId: String = ""
-
- private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
- // Each thread gets a unique range of sequential no. for its ids.
- // Eg. 1000 msg in 10 threads => 100 msg per thread
- // thread 0 IDs : 0 ~ 99
- // thread 1 IDs : 100 ~ 199
- // thread 2 IDs : 200 ~ 299
- // . . .
- leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId))
-
- val msgHeader = topicLabel + SEP +
- topic + SEP +
- threadIdLabel + SEP +
- threadId + SEP +
- messageIdLabel + SEP +
- leftPaddedSeqId + SEP
-
- val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
- debug(seqMsgString)
- seqMsgString.getBytes(StandardCharsets.UTF_8)
- }
-
- private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {
- val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
- if (config.seqIdMode) {
- val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
- generateMessageWithSeqId(topic, seqId, msgSize)
- } else {
- new Array[Byte](msgSize)
- }
- }
-
- override def run {
- var bytesSent = 0L
- var nSends = 0
- var i: Long = 0L
- var message: Array[Byte] = null
-
- while (i < messagesPerThread) {
- try {
- config.topics.foreach(
- topic => {
- message = generateProducerData(topic, i)
- producer.send(topic, BigInteger.valueOf(i).toByteArray, message)
- bytesSent += message.size
- nSends += 1
- if (config.messageSendGapMs > 0)
- Thread.sleep(config.messageSendGapMs)
- })
- } catch {
- case e: Throwable => error("Error when sending message " + new String(message, StandardCharsets.UTF_8), e)
- }
- i += 1
- }
- try {
- producer.close()
- } catch {
- case e: Throwable => error("Error when closing producer", e)
- }
- totalBytesSent.addAndGet(bytesSent)
- totalMessagesSent.addAndGet(nSends)
- allDone.countDown()
- }
- }
-}
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 018971b..ffe7ffd 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -31,7 +31,6 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{Cluster, Reconfigurable}
import org.apache.kafka.common.config.SaslConfigs
-import org.apache.kafka.common.errors.SaslAuthenticationException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.security.scram.ScramCredential
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index a4854ae..45b3fdc 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1068,13 +1068,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)), expectFailure)
}
- private def alterSslTruststore(adminClient: AdminClient, props: Properties, listener: String, expectFailure: Boolean = false): Unit = {
- val configPrefix = listenerPrefix(listener)
- val newProps = securityProps(props, TRUSTSTORE_PROPS, configPrefix)
- reconfigureServers(newProps, perBrokerConfig = true,
- (s"$configPrefix$SSL_TRUSTSTORE_LOCATION_CONFIG", props.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)), expectFailure)
- }
-
private def alterSslKeystoreUsingConfigCommand(props: Properties, listener: String): Unit = {
val configPrefix = listenerPrefix(listener)
val newProps = securityProps(props, KEYSTORE_PROPS, configPrefix)
diff --git a/core/src/test/scala/other/kafka/TestKafkaAppender.scala b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
deleted file mode 100644
index ecdcac0..0000000
--- a/core/src/test/scala/other/kafka/TestKafkaAppender.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka
-
-import org.apache.log4j.PropertyConfigurator
-import kafka.utils.{Exit, Logging}
-import serializer.Encoder
-
-object TestKafkaAppender extends Logging {
-
- def main(args:Array[String]) {
-
- if(args.length < 1) {
- println("USAGE: " + TestKafkaAppender.getClass.getName + " log4j_config")
- Exit.exit(1)
- }
-
- try {
- PropertyConfigurator.configure(args(0))
- } catch {
- case e: Exception =>
- System.err.println("KafkaAppender could not be initialized ! Exiting..")
- e.printStackTrace()
- Exit.exit(1)
- }
-
- for (_ <- 1 to 10)
- info("test")
- }
-}
-
-class AppenderStringSerializer(encoding: String = "UTF-8") extends Encoder[AnyRef] {
- def toBytes(event: AnyRef): Array[Byte] = event.toString.getBytes(encoding)
-}
-
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 4c033c4..057814b 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -24,7 +24,7 @@ import org.junit.Assert._
import org.junit.{After, Test}
import java.util.Properties
-import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
+import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaDeletionSuccessful}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index b42d7f7..4f40b27 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -34,6 +34,8 @@ import scala.collection.Seq
import scala.util.Random
import java.io.File
+import org.apache.kafka.clients.producer.ProducerRecord
+
class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
val partitionId = 0
var servers: Seq[KafkaServer] = null
@@ -271,9 +273,9 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
//Given throttle set so replication will take a certain number of secs
val initialThrottle = Throttle(10 * 1000 * 1000, -1, () => zkUpdateDelay)
val expectedDurationSecs = 5
- val numMessages: Int = 500
- val msgSize: Int = 100 * 1000
- produceMessages(servers, topicName, numMessages, acks = 0, msgSize)
+ val numMessages = 500
+ val msgSize = 100 * 1000
+ produceMessages(topicName, numMessages, acks = 0, msgSize)
assertEquals(expectedDurationSecs, numMessages * msgSize / initialThrottle.interBrokerLimit)
//Start rebalance which will move replica on 100 -> replica on 102
@@ -321,8 +323,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
//Given throttle set so replication will take a while
val throttle: Long = 1000 * 1000
- produceMessages(servers, "topic1", 100, acks = 0, 100 * 1000)
- produceMessages(servers, "topic2", 100, acks = 0, 100 * 1000)
+ produceMessages("topic1", 100, acks = 0, 100 * 1000)
+ produceMessages("topic2", 100, acks = 0, 100 * 1000)
//Start rebalance
val newAssignment = Map(
@@ -358,7 +360,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
//Given throttle set so replication will take at least 20 sec (we won't wait this long)
val initialThrottle: Long = 1000 * 1000
- produceMessages(servers, topicName, numMessages = 200, acks = 0, valueBytes = 100 * 1000)
+ produceMessages(topicName, numMessages = 200, acks = 0, valueLength = 100 * 1000)
//Start rebalance
val newAssignment = generateAssignment(zkClient, Array(101, 102), json(topicName), true)._1
@@ -630,4 +632,10 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
val topicStr = topic.map { t => "{\"topic\": \"" + t + "\"}" }.mkString(",")
s"""{"topics": [$topicStr],"version":1}"""
}
+
+ private def produceMessages(topic: String, numMessages: Int, acks: Int, valueLength: Int): Unit = {
+ val records = (0 until numMessages).map(_ => new ProducerRecord[Array[Byte], Array[Byte]](topic,
+ new Array[Byte](valueLength)))
+ TestUtils.produceMessages(servers, records, acks)
+ }
}
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 9674486..04fc428 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -19,6 +19,7 @@ import java.util.{Calendar, Date, Properties}
import kafka.admin.ConsumerGroupCommand.ConsumerGroupService
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._
import org.junit.Test
@@ -93,7 +94,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsNewConsumerExistingTopic(): Unit = {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "new.group", "--topic", topic,
"--to-offset", "50")
- TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+ produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 50, dryRun = true)
resetAndAssertOffsets(args ++ Array("--dry-run"), expectedOffset = 50, dryRun = true)
resetAndAssertOffsets(args ++ Array("--execute"), expectedOffset = 50, group = "new.group")
@@ -105,7 +106,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
val calendar = Calendar.getInstance()
calendar.add(Calendar.DATE, -1)
- TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+ produceMessages(topic, 100)
val executor = addConsumerGroupExecutor(numConsumers = 1, topic)
awaitConsumerProgress(count = 100L)
@@ -120,9 +121,9 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsToZonedDateTime() {
val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
- TestUtils.produceMessages(servers, topic, 50, acks = 1, 100 * 1000)
+ produceMessages(topic, 50)
val checkpoint = new Date()
- TestUtils.produceMessages(servers, topic, 50, acks = 1, 100 * 1000)
+ produceMessages(topic, 50)
val executor = addConsumerGroupExecutor(numConsumers = 1, topic)
awaitConsumerProgress(count = 100L)
@@ -137,7 +138,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsByDuration() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--by-duration", "PT1M", "--execute")
- produceConsumeAndShutdown(totalMessages = 100)
+ produceConsumeAndShutdown(topic, totalMessages = 100)
resetAndAssertOffsets(args, expectedOffset = 0)
}
@@ -145,7 +146,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsByDurationToEarliest() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--by-duration", "PT0.1S", "--execute")
- produceConsumeAndShutdown(totalMessages = 100)
+ produceConsumeAndShutdown(topic, totalMessages = 100)
resetAndAssertOffsets(args, expectedOffset = 100)
}
@@ -153,7 +154,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsToEarliest() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--to-earliest", "--execute")
- produceConsumeAndShutdown(totalMessages = 100)
+ produceConsumeAndShutdown(topic, totalMessages = 100)
resetAndAssertOffsets(args, expectedOffset = 0)
}
@@ -161,8 +162,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsToLatest() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--to-latest", "--execute")
- produceConsumeAndShutdown(totalMessages = 100)
- TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+ produceConsumeAndShutdown(topic, totalMessages = 100)
+ produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 200)
}
@@ -170,8 +171,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsToCurrentOffset() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--to-current", "--execute")
- produceConsumeAndShutdown(totalMessages = 100)
- TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+ produceConsumeAndShutdown(topic, totalMessages = 100)
+ produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 100)
}
@@ -179,7 +180,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsToSpecificOffset() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--to-offset", "1", "--execute")
- produceConsumeAndShutdown(totalMessages = 100)
+ produceConsumeAndShutdown(topic, totalMessages = 100)
resetAndAssertOffsets(args, expectedOffset = 1)
}
@@ -187,8 +188,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsShiftPlus() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--shift-by", "50", "--execute")
- produceConsumeAndShutdown(totalMessages = 100)
- TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+ produceConsumeAndShutdown(topic, totalMessages = 100)
+ produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 150)
}
@@ -196,8 +197,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsShiftMinus() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--shift-by", "-50", "--execute")
- produceConsumeAndShutdown(totalMessages = 100)
- TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+ produceConsumeAndShutdown(topic, totalMessages = 100)
+ produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 50)
}
@@ -205,8 +206,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsShiftByLowerThanEarliest() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--shift-by", "-150", "--execute")
- produceConsumeAndShutdown(totalMessages = 100)
- TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+ produceConsumeAndShutdown(topic, totalMessages = 100)
+ produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 0)
}
@@ -214,8 +215,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsShiftByHigherThanLatest() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--shift-by", "150", "--execute")
- produceConsumeAndShutdown(totalMessages = 100)
- TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+ produceConsumeAndShutdown(topic, totalMessages = 100)
+ produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 200)
}
@@ -223,7 +224,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsToEarliestOnOneTopic() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic,
"--to-earliest", "--execute")
- produceConsumeAndShutdown(totalMessages = 100)
+ produceConsumeAndShutdown(topic, totalMessages = 100)
resetAndAssertOffsets(args, expectedOffset = 0)
}
@@ -236,7 +237,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
s"$topic:1", "--to-earliest", "--execute")
val consumerGroupCommand = getConsumerGroupService(args)
- produceConsumeAndShutdown(totalMessages = 100, numConsumers = 2, topic)
+ produceConsumeAndShutdown(topic, totalMessages = 100, numConsumers = 2)
val priorCommittedOffsets = committedOffsets(topic = topic)
val tp0 = new TopicPartition(topic, 0)
@@ -258,8 +259,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
"--topic", topic2, "--to-earliest", "--execute")
val consumerGroupCommand = getConsumerGroupService(args)
- produceConsumeAndShutdown(100, 1, topic1)
- produceConsumeAndShutdown(100, 1, topic2)
+ produceConsumeAndShutdown(topic1, 100, 1)
+ produceConsumeAndShutdown(topic2, 100, 1)
val tp1 = new TopicPartition(topic1, 0)
val tp2 = new TopicPartition(topic2, 0)
@@ -285,8 +286,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
s"$topic1:1", "--topic", s"$topic2:1", "--to-earliest", "--execute")
val consumerGroupCommand = getConsumerGroupService(args)
- produceConsumeAndShutdown(100, 2, topic1)
- produceConsumeAndShutdown(100, 2, topic2)
+ produceConsumeAndShutdown(topic1, 100, 2)
+ produceConsumeAndShutdown(topic2, 100, 2)
val priorCommittedOffsets1 = committedOffsets(topic1)
val priorCommittedOffsets2 = committedOffsets(topic2)
@@ -314,7 +315,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
"--to-offset", "2", "--export")
val consumerGroupCommand = getConsumerGroupService(cgcArgs)
- produceConsumeAndShutdown(100, 2, topic)
+ produceConsumeAndShutdown(topic, 100, 2)
val file = File.createTempFile("reset", ".csv")
file.deleteOnExit()
@@ -334,8 +335,14 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
adminZkClient.deleteTopic(topic)
}
- private def produceConsumeAndShutdown(totalMessages: Int, numConsumers: Int = 1, topic: String = topic) {
- TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000)
+ private def produceMessages(topic: String, numMessages: Int): Unit = {
+ val records = (0 until numMessages).map(_ => new ProducerRecord[Array[Byte], Array[Byte]](topic,
+ new Array[Byte](100 * 1000)))
+ TestUtils.produceMessages(servers, records, acks = 1)
+ }
+
+ private def produceConsumeAndShutdown(topic: String, totalMessages: Int, numConsumers: Int = 1) {
+ produceMessages(topic, totalMessages)
val executor = addConsumerGroupExecutor(numConsumers, topic)
awaitConsumerProgress(topic, totalMessages)
executor.shutdown()
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 4bcf61d..33f9352 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -80,16 +80,6 @@ object SerializationTestUtils {
private val brokers = List(createBroker(0, "localhost", 1011), createBroker(0, "localhost", 1012),
createBroker(0, "localhost", 1013))
- def createTestProducerRequest: ProducerRequest = {
- new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest)
- }
-
- def createTestProducerResponse: ProducerResponse =
- ProducerResponse(1, Map(
- TopicAndPartition(topic1, 0) -> ProducerResponseStatus(Errors.forCode(0.toShort), 10001),
- TopicAndPartition(topic2, 0) -> ProducerResponseStatus(Errors.forCode(0.toShort), 20001)
- ), ProducerRequest.CurrentVersion, 100)
-
def createTestFetchRequest: FetchRequest = new FetchRequest(requestInfo = requestInfos.toVector)
def createTestFetchResponse: FetchResponse = FetchResponse(1, topicDataFetchResponse.toVector)
@@ -163,8 +153,6 @@ object SerializationTestUtils {
}
class RequestResponseSerializationTest extends JUnitSuite {
- private val producerRequest = SerializationTestUtils.createTestProducerRequest
- private val producerResponse = SerializationTestUtils.createTestProducerResponse
private val fetchRequest = SerializationTestUtils.createTestFetchRequest
private val offsetRequest = SerializationTestUtils.createTestOffsetRequest
private val offsetResponse = SerializationTestUtils.createTestOffsetResponse
@@ -182,8 +170,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
def testSerializationAndDeserialization() {
val requestsAndResponses =
- collection.immutable.Seq(producerRequest, producerResponse,
- fetchRequest, offsetRequest, offsetResponse,
+ collection.immutable.Seq(fetchRequest, offsetRequest, offsetResponse,
offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2, offsetCommitResponse,
offsetFetchRequest, offsetFetchResponse,
consumerMetadataRequest, consumerMetadataResponse,
@@ -202,27 +189,6 @@ class RequestResponseSerializationTest extends JUnitSuite {
}
@Test
- def testProduceResponseVersion() {
- val oldClientResponse = ProducerResponse(1, Map(
- TopicAndPartition("t1", 0) -> ProducerResponseStatus(Errors.NONE, 10001),
- TopicAndPartition("t2", 0) -> ProducerResponseStatus(Errors.NONE, 20001)
- ))
-
- val newClientResponse = ProducerResponse(1, Map(
- TopicAndPartition("t1", 0) -> ProducerResponseStatus(Errors.NONE, 10001),
- TopicAndPartition("t2", 0) -> ProducerResponseStatus(Errors.NONE, 20001)
- ), 1, 100)
-
- // new response should have 4 bytes more than the old response since delayTime is an INT32
- assertEquals(oldClientResponse.sizeInBytes + 4, newClientResponse.sizeInBytes)
-
- val buffer = ByteBuffer.allocate(newClientResponse.sizeInBytes)
- newClientResponse.writeTo(buffer)
- buffer.rewind()
- assertEquals(ProducerResponse.readFrom(buffer).throttleTime, 100)
- }
-
- @Test
def testFetchResponseVersion() {
val oldClientResponse = FetchResponse(1, Map(
TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes)))
diff --git a/core/src/test/scala/unit/kafka/common/ConfigTest.scala b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
index 2d20b1e..a506d52 100644
--- a/core/src/test/scala/unit/kafka/common/ConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
@@ -20,43 +20,11 @@ package kafka.common
import org.junit.Assert._
import collection.mutable.ArrayBuffer
import org.junit.Test
-import kafka.producer.ProducerConfig
import kafka.consumer.ConsumerConfig
class ConfigTest {
@Test
- @deprecated("This test is deprecated and it will be removed in a future release.", "0.10.0.0")
- def testInvalidClientIds() {
- val invalidClientIds = new ArrayBuffer[String]()
- val badChars = Array('/', '\\', ',', '\u0000', ':', "\"", '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '=')
- for (weirdChar <- badChars) {
- invalidClientIds += "Is" + weirdChar + "illegal"
- }
-
- for (i <- 0 until invalidClientIds.size) {
- try {
- ProducerConfig.validateClientId(invalidClientIds(i))
- fail("Should throw InvalidClientIdException.")
- }
- catch {
- case _: InvalidConfigException => // This is good
- }
- }
-
- val validClientIds = new ArrayBuffer[String]()
- validClientIds += ("valid", "CLIENT", "iDs", "ar6", "VaL1d", "_0-9_.", "")
- for (i <- 0 until validClientIds.size) {
- try {
- ProducerConfig.validateClientId(validClientIds(i))
- }
- catch {
- case _: Exception => fail("Should not throw exception.")
- }
- }
- }
-
- @Test
def testInvalidGroupIds() {
val invalidGroupIds = new ArrayBuffer[String]()
val badChars = Array('/', '\\', ',', '\u0000', ':', "\"", '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '=')
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index b4381a4..91d0af4 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -23,11 +23,11 @@ import org.junit.Assert._
import kafka.common.MessageStreamsExistException
import kafka.integration.KafkaServerTestHarness
import kafka.javaapi.consumer.ConsumerRebalanceListener
-import kafka.message._
import kafka.serializer._
import kafka.server._
import kafka.utils.TestUtils._
import kafka.utils._
+import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.security.JaasUtils
import org.apache.log4j.{Level, Logger}
import org.junit.{After, Before, Test}
@@ -97,8 +97,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
zkConsumerConnector0.shutdown
// send some messages to each broker
- val sentMessages1 = sendMessages(servers, topic, nMessages, 0) ++
- sendMessages(servers, topic, nMessages, 1)
+ val sentMessages1 = produceMessages(nMessages, acks = 0) ++ produceMessages(nMessages, acks = 1)
// wait to make sure the topic and partition have a leader for the successful case
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
@@ -131,8 +130,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// send some messages to each broker
- val sentMessages2 = sendMessages(servers, topic, nMessages, 0) ++
- sendMessages(servers, topic, nMessages, 1)
+ val sentMessages2 = produceMessages(nMessages, acks = 0) ++ produceMessages(nMessages, acks = 1)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -152,8 +150,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
// send some messages to each broker
- val sentMessages3 = sendMessages(servers, topic, nMessages, 0) ++
- sendMessages(servers, topic, nMessages, 1)
+ val sentMessages3 = produceMessages(nMessages, acks = 0) ++ produceMessages(nMessages, acks = 1)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -180,14 +177,19 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
requestHandlerLogger.setLevel(Level.ERROR)
}
+ private def produceMessages(numMessages: Int, acks: Int = -1,
+ compressionType: CompressionType = CompressionType.NONE): Seq[String] = {
+ TestUtils.generateAndProduceMessages(servers, topic, numMessages, acks, compressionType)
+ }
+
@Test
def testCompression() {
val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
// send some messages to each broker
- val sentMessages1 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
- sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
+ val sentMessages1 = produceMessages(nMessages, acks = 0, CompressionType.GZIP) ++
+ produceMessages(nMessages, acks = 1, CompressionType.GZIP)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -219,8 +221,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// send some messages to each broker
- val sentMessages2 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
- sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
+ val sentMessages2 = produceMessages(nMessages, acks = 0, CompressionType.GZIP) ++
+ produceMessages(nMessages, acks = 1, CompressionType.GZIP)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -240,8 +242,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
// send some messages to each broker
- val sentMessages3 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
- sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
+ val sentMessages3 = produceMessages(nMessages, acks = 0, CompressionType.GZIP) ++
+ produceMessages(nMessages, acks = 1, CompressionType.GZIP)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -263,8 +265,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
@Test
def testCompressionSetConsumption() {
// send some messages to each broker
- val sentMessages = sendMessages(servers, topic, 200, 0, DefaultCompressionCodec) ++
- sendMessages(servers, topic, 200, 1, DefaultCompressionCodec)
+ val sentMessages = produceMessages(200, acks = 0, CompressionType.GZIP) ++
+ produceMessages(200, acks = 1, CompressionType.GZIP)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -290,8 +292,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
requestHandlerLogger.setLevel(Level.FATAL)
// send some messages to each broker
- val sentMessages = sendMessages(servers, topic, nMessages, 0, NoCompressionCodec) ++
- sendMessages(servers, topic, nMessages, 1, NoCompressionCodec)
+ val sentMessages = produceMessages(nMessages, acks = 0) ++ produceMessages(nMessages, acks = 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -332,7 +333,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
createTopic(topic, numPartitions = 1, replicationFactor = 1)
// send some messages to each broker
- val sentMessages1 = sendMessages(servers, topic, nMessages)
+ val sentMessages1 = produceMessages(nMessages)
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -359,8 +360,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
@Test
def testConsumerRebalanceListener() {
// Send messages to create topic
- sendMessages(servers, topic, nMessages, 0)
- sendMessages(servers, topic, nMessages, 1)
+ produceMessages(nMessages, acks = 0)
+ produceMessages(nMessages, acks = 1)
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index a49523f..5abc352 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -17,13 +17,12 @@
package kafka.integration
-import kafka.utils.{ZKGroupTopicDirs, Logging}
-import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
+import kafka.utils.{Logging, ZKGroupTopicDirs}
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, ConsumerTimeoutException}
import kafka.server._
import kafka.utils.TestUtils
-import kafka.serializer._
-import kafka.producer.{Producer, KeyedMessage}
-
+import kafka.utils.TestUtils.createNewProducer
+import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.{After, Before, Test}
import org.apache.log4j.{Level, Logger}
import org.junit.Assert._
@@ -79,12 +78,10 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
createTopic(topic, 1, 1)
- val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
- TestUtils.getBrokerListStrFromServers(servers),
- keyEncoder = classOf[StringEncoder].getName)
+ val producer = createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = 5)
- for(_ <- 0 until numMessages)
- producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
+ val futures = (0 until numMessages).map(_ => producer.send(new ProducerRecord(topic, topic.getBytes, "test".getBytes)))
+ futures.foreach(_.get)
// update offset in ZooKeeper for consumer to jump "forward" in time
val dirs = new ZKGroupTopicDirs(group, topic)
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 0a8c49f..18373f2 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -77,11 +77,11 @@ class FetcherTest extends KafkaServerTestHarness {
@Test
def testFetcher() {
val perNode = 2
- var count = TestUtils.produceMessages(servers, topic, perNode).size
+ var count = TestUtils.generateAndProduceMessages(servers, topic, perNode).size
fetch(count)
assertQueueEmpty()
- count = TestUtils.produceMessages(servers, topic, perNode).size
+ count = TestUtils.generateAndProduceMessages(servers, topic, perNode).size
fetch(count)
assertQueueEmpty()
}
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
deleted file mode 100755
index 0cf95e9..0000000
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import java.nio.ByteBuffer
-
-import org.junit.Assert._
-import kafka.api.{FetchRequest, FetchRequestBuilder, PartitionFetchInfo}
-import kafka.server.{KafkaConfig, KafkaRequestHandler}
-import kafka.producer.{KeyedMessage, Producer}
-import org.apache.log4j.{Level, Logger}
-import org.junit.Test
-
-import scala.collection._
-import kafka.common.{ErrorMapping, OffsetOutOfRangeException, TopicAndPartition, UnknownTopicOrPartitionException}
-import kafka.utils.{StaticPartitioner, TestUtils}
-import kafka.serializer.StringEncoder
-import java.util.Properties
-
-import org.apache.kafka.common.TopicPartition
-
-/**
- * End to end tests of the primitive apis against a local server
- */
-@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
-class PrimitiveApiTest extends ProducerConsumerTestHarness {
- val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
-
- def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
-
- @Test
- def testFetchRequestCanProperlySerialize() {
- val request = new FetchRequestBuilder()
- .clientId("test-client")
- .maxWait(10001)
- .minBytes(4444)
- .addFetch("topic1", 0, 0, 10000)
- .addFetch("topic2", 1, 1024, 9999)
- .addFetch("topic1", 1, 256, 444)
- .build()
- val serializedBuffer = ByteBuffer.allocate(request.sizeInBytes)
- request.writeTo(serializedBuffer)
- serializedBuffer.rewind()
- val deserializedRequest = FetchRequest.readFrom(serializedBuffer)
- assertEquals(request, deserializedRequest)
- }
-
- @Test
- def testEmptyFetchRequest() {
- val partitionRequests = immutable.Map[TopicAndPartition, PartitionFetchInfo]()
- val request = new FetchRequest(requestInfo = partitionRequests.toVector)
- val fetched = consumer.fetch(request)
- assertTrue(!fetched.hasError && fetched.data.isEmpty)
- }
-
- @Test
- def testDefaultEncoderProducerAndFetch() {
- val topic = "test-topic"
-
- producer.send(new KeyedMessage[String, String](topic, "test-message"))
-
- val replica = servers.head.replicaManager.getReplica(new TopicPartition(topic, 0)).get
- assertTrue("HighWatermark should equal logEndOffset with just 1 replica",
- replica.logEndOffset.messageOffset > 0 && replica.logEndOffset.equals(replica.highWatermark))
-
- val request = new FetchRequestBuilder()
- .clientId("test-client")
- .addFetch(topic, 0, 0, 10000)
- .build()
- val fetched = consumer.fetch(request)
- assertEquals("Returned correlationId doesn't match that in request.", 0, fetched.correlationId)
-
- val messageSet = fetched.messageSet(topic, 0)
- assertTrue(messageSet.iterator.hasNext)
-
- val fetchedMessageAndOffset = messageSet.head
- assertEquals("test-message", TestUtils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
- }
-
- @Test
- def testDefaultEncoderProducerAndFetchWithCompression() {
- val topic = "test-topic"
- val props = new Properties()
- props.put("compression.codec", "gzip")
-
- val stringProducer1 = TestUtils.createProducer[String, String](
- TestUtils.getBrokerListStrFromServers(servers),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName,
- partitioner = classOf[StaticPartitioner].getName,
- producerProps = props)
-
- stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
-
- val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
- val messageSet = fetched.messageSet(topic, 0)
- assertTrue(messageSet.iterator.hasNext)
-
- val fetchedMessageAndOffset = messageSet.head
- assertEquals("test-message", TestUtils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
- }
-
- private def produceAndMultiFetch(producer: Producer[String, String]) {
- for(topic <- List("test1", "test2", "test3", "test4"))
- createTopic(topic)
-
- // send some messages
- val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
- {
- val messages = new mutable.HashMap[String, Seq[String]]
- val builder = new FetchRequestBuilder()
- for( (topic, partition) <- topics) {
- val messageList = List("a_" + topic, "b_" + topic)
- val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
- messages += topic -> messageList
- producer.send(producerData:_*)
- builder.addFetch(topic, partition, 0, 10000)
- }
-
- val request = builder.build()
- val response = consumer.fetch(request)
- for((topic, partition) <- topics) {
- val fetched = response.messageSet(topic, partition)
- assertEquals(messages(topic), fetched.map(messageAndOffset => TestUtils.readString(messageAndOffset.message.payload)))
- }
- }
-
- // temporarily set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.FATAL)
-
- {
- // send some invalid offsets
- val builder = new FetchRequestBuilder()
- for((topic, partition) <- topics)
- builder.addFetch(topic, partition, -1, 10000)
-
- try {
- val request = builder.build()
- val response = consumer.fetch(request)
- response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error.code))
- fail("Expected exception when fetching message with invalid offset")
- } catch {
- case _: OffsetOutOfRangeException => // This is good.
- }
- }
-
- {
- // send some invalid partitions
- val builder = new FetchRequestBuilder()
- for((topic, _) <- topics)
- builder.addFetch(topic, -1, 0, 10000)
-
- try {
- val request = builder.build()
- val response = consumer.fetch(request)
- response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error.code))
- fail("Expected exception when fetching message with invalid partition")
- } catch {
- case _: UnknownTopicOrPartitionException => // This is good.
- }
- }
-
- // restore set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.ERROR)
- }
-
- @Test
- def testProduceAndMultiFetch() {
- produceAndMultiFetch(producer)
- }
-
- private def multiProduce(producer: Producer[String, String]) {
- val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
- topics.keys.map(topic => createTopic(topic))
-
- val messages = new mutable.HashMap[String, Seq[String]]
- val builder = new FetchRequestBuilder()
- for((topic, partition) <- topics) {
- val messageList = List("a_" + topic, "b_" + topic)
- val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
- messages += topic -> messageList
- producer.send(producerData:_*)
- builder.addFetch(topic, partition, 0, 10000)
- }
-
- val request = builder.build()
- val response = consumer.fetch(request)
- for((topic, partition) <- topics) {
- val fetched = response.messageSet(topic, partition)
- assertEquals(messages(topic), fetched.map(messageAndOffset => TestUtils.readString(messageAndOffset.message.payload)))
- }
- }
-
- @Test
- def testMultiProduce() {
- multiProduce(producer)
- }
-
- @Test
- def testConsumerEmptyTopic() {
- val newTopic = "new-topic"
- createTopic(newTopic, numPartitions = 1, replicationFactor = 1)
-
- val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
- assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
- }
-
- @Test
- def testPipelinedProduceRequests() {
- val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
- topics.keys.map(topic => createTopic(topic))
- val props = new Properties()
- props.put("request.required.acks", "0")
- val pipelinedProducer: Producer[String, String] =
- TestUtils.createProducer[String, String](
- TestUtils.getBrokerListStrFromServers(servers),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName,
- partitioner = classOf[StaticPartitioner].getName,
- producerProps = props)
-
- // send some messages
- val messages = new mutable.HashMap[String, Seq[String]]
- val builder = new FetchRequestBuilder()
- for( (topic, partition) <- topics) {
- val messageList = List("a_" + topic, "b_" + topic)
- val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
- messages += topic -> messageList
- pipelinedProducer.send(producerData:_*)
- builder.addFetch(topic, partition, 0, 10000)
- }
-
- // wait until the messages are published
- TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test1", 0)).get.logEndOffset == 2 },
- "Published messages should be in the log")
- TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test2", 0)).get.logEndOffset == 2 },
- "Published messages should be in the log")
- TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test3", 0)).get.logEndOffset == 2 },
- "Published messages should be in the log")
- TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog(new TopicPartition("test4", 0)).get.logEndOffset == 2 },
- "Published messages should be in the log")
-
- val replicaId = servers.head.config.brokerId
- TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test1", 0), replicaId).get.highWatermark.messageOffset == 2 },
- "High watermark should equal to log end offset")
- TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test2", 0), replicaId).get.highWatermark.messageOffset == 2 },
- "High watermark should equal to log end offset")
- TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test3", 0), replicaId).get.highWatermark.messageOffset == 2 },
- "High watermark should equal to log end offset")
- TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica(new TopicPartition("test4", 0), replicaId).get.highWatermark.messageOffset == 2 },
- "High watermark should equal to log end offset")
-
- // test if the consumer received the messages in the correct order when producer has enabled request pipelining
- val request = builder.build()
- val response = consumer.fetch(request)
- for( (topic, partition) <- topics) {
- val fetched = response.messageSet(topic, partition)
- assertEquals(messages(topic), fetched.map(messageAndOffset => TestUtils.readString(messageAndOffset.message.payload)))
- }
- }
-}
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
deleted file mode 100644
index e3115e1..0000000
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import kafka.consumer.SimpleConsumer
-import org.junit.{After, Before}
-import kafka.producer.Producer
-import kafka.utils.{StaticPartitioner, TestUtils}
-import kafka.serializer.StringEncoder
-
-@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
-trait ProducerConsumerTestHarness extends KafkaServerTestHarness {
- val host = "localhost"
- var producer: Producer[String, String] = null
- var consumer: SimpleConsumer = null
-
- @Before
- override def setUp() {
- super.setUp
- producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(servers),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName,
- partitioner = classOf[StaticPartitioner].getName)
- consumer = new SimpleConsumer(host, TestUtils.boundPort(servers.head), 1000000, 64 * 1024, "")
- }
-
- @After
- override def tearDown() {
- producer.close()
- consumer.close()
- super.tearDown
- }
-}
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 608f3a6..a15ddb8 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -54,8 +54,6 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
val kafkaApisLogger = Logger.getLogger(classOf[kafka.server.KafkaApis])
val networkProcessorLogger = Logger.getLogger(classOf[kafka.network.Processor])
- val syncProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer])
- val eventHandlerLogger = Logger.getLogger(classOf[kafka.producer.async.DefaultEventHandler[Object, Object]])
@Before
override def setUp() {
@@ -73,8 +71,6 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
// temporarily set loggers to a higher level so that tests run quietly
kafkaApisLogger.setLevel(Level.FATAL)
networkProcessorLogger.setLevel(Level.FATAL)
- syncProducerLogger.setLevel(Level.FATAL)
- eventHandlerLogger.setLevel(Level.FATAL)
}
@After
@@ -85,8 +81,6 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
// restore log levels
kafkaApisLogger.setLevel(Level.ERROR)
networkProcessorLogger.setLevel(Level.ERROR)
- syncProducerLogger.setLevel(Level.ERROR)
- eventHandlerLogger.setLevel(Level.ERROR)
super.tearDown()
}
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index dbd9118..695b1b6 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -19,18 +19,17 @@ package kafka.javaapi.consumer
import java.util.Properties
-import kafka.server._
import kafka.serializer._
+import kafka.server._
import kafka.integration.KafkaServerTestHarness
-import kafka.producer.KeyedMessage
-import kafka.javaapi.producer.Producer
-import kafka.utils.IntEncoder
import kafka.utils.{Logging, TestUtils}
-import kafka.consumer.{KafkaStream, ConsumerConfig}
+import kafka.consumer.{ConsumerConfig, KafkaStream}
import kafka.common.MessageStreamsExistException
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
import org.junit.Test
-import scala.collection.JavaConversions
+import scala.collection.JavaConverters._
import org.apache.log4j.{Level, Logger}
import org.junit.Assert._
@@ -65,14 +64,14 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
- val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
+ val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map[String, Integer](topic -> numNodes*numParts/2).asJava, new StringDecoder(), new StringDecoder())
val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
// call createMesssageStreams twice should throw MessageStreamsExistException
try {
- zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
+ zkConsumerConnector1.createMessageStreams(Map[String, Integer](topic -> numNodes*numParts/2).asJava, new StringDecoder(), new StringDecoder())
fail("Should fail with MessageStreamsExistException")
} catch {
case _: MessageStreamsExistException => // expected
@@ -86,35 +85,22 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
messagesPerNode: Int,
header: String): List[String] = {
var messages: List[String] = Nil
- for(server <- servers) {
- val producer: kafka.producer.Producer[Int, String] =
- TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[IntEncoder].getName)
- val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)
+ val producer = TestUtils.createNewProducer[Integer, String](TestUtils.getBrokerListStrFromServers(servers),
+ keySerializer = new IntegerSerializer, valueSerializer = new StringSerializer)
+ for (server <- servers) {
for (partition <- 0 until numParts) {
- val ms = 0.until(messagesPerNode).map(x => header + server.config.brokerId + "-" + partition + "-" + x)
+ val ms = (0 until messagesPerNode).map(x => header + server.config.brokerId + "-" + partition + "-" + x)
messages ++= ms
- import JavaConversions._
- javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]])
+ ms.map(new ProducerRecord[Integer, String](topic, partition, partition, _)).map(producer.send).foreach(_.get)
}
- javaProducer.close
}
+ producer.close()
messages
}
def getMessages(nMessagesPerThread: Int,
jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = {
- var messages: List[String] = Nil
- import scala.collection.JavaConversions._
- val topicMessageStreams = jTopicMessageStreams.mapValues(_.toList)
- messages = TestUtils.getMessages(topicMessageStreams, nMessagesPerThread)
- messages
- }
-
- private def toJavaMap(scalaMap: Map[String, Int]): java.util.Map[String, java.lang.Integer] = {
- val javaMap = new java.util.HashMap[String, java.lang.Integer]()
- scalaMap.foreach(m => javaMap.put(m._1, m._2.asInstanceOf[java.lang.Integer]))
- javaMap
+ val topicMessageStreams = jTopicMessageStreams.asScala.mapValues(_.asScala.toList)
+ TestUtils.getMessages(topicMessageStreams, nMessagesPerThread)
}
}
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 4227764..2423e4c 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -26,14 +26,11 @@ import org.junit.Test
import org.junit.Assert._
import kafka.integration.KafkaServerTestHarness
import kafka.server._
-import kafka.serializer._
import kafka.utils._
-import kafka.utils.TestUtils._
import scala.collection._
import scala.collection.JavaConverters._
import scala.util.matching.Regex
-import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
import kafka.log.LogConfig
import org.apache.kafka.common.TopicPartition
@@ -50,26 +47,6 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
val nMessages = 2
@Test
- @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
- def testMetricsLeak() {
- val topic = "test-metrics-leak"
- // create topic topic1 with 1 partition on broker 0
- createTopic(topic, numPartitions = 1, replicationFactor = 1)
- // force creation not client's specific metrics.
- createAndShutdownStep(topic, "group0", "consumer0", "producer0")
-
- //this assertion is only used for creating the metrics for DelayedFetchMetrics, it should never fail, but should not be removed
- assertNotNull(DelayedFetchMetrics)
-
- val countOfStaticMetrics = Metrics.defaultRegistry.allMetrics.keySet.size
-
- for (i <- 0 to 5) {
- createAndShutdownStep(topic, "group" + i % 3, "consumer" + i % 2, "producer" + i % 2)
- assertEquals(countOfStaticMetrics, Metrics.defaultRegistry.allMetrics.keySet.size)
- }
- }
-
- @Test
def testMetricsReporterAfterDeletingTopic() {
val topic = "test-topic-metric"
adminZkClient.createTopic(topic, 1, 1)
@@ -84,7 +61,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
adminZkClient.createTopic(topic, 2, 1)
// Produce a few messages to create the metrics
// Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238
- TestUtils.produceMessages(servers, topic, nMessages)
+ TestUtils.generateAndProduceMessages(servers, topic, nMessages)
assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty)
servers.foreach(s => assertNotNull(s.brokerTopicStats.topicStats(topic)))
adminZkClient.deleteTopic(topic)
@@ -108,18 +85,6 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assert(metric.getMBeanName.endsWith(expectedMBeanName))
}
- @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
- def createAndShutdownStep(topic: String, group: String, consumerId: String, producerId: String): Unit = {
- sendMessages(servers, topic, nMessages)
- // create a consumer
- val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
- val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
- val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder, new StringDecoder)
- getMessages(topicMessageStreams1, nMessages)
-
- zkConsumerConnector1.shutdown()
- }
-
@Test
def testBrokerTopicMetricsBytesInOut(): Unit = {
val topic = "test-bytes-in-out"
@@ -132,7 +97,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, "2")
createTopic(topic, 1, numNodes, topicConfig)
// Produce a few messages to create the metrics
- TestUtils.produceMessages(servers, topic, nMessages)
+ TestUtils.generateAndProduceMessages(servers, topic, nMessages)
// Check the log size for each broker so that we can distinguish between failures caused by replication issues
// versus failures caused by the metrics
@@ -151,7 +116,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
val initialBytesOut = meterCount(bytesOut)
// Produce a few messages to make the metrics tick
- TestUtils.produceMessages(servers, topic, nMessages)
+ TestUtils.generateAndProduceMessages(servers, topic, nMessages)
assertTrue(meterCount(replicationBytesIn) > initialReplicationBytesIn)
assertTrue(meterCount(replicationBytesOut) > initialReplicationBytesOut)
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
deleted file mode 100755
index 370a1ad..0000000
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ /dev/null
@@ -1,505 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer
-
-import java.util.Properties
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.kafka.common.protocol.Errors
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.easymock.EasyMock
-import org.junit.Test
-import kafka.api._
-import kafka.cluster.BrokerEndPoint
-import kafka.common._
-import kafka.message._
-import kafka.producer.async._
-import kafka.serializer._
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils._
-
-import scala.collection.Map
-import scala.collection.mutable.ArrayBuffer
-import kafka.utils._
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.Time
-
-@deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
-class AsyncProducerTest {
-
- class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner {
- def partition(data: Any, numPartitions: Int): Int = -1
- }
-
- // One of the few cases we can just set a fixed port because the producer is mocked out here since this uses mocks
- val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port = 65534))
- val configs = props.map(KafkaConfig.fromProps)
- val brokerList = configs.map { config =>
- val endPoint = config.advertisedListeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
- org.apache.kafka.common.utils.Utils.formatAddress(endPoint.host, endPoint.port)
- }.mkString(",")
-
- @Test
- def testProducerQueueSize() {
- // a mock event handler that blocks
- val mockEventHandler = new EventHandler[String,String] {
-
- def handle(events: Seq[KeyedMessage[String,String]]) {
- Thread.sleep(500)
- }
-
- def close(): Unit = ()
- }
-
- val props = new Properties()
- props.put("serializer.class", "kafka.serializer.StringEncoder")
- props.put("metadata.broker.list", brokerList)
- props.put("producer.type", "async")
- props.put("queue.buffering.max.messages", "10")
- props.put("batch.num.messages", "1")
- props.put("queue.enqueue.timeout.ms", "0")
-
- val config = new ProducerConfig(props)
- val produceData = getProduceData(12)
- val producer = new Producer[String, String](config, mockEventHandler)
- try {
- // send all 10 messages, should hit the batch size and then reach broker
- producer.send(produceData: _*)
- fail("Queue should be full")
- }
- catch {
- case _: QueueFullException => //expected
- }finally {
- producer.close()
- }
- }
-
- @Test
- def testProduceAfterClosed() {
- val produceData = getProduceData(10)
- val producer = createProducer[String, String](
- brokerList,
- encoder = classOf[StringEncoder].getName)
-
- producer.close
-
- try {
- producer.send(produceData: _*)
- fail("should complain that producer is already closed")
- }
- catch {
- case _: ProducerClosedException => //expected
- }
- }
-
- @Test
- def testBatchSize() {
- /**
- * Send a total of 10 messages with batch size of 5. Expect 2 calls to the handler, one for each batch.
- */
- val producerDataList = getProduceData(10)
- val mockHandler = EasyMock.createStrictMock(classOf[DefaultEventHandler[String,String]])
- mockHandler.handle(producerDataList.take(5))
- EasyMock.expectLastCall
- mockHandler.handle(producerDataList.takeRight(5))
- EasyMock.expectLastCall
- EasyMock.replay(mockHandler)
-
- val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
- val producerSendThread =
- new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5, "")
- producerSendThread.start()
-
- for (producerData <- producerDataList)
- queue.put(producerData)
-
- producerSendThread.shutdown
- EasyMock.verify(mockHandler)
- }
-
- @Test
- def testQueueTimeExpired() {
- /**
- * Send a total of 2 messages with batch size of 5 and queue time of 200ms.
- * Expect 1 calls to the handler after 200ms.
- */
- val producerDataList = getProduceData(2)
- val mockHandler = EasyMock.createStrictMock(classOf[DefaultEventHandler[String,String]])
- mockHandler.handle(producerDataList)
- EasyMock.expectLastCall
- EasyMock.replay(mockHandler)
-
- val queueExpirationTime = 200
- val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
- val producerSendThread =
- new ProducerSendThread[String,String]("thread1", queue, mockHandler, queueExpirationTime, 5, "")
- producerSendThread.start()
-
- for (producerData <- producerDataList)
- queue.put(producerData)
-
- Thread.sleep(queueExpirationTime + 100)
- EasyMock.verify(mockHandler)
- producerSendThread.shutdown
- }
-
- @Test
- def testPartitionAndCollateEvents() {
- val producerDataList = new ArrayBuffer[KeyedMessage[Int,Message]]
- // use bogus key and partition key override for some messages
- producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = 0, message = new Message("msg1".getBytes)))
- producerDataList.append(new KeyedMessage[Int,Message]("topic2", key = -99, partKey = 1, message = new Message("msg2".getBytes)))
- producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = 2, message = new Message("msg3".getBytes)))
- producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = -101, partKey = 3, message = new Message("msg4".getBytes)))
- producerDataList.append(new KeyedMessage[Int,Message]("topic2", key = 4, message = new Message("msg5".getBytes)))
-
- val props = new Properties()
- props.put("metadata.broker.list", brokerList)
- val broker1 = new BrokerEndPoint(0, "localhost", 9092)
- val broker2 = new BrokerEndPoint(1, "localhost", 9093)
-
- // form expected partitions metadata
- val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2))
- val partition2Metadata = new PartitionMetadata(1, Some(broker2), List(broker1, broker2))
- val topic1Metadata = new TopicMetadata("topic1", List(partition1Metadata, partition2Metadata))
- val topic2Metadata = new TopicMetadata("topic2", List(partition1Metadata, partition2Metadata))
-
- val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
- topicPartitionInfos.put("topic1", topic1Metadata)
- topicPartitionInfos.put("topic2", topic2Metadata)
-
- val intPartitioner = new Partitioner {
- def partition(key: Any, numPartitions: Int): Int = key.asInstanceOf[Int] % numPartitions
- }
- val config = new ProducerConfig(props)
-
- val producerPool = new ProducerPool(config)
- val handler = new DefaultEventHandler[Int,String](config,
- partitioner = intPartitioner,
- encoder = null.asInstanceOf[Encoder[String]],
- keyEncoder = new IntEncoder(),
- producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos)
-
- val topic1Broker1Data =
- ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
- new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
- val topic1Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", -101, 3, new Message("msg4".getBytes)))
- val topic2Broker1Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
- val topic2Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", -99, 1, new Message("msg2".getBytes)))
- val expectedResult = Some(Map(
- 0 -> Map(
- TopicAndPartition("topic1", 0) -> topic1Broker1Data,
- TopicAndPartition("topic2", 0) -> topic2Broker1Data),
- 1 -> Map(
- TopicAndPartition("topic1", 1) -> topic1Broker2Data,
- TopicAndPartition("topic2", 1) -> topic2Broker2Data)
- ))
-
- val actualResult = handler.partitionAndCollate(producerDataList)
- assertEquals(expectedResult, actualResult)
- }
-
- @Test
- def testSerializeEvents() {
- val produceData = TestUtils.getMsgStrings(5).map(m => new KeyedMessage[String,String]("topic1",m))
- val props = new Properties()
- props.put("metadata.broker.list", brokerList)
- val config = new ProducerConfig(props)
- // form expected partitions metadata
- val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
- val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
- topicPartitionInfos.put("topic1", topic1Metadata)
-
- val producerPool = new ProducerPool(config)
-
- val handler = new DefaultEventHandler[String,String](config,
- partitioner = null.asInstanceOf[Partitioner],
- encoder = new StringEncoder,
- keyEncoder = new StringEncoder,
- producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos)
-
- val serializedData = handler.serialize(produceData)
- val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, TestUtils.readString(d.message.payload)))
-
- // Test that the serialize handles seq from a Stream
- val streamedSerializedData = handler.serialize(Stream(produceData:_*))
- val deserializedStreamData = streamedSerializedData.map(d => new KeyedMessage[String,String](d.topic, TestUtils.readString(d.message.payload)))
-
- TestUtils.checkEquals(produceData.iterator, deserializedData.iterator)
- TestUtils.checkEquals(produceData.iterator, deserializedStreamData.iterator)
- }
-
- @Test
- def testInvalidPartition() {
- val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
- producerDataList.append(new KeyedMessage[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
- val props = new Properties()
- props.put("metadata.broker.list", brokerList)
- val config = new ProducerConfig(props)
-
- // form expected partitions metadata
- val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
-
- val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
- topicPartitionInfos.put("topic1", topic1Metadata)
-
- val producerPool = new ProducerPool(config)
-
- val handler = new DefaultEventHandler[String,String](config,
- partitioner = new NegativePartitioner,
- encoder = null.asInstanceOf[Encoder[String]],
- keyEncoder = null.asInstanceOf[Encoder[String]],
- producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos)
- try {
- handler.partitionAndCollate(producerDataList)
- }
- catch {
- // should not throw any exception
- case _: Throwable => fail("Should not throw any exception")
-
- }
- }
-
- @Test
- def testNoBroker() {
- val props = new Properties()
- props.put("metadata.broker.list", brokerList)
-
- val config = new ProducerConfig(props)
- // create topic metadata with 0 partitions
- val topic1Metadata = new TopicMetadata("topic1", Seq.empty)
-
- val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
- topicPartitionInfos.put("topic1", topic1Metadata)
-
- val producerPool = new ProducerPool(config)
-
- val producerDataList = new ArrayBuffer[KeyedMessage[String,String]]
- producerDataList.append(new KeyedMessage[String,String]("topic1", "msg1"))
- val handler = new DefaultEventHandler[String,String](config,
- partitioner = null.asInstanceOf[Partitioner],
- encoder = new StringEncoder,
- keyEncoder = new StringEncoder,
- producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos)
- try {
- handler.handle(producerDataList)
- fail("Should fail with FailedToSendMessageException")
- }
- catch {
- case _: FailedToSendMessageException => // we retry on any exception now
- }
- }
-
- @Test
- def testIncompatibleEncoder() {
- val props = new Properties()
- // no need to retry since the send will always fail
- props.put("message.send.max.retries", "0")
- val producer= createProducer[String, String](
- brokerList = brokerList,
- encoder = classOf[DefaultEncoder].getName,
- keyEncoder = classOf[DefaultEncoder].getName,
- producerProps = props)
-
- try {
- producer.send(getProduceData(1): _*)
- fail("Should fail with ClassCastException due to incompatible Encoder")
- } catch {
- case _: ClassCastException =>
- } finally {
- producer.close()
- }
- }
-
- @Test
- def testRandomPartitioner() {
- val props = new Properties()
- props.put("metadata.broker.list", brokerList)
- val config = new ProducerConfig(props)
-
- // create topic metadata with 0 partitions
- val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
- val topic2Metadata = getTopicMetadata("topic2", 0, 0, "localhost", 9092)
-
- val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
- topicPartitionInfos.put("topic1", topic1Metadata)
- topicPartitionInfos.put("topic2", topic2Metadata)
-
- val producerPool = new ProducerPool(config)
- val handler = new DefaultEventHandler[String,String](config,
- partitioner = null.asInstanceOf[Partitioner],
- encoder = null.asInstanceOf[Encoder[String]],
- keyEncoder = null.asInstanceOf[Encoder[String]],
- producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos)
- val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
- producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg1".getBytes)))
- producerDataList.append(new KeyedMessage[String,Message]("topic2", new Message("msg2".getBytes)))
- producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg3".getBytes)))
-
- val partitionedDataOpt = handler.partitionAndCollate(producerDataList)
- partitionedDataOpt match {
- case Some(partitionedData) =>
- for (dataPerBroker <- partitionedData.values) {
- for (tp <- dataPerBroker.keys)
- assertTrue(tp.partition == 0)
- }
- case None =>
- fail("Failed to collate requests by topic, partition")
- }
- }
-
- @Test
- def testFailedSendRetryLogic() {
- val props = new Properties()
- props.put("metadata.broker.list", brokerList)
- props.put("request.required.acks", "1")
- props.put("serializer.class", classOf[StringEncoder].getName)
- props.put("key.serializer.class", classOf[NullEncoder[Int]].getName)
- props.put("producer.num.retries", "3")
-
- val config = new ProducerConfig(props)
-
- val topic1 = "topic1"
- val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092)
- val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
- topicPartitionInfos.put("topic1", topic1Metadata)
-
- val msgs = TestUtils.getMsgStrings(2)
-
- import SyncProducerConfig.{DefaultAckTimeoutMs, DefaultClientId}
-
- // produce request for topic1 and partitions 0 and 1. Let the first request fail
- // entirely. The second request will succeed for partition 1 but fail for partition 0.
- // On the third try for partition 0, let it succeed.
- val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1,
- correlationId = 5, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
- val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1,
- correlationId = 11, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
- val response1 = ProducerResponse(0,
- Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NOT_LEADER_FOR_PARTITION, 0L)),
- (TopicAndPartition("topic1", 1), ProducerResponseStatus(Errors.NONE, 0L))))
- val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 15,
- timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
- val response2 = ProducerResponse(0,
- Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NONE, 0L))))
- val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
- // don't care about config mock
- val myProps = new Properties()
- myProps.put("host", "localhost")
- myProps.put("port", "9092")
- val myConfig = new SyncProducerConfig(myProps)
- EasyMock.expect(mockSyncProducer.config).andReturn(myConfig).anyTimes()
- EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
- EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1)
- EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2)
- EasyMock.replay(mockSyncProducer)
-
- val producerPool = EasyMock.createMock(classOf[ProducerPool])
- EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer).times(3)
- EasyMock.expect(producerPool.close())
- EasyMock.replay(producerPool)
- val time = new Time {
- override def nanoseconds: Long = 0L
- override def milliseconds: Long = 0L
- override def sleep(ms: Long): Unit = {}
- override def hiResClockMs: Long = 0L
- }
- val handler = new DefaultEventHandler(config,
- partitioner = new FixedValuePartitioner(),
- encoder = new StringEncoder(),
- keyEncoder = new NullEncoder[Int](),
- producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos,
- time = time)
- val data = msgs.map(m => new KeyedMessage(topic1, 0, m)) ++ msgs.map(m => new KeyedMessage(topic1, 1, m))
- handler.handle(data)
- handler.close()
-
- EasyMock.verify(mockSyncProducer)
- EasyMock.verify(producerPool)
- }
-
- @Test
- def testJavaProducer() {
- val topic = "topic1"
- val msgs = TestUtils.getMsgStrings(5)
- val scalaProducerData = msgs.map(m => new KeyedMessage[String, String](topic, m))
- val javaProducerData: java.util.List[KeyedMessage[String, String]] = {
- import scala.collection.JavaConversions._
- scalaProducerData
- }
-
- val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]])
- mockScalaProducer.send(scalaProducerData.head)
- EasyMock.expectLastCall()
- mockScalaProducer.send(scalaProducerData: _*)
- EasyMock.expectLastCall()
- EasyMock.replay(mockScalaProducer)
-
- val javaProducer = new kafka.javaapi.producer.Producer[String, String](mockScalaProducer)
- javaProducer.send(javaProducerData.get(0))
- javaProducer.send(javaProducerData)
-
- EasyMock.verify(mockScalaProducer)
- }
-
- @Test
- def testInvalidConfiguration() {
- val props = new Properties()
- props.put("serializer.class", "kafka.serializer.StringEncoder")
- props.put("producer.type", "async")
- try {
- new ProducerConfig(props)
- fail("should complain about wrong config")
- }
- catch {
- case _: IllegalArgumentException => //expected
- }
- }
-
- def getProduceData(nEvents: Int): Seq[KeyedMessage[String,String]] = {
- val producerDataList = new ArrayBuffer[KeyedMessage[String,String]]
- for (i <- 0 until nEvents)
- producerDataList.append(new KeyedMessage[String,String]("topic1", null, "msg" + i))
- producerDataList
- }
-
- private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
- getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort)
- }
-
- private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
- val broker1 = new BrokerEndPoint(brokerId, brokerHost, brokerPort)
- new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
- }
-
- def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
- new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(m.getBytes, 0L, Message.MagicValue_V1)): _*)
- }
-
- def messagesToSet(key: Array[Byte], messages: Seq[Array[Byte]]): ByteBufferMessageSet = {
- new ByteBufferMessageSet(
- NoCompressionCodec,
- messages.map(m => new Message(key = key, bytes = m, timestamp = 0L, magicValue = Message.MagicValue_V1)): _*)
- }
-}
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
deleted file mode 100755
index dc2a5ed..0000000
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ /dev/null
@@ -1,348 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-import java.nio.ByteBuffer
-import java.util
-import java.util.Properties
-
-import kafka.api.FetchRequestBuilder
-import kafka.common.FailedToSendMessageException
-import kafka.consumer.SimpleConsumer
-import kafka.message.{Message, MessageAndOffset}
-import kafka.serializer.StringEncoder
-import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
-import kafka.utils._
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.utils.Time
-import org.apache.log4j.{Level, Logger}
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
-import org.scalatest.exceptions.TestFailedException
-
-@deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
-class ProducerTest extends ZooKeeperTestHarness with Logging{
- private val brokerId1 = 0
- private val brokerId2 = 1
- private var server1: KafkaServer = null
- private var server2: KafkaServer = null
- private var consumer1: SimpleConsumer = null
- private var consumer2: SimpleConsumer = null
- private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
- private var servers = List.empty[KafkaServer]
-
- // Creation of consumers is deferred until they are actually needed. This allows us to kill brokers that use random
- // ports and then get a consumer instance that will be pointed at the correct port
- def getConsumer1() = {
- if (consumer1 == null)
- consumer1 = new SimpleConsumer("localhost", TestUtils.boundPort(server1), 1000000, 64*1024, "")
- consumer1
- }
-
- def getConsumer2() = {
- if (consumer2 == null)
- consumer2 = new SimpleConsumer("localhost", TestUtils.boundPort(server2), 1000000, 64*1024, "")
- consumer2
- }
-
- @Before
- override def setUp() {
- super.setUp()
- // set up 2 brokers with 4 partitions each
- val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, false)
- props1.put("num.partitions", "4")
- val config1 = KafkaConfig.fromProps(props1)
- val props2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, false)
- props2.put("num.partitions", "4")
- val config2 = KafkaConfig.fromProps(props2)
- server1 = TestUtils.createServer(config1)
- server2 = TestUtils.createServer(config2)
- servers = List(server1,server2)
-
- // temporarily set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.FATAL)
- }
-
- @After
- override def tearDown() {
- // restore set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.ERROR)
-
- if (consumer1 != null)
- consumer1.close()
- if (consumer2 != null)
- consumer2.close()
-
- TestUtils.shutdownServers(Seq(server1, server2))
- super.tearDown()
- }
-
- @Test
- def testUpdateBrokerPartitionInfo() {
- val topic = "new-topic"
- TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers)
-
- val props = new Properties()
- // no need to retry since the send will always fail
- props.put("message.send.max.retries", "0")
- val producer1 = TestUtils.createProducer[String, String](
- brokerList = "localhost:80,localhost:81",
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName,
- producerProps = props)
-
- try {
- producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
- fail("Test should fail because the broker list provided are not valid")
- } catch {
- case _: FailedToSendMessageException => // this is expected
- } finally producer1.close()
-
- val producer2 = TestUtils.createProducer[String, String](
- brokerList = "localhost:80," + TestUtils.getBrokerListStrFromServers(Seq(server1)),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName)
-
- try{
- producer2.send(new KeyedMessage[String, String](topic, "test", "test1"))
- } catch {
- case e: Throwable => fail("Should succeed sending the message", e)
- } finally {
- producer2.close()
- }
-
- val producer3 = TestUtils.createProducer[String, String](
- brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName)
-
- try{
- producer3.send(new KeyedMessage[String, String](topic, "test", "test1"))
- } catch {
- case e: Throwable => fail("Should succeed sending the message", e)
- } finally {
- producer3.close()
- }
- }
-
- @Test
- def testSendToNewTopic() {
- val props1 = new util.Properties()
- props1.put("request.required.acks", "-1")
-
- val topic = "new-topic"
- // create topic with 1 partition and await leadership
- TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers)
-
- val producer1 = TestUtils.createProducer[String, String](
- brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName,
- partitioner = classOf[StaticPartitioner].getName,
- producerProps = props1)
- val startTime = System.currentTimeMillis()
- // Available partition ids should be 0.
- producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
- producer1.send(new KeyedMessage[String, String](topic, "test", "test2"))
- val endTime = System.currentTimeMillis()
- // get the leader
- val leaderOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
- assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
- val leader = leaderOpt.get
-
- val messageSet = if(leader == server1.config.brokerId) {
- val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
- response1.messageSet("new-topic", 0).iterator.toBuffer
- }else {
- val response2 = getConsumer2().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
- response2.messageSet("new-topic", 0).iterator.toBuffer
- }
- assertEquals("Should have fetched 2 messages", 2, messageSet.size)
- // Message 1
- assertTrue(ByteBuffer.wrap("test1".getBytes).equals(messageSet.head.message.payload))
- assertTrue(ByteBuffer.wrap("test".getBytes).equals(messageSet.head.message.key))
- assertTrue(messageSet.head.message.timestamp >= startTime && messageSet.head.message.timestamp < endTime)
- assertEquals(TimestampType.CREATE_TIME, messageSet.head.message.timestampType)
- assertEquals(Message.MagicValue_V1, messageSet.head.message.magic)
-
- // Message 2
- assertTrue(ByteBuffer.wrap("test2".getBytes).equals(messageSet(1).message.payload))
- assertTrue(ByteBuffer.wrap("test".getBytes).equals(messageSet(1).message.key))
- assertTrue(messageSet(1).message.timestamp >= startTime && messageSet(1).message.timestamp < endTime)
- assertEquals(TimestampType.CREATE_TIME, messageSet(1).message.timestampType)
- assertEquals(Message.MagicValue_V1, messageSet(1).message.magic)
- producer1.close()
-
- val props2 = new util.Properties()
- props2.put("request.required.acks", "3")
- // no need to retry since the send will always fail
- props2.put("message.send.max.retries", "0")
-
- try {
- val producer2 = TestUtils.createProducer[String, String](
- brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName,
- partitioner = classOf[StaticPartitioner].getName,
- producerProps = props2)
- producer2.close
- fail("we don't support request.required.acks greater than 1")
- }
- catch {
- case _: IllegalArgumentException => // this is expected
- }
- }
-
-
- @Test
- def testSendWithDeadBroker() {
- val props = new Properties()
- props.put("request.required.acks", "1")
- // No need to retry since the topic will be created beforehand and normal send will succeed on the first try.
- // Reducing the retries will save the time on the subsequent failure test.
- props.put("message.send.max.retries", "0")
-
- val topic = "new-topic"
- // create topic
- TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)),
- servers = servers)
-
- val producer = TestUtils.createProducer[String, String](
- brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName,
- partitioner = classOf[StaticPartitioner].getName,
- producerProps = props)
- val startTime = System.currentTimeMillis()
- try {
- // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
- // on broker 0
- producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
- } catch {
- case e: Throwable => fail("Unexpected exception: " + e)
- }
- val endTime = System.currentTimeMillis()
- // kill the broker
- server1.shutdown
- server1.awaitShutdown()
-
- try {
- // These sends should fail since there are no available brokers
- producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
- fail("Should fail since no leader exists for the partition.")
- } catch {
- case e : TestFailedException => throw e // catch and re-throw the failure message
- case _: Throwable => // otherwise success
- }
-
- // restart server 1
- server1.startup()
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
- TestUtils.waitUntilLeaderIsKnown(servers, topic, 0)
-
- try {
- // cross check if broker 1 got the messages
- val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
- val messageSet1 = response1.messageSet(topic, 0).iterator
- assertTrue("Message set should have 1 message", messageSet1.hasNext)
- val message = messageSet1.next.message
- assertTrue(ByteBuffer.wrap("test1".getBytes).equals(message.payload))
- assertTrue(ByteBuffer.wrap("test".getBytes).equals(message.key))
- assertTrue(message.timestamp >= startTime && message.timestamp < endTime)
- assertEquals(TimestampType.CREATE_TIME, message.timestampType)
- assertEquals(Message.MagicValue_V1, message.magic)
- assertFalse("Message set should have another message", messageSet1.hasNext)
- } catch {
- case e: Exception => fail("Not expected", e)
- }
- producer.close
- }
-
- @Test
- def testAsyncSendCanCorrectlyFailWithTimeout() {
- val topic = "new-topic"
- // create topics in ZK
- TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0, 1)), servers = servers)
-
- val timeoutMs = 500
- val props = new Properties()
- props.put("request.timeout.ms", timeoutMs.toString)
- props.put("request.required.acks", "1")
- props.put("message.send.max.retries", "0")
- props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout")
- val producer = TestUtils.createProducer[String, String](
- brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName,
- partitioner = classOf[StaticPartitioner].getName,
- producerProps = props)
-
- // do a simple test to make sure plumbing is okay
- try {
- // this message should be assigned to partition 0 whose leader is on broker 0
- producer.send(new KeyedMessage(topic, "test", "test"))
- // cross check if the broker received the messages
- // we need the loop because the broker won't return the message until it has been replicated and the producer is
- // using acks=1
- var messageSet1: Iterator[MessageAndOffset] = null
- TestUtils.waitUntilTrue(() => {
- val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
- messageSet1 = response1.messageSet(topic, 0).iterator
- messageSet1.hasNext
- }, "Message set should have 1 message")
- assertEquals(ByteBuffer.wrap("test".getBytes), messageSet1.next.message.payload)
-
- // stop IO threads and request handling, but leave networking operational
- // any requests should be accepted and queue up, but not handled
- server1.requestHandlerPool.shutdown()
-
- val t1 = Time.SYSTEM.milliseconds
- try {
- // this message should be assigned to partition 0 whose leader is on broker 0, but
- // broker 0 will not respond within timeoutMs millis.
- producer.send(new KeyedMessage(topic, "test", "test"))
- fail("Exception should have been thrown")
- } catch {
- case _: FailedToSendMessageException => /* success */
- }
- val t2 = Time.SYSTEM.milliseconds
- // make sure we don't wait fewer than timeoutMs
- assertTrue((t2-t1) >= timeoutMs)
-
- } finally producer.close()
- }
-
- @Test
- def testSendNullMessage() {
- val producer = TestUtils.createProducer[String, String](
- brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName,
- partitioner = classOf[StaticPartitioner].getName)
-
- try {
- TestUtils.createTopic(zkClient, "new-topic", 2, 1, servers)
- producer.send(new KeyedMessage("new-topic", "key", null))
- } finally {
- producer.close()
- }
- }
-}
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
deleted file mode 100644
index 5c1d4da..0000000
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-import java.net.SocketTimeoutException
-import java.util.Properties
-
-import kafka.api.{ProducerRequest, ProducerResponseStatus}
-import kafka.common.TopicAndPartition
-import kafka.integration.KafkaServerTestHarness
-import kafka.message._
-import kafka.server.KafkaConfig
-import kafka.utils._
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.record.{DefaultRecordBatch, DefaultRecord}
-import org.junit.Test
-import org.junit.Assert._
-
-@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
-class SyncProducerTest extends KafkaServerTestHarness {
- private val messageBytes = new Array[Byte](2)
- // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
- def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
-
- private def produceRequest(topic: String,
- partition: Int,
- message: ByteBufferMessageSet,
- acks: Int,
- timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
- correlationId: Int = 0,
- clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = {
- TestUtils.produceRequest(topic, partition, message, acks, timeout, correlationId, clientId)
- }
-
- @Test
- def testReachableServer() {
- val server = servers.head
- val props = TestUtils.getSyncProducerConfig(boundPort(server))
-
- val producer = new SyncProducer(new SyncProducerConfig(props))
-
- val firstStart = Time.SYSTEM.milliseconds
- var response = producer.send(produceRequest("test", 0,
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
- assertNotNull(response)
- assertTrue((Time.SYSTEM.milliseconds - firstStart) < 12000)
-
- val secondStart = Time.SYSTEM.milliseconds
- response = producer.send(produceRequest("test", 0,
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
- assertNotNull(response)
- assertTrue((Time.SYSTEM.milliseconds - secondStart) < 12000)
-
- response = producer.send(produceRequest("test", 0,
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
- assertNotNull(response)
- }
-
- @Test
- def testEmptyProduceRequest() {
- val server = servers.head
- val props = TestUtils.getSyncProducerConfig(boundPort(server))
-
-
- val correlationId = 0
- val clientId = SyncProducerConfig.DefaultClientId
- val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
- val ack: Short = 1
- val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
-
- val producer = new SyncProducer(new SyncProducerConfig(props))
- val response = producer.send(emptyRequest)
- assertTrue(response != null)
- assertTrue(!response.hasError && response.status.isEmpty)
- }
-
- @Test
- def testMessageSizeTooLarge() {
- val server = servers.head
- val props = TestUtils.getSyncProducerConfig(boundPort(server))
-
- val producer = new SyncProducer(new SyncProducerConfig(props))
- createTopic("test", numPartitions = 1, replicationFactor = 1)
-
- val message1 = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))
- val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
- val response1 = producer.send(produceRequest("test", 0, messageSet1, acks = 1))
-
- assertEquals(1, response1.status.count(_._2.error != Errors.NONE))
- assertEquals(Errors.MESSAGE_TOO_LARGE, response1.status(TopicAndPartition("test", 0)).error)
- assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).offset)
-
- val safeSize = configs.head.messageMaxBytes - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD
- val message2 = new Message(new Array[Byte](safeSize))
- val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
- val response2 = producer.send(produceRequest("test", 0, messageSet2, acks = 1))
-
- assertEquals(1, response1.status.count(_._2.error != Errors.NONE))
- assertEquals(Errors.NONE, response2.status(TopicAndPartition("test", 0)).error)
- assertEquals(0, response2.status(TopicAndPartition("test", 0)).offset)
- }
-
- @Test
- def testMessageSizeTooLargeWithAckZero() {
- val server = servers.head
- val props = TestUtils.getSyncProducerConfig(boundPort(server))
-
- props.put("request.required.acks", "0")
-
- val producer = new SyncProducer(new SyncProducerConfig(props))
- adminZkClient.createTopic("test", 1, 1)
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0)
-
- // This message will be dropped silently since message size too large.
- producer.send(produceRequest("test", 0,
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))), acks = 0))
-
- // Send another message whose size is large enough to exceed the buffer size so
- // the socket buffer will be flushed immediately;
- // this send should fail since the socket has been closed
- try {
- producer.send(produceRequest("test", 0,
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))), acks = 0))
- } catch {
- case _ : java.io.IOException => // success
- }
- }
-
- @Test
- def testProduceCorrectlyReceivesResponse() {
- val server = servers.head
- val props = TestUtils.getSyncProducerConfig(boundPort(server))
-
- val producer = new SyncProducer(new SyncProducerConfig(props))
- val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
-
- // #1 - test that we get an error when partition does not belong to broker in response
- val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1,
- timeout = SyncProducerConfig.DefaultAckTimeoutMs, clientId = SyncProducerConfig.DefaultClientId)
- val response = producer.send(request)
-
- assertNotNull(response)
- assertEquals(request.correlationId, response.correlationId)
- assertEquals(3, response.status.size)
- response.status.values.foreach {
- case ProducerResponseStatus(error, nextOffset, timestamp) =>
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
- assertEquals(-1L, nextOffset)
- assertEquals(Message.NoTimestamp, timestamp)
- }
-
- // #2 - test that we get correct offsets when partition is owned by broker
- adminZkClient.createTopic("topic1", 1, 1)
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic1", 0)
- adminZkClient.createTopic("topic3", 1, 1)
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic3", 0)
-
- val response2 = producer.send(request)
- assertNotNull(response2)
- assertEquals(request.correlationId, response2.correlationId)
- assertEquals(3, response2.status.size)
-
- // the first and last message should have been accepted by broker
- assertEquals(Errors.NONE, response2.status(TopicAndPartition("topic1", 0)).error)
- assertEquals(Errors.NONE, response2.status(TopicAndPartition("topic3", 0)).error)
- assertEquals(0, response2.status(TopicAndPartition("topic1", 0)).offset)
- assertEquals(0, response2.status(TopicAndPartition("topic3", 0)).offset)
-
- // the middle message should have been rejected because broker doesn't lead partition
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
- response2.status(TopicAndPartition("topic2", 0)).error)
- assertEquals(-1, response2.status(TopicAndPartition("topic2", 0)).offset)
- }
-
- @Test
- def testProducerCanTimeout() {
- val timeoutMs = 500
-
- val server = servers.head
- val props = TestUtils.getSyncProducerConfig(boundPort(server))
- val producer = new SyncProducer(new SyncProducerConfig(props))
-
- val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
- val request = produceRequest("topic1", 0, messages, acks = 1)
-
- // stop IO threads and request handling, but leave networking operational
- // any requests should be accepted and queue up, but not handled
- server.requestHandlerPool.shutdown()
-
- val t1 = Time.SYSTEM.milliseconds
- try {
- producer.send(request)
- fail("Should have received timeout exception since request handling is stopped.")
- } catch {
- case _: SocketTimeoutException => /* success */
- }
- val t2 = Time.SYSTEM.milliseconds
- // make sure we don't wait fewer than timeoutMs for a response
- assertTrue((t2-t1) >= timeoutMs)
- }
-
- @Test
- def testProduceRequestWithNoResponse() {
- val server = servers.head
-
- val port = TestUtils.boundPort(server)
- val props = TestUtils.getSyncProducerConfig(port)
- val correlationId = 0
- val clientId = SyncProducerConfig.DefaultClientId
- val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
- val ack: Short = 0
- val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
- val producer = new SyncProducer(new SyncProducerConfig(props))
- val response = producer.send(emptyRequest)
- assertTrue(response == null)
- }
-
- @Test
- def testNotEnoughReplicas() {
- val topicName = "minisrtest"
- val server = servers.head
- val props = TestUtils.getSyncProducerConfig(boundPort(server))
-
- props.put("request.required.acks", "-1")
-
- val producer = new SyncProducer(new SyncProducerConfig(props))
- val topicProps = new Properties()
- topicProps.put("min.insync.replicas","2")
- adminZkClient.createTopic(topicName, 1, 1,topicProps)
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicName, 0)
-
- val response = producer.send(produceRequest(topicName, 0,
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),-1))
-
- assertEquals(Errors.NOT_ENOUGH_REPLICAS, response.status(TopicAndPartition(topicName, 0)).error)
- }
-}
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index bf6db2f..7ad9371 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -21,7 +21,6 @@ import AbstractFetcherThread._
import com.yammer.metrics.Metrics
import kafka.cluster.BrokerEndPoint
import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
-import kafka.server.OffsetTruncationState
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
index 8d1eb2c..35e3262 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -41,7 +41,7 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
val offlineDir = new File(servers.head.config.logDirs.tail.head).getAbsolutePath
servers.head.replicaManager.handleLogDirFailure(offlineDir)
createTopic(topic, partitionNum, 1)
- TestUtils.produceMessages(servers, topic, 10)
+ TestUtils.generateAndProduceMessages(servers, topic, 10)
val request = new DescribeLogDirsRequest.Builder(null).build()
val response = connectAndSend(request, ApiKeys.DESCRIBE_LOG_DIRS, controllerSocketServer)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 96f74a0..8212100 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -568,7 +568,7 @@ class KafkaApisTest {
capturedResponse
}
- private def setupBasicMetadataCache(topic: String, numPartitions: Int = 1): Unit = {
+ private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = {
val replicas = List(0.asInstanceOf[Integer]).asJava
val partitionState = new UpdateMetadataRequest.PartitionState(1, 0, 1, replicas, 0, replicas, Collections.emptyList())
val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index c7a07ec..dcbeb21 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -21,24 +21,20 @@ import kafka.api.Request
import kafka.cluster.{BrokerEndPoint, Replica, Partition}
import kafka.log.LogManager
import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.server.FetchPartitionData
import kafka.server.epoch.LeaderEpochCache
import org.apache.kafka.common.errors.{ReplicaNotAvailableException, KafkaStorageException}
import kafka.utils.{DelayedItem, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, FetchMetadata => JFetchMetadata}
-import org.apache.kafka.common.requests.FetchResponse.PartitionData
+import org.apache.kafka.common.requests.EpochEndOffset
import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH_OFFSET, UNDEFINED_EPOCH}
-import org.apache.kafka.common.utils.SystemTime
import org.easymock.EasyMock._
import org.easymock.{Capture, CaptureType, EasyMock, IAnswer}
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConverters._
-import scala.collection.Seq
-import scala.collection.{Map, mutable}
+import scala.collection.{Seq, Map}
class ReplicaAlterLogDirsThreadTest {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index f8f4948..ac5b7ed 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -215,8 +215,6 @@ class ReplicaFetcherThreadTest {
// Create a capture to track what partitions/offsets are truncated
val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
- val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-
// Setup all the dependencies
val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
val quota = createNiceMock(classOf[ReplicationQuotaManager])
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index da80c0d..0e4b5b3 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -17,7 +17,6 @@
package kafka.tools
-import kafka.producer.ProducerConfig
import ConsoleProducer.LineMessageReader
import org.apache.kafka.clients.producer.KafkaProducer
import org.junit.{Assert, Test}
@@ -45,18 +44,11 @@ class ConsoleProducerTest {
val config = new ConsoleProducer.ProducerConfig(validArgs)
// New ProducerConfig constructor is package private, so we can't call it directly
// Creating new Producer to validate instead
- val producer = new KafkaProducer(ConsoleProducer.getNewProducerProps(config))
+ val producer = new KafkaProducer(ConsoleProducer.producerProps(config))
producer.close()
}
@Test
- @deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
- def testValidConfigsOldProducer() {
- val config = new ConsoleProducer.ProducerConfig(validArgs)
- new ProducerConfig(ConsoleProducer.getOldProducerProps(config))
- }
-
- @Test
def testInvalidConfigs() {
try {
new ConsoleProducer.ProducerConfig(invalidArgs)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 16b7e87..ec6c756 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -28,13 +28,10 @@ import javax.net.ssl.X509TrustManager
import kafka.api._
import kafka.cluster.{Broker, EndPoint}
-import kafka.common.TopicAndPartition
import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream}
import kafka.log._
import kafka.message._
-import kafka.producer._
import kafka.security.auth.{Acl, Authorizer, Resource}
-import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile
import Implicits._
@@ -507,28 +504,6 @@ object TestUtils extends Logging {
builder.toString
}
- /**
- * Create a producer with a few pre-configured properties.
- * If certain properties need to be overridden, they can be provided in producerProps.
- */
- @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")
- def createProducer[K, V](brokerList: String,
- encoder: String = classOf[DefaultEncoder].getName,
- keyEncoder: String = classOf[DefaultEncoder].getName,
- partitioner: String = classOf[DefaultPartitioner].getName,
- producerProps: Properties = null): Producer[K, V] = {
- val props: Properties = getProducerConfig(brokerList)
-
- //override any explicitly specified properties
- if (producerProps != null)
- props ++= producerProps
-
- props.put("serializer.class", encoder)
- props.put("key.serializer.class", keyEncoder)
- props.put("partitioner.class", partitioner)
- new Producer[K, V](new kafka.producer.ProducerConfig(props))
- }
-
def securityConfigs(mode: Mode,
securityProtocol: SecurityProtocol,
trustStoreFile: Option[File],
@@ -672,18 +647,6 @@ object TestUtils extends Logging {
props
}
- @deprecated("This method has been deprecated and will be removed in a future release", "0.11.0.0")
- def getSyncProducerConfig(port: Int): Properties = {
- val props = new Properties()
- props.put("host", "localhost")
- props.put("port", port.toString)
- props.put("request.timeout.ms", "10000")
- props.put("request.required.acks", "1")
- props.put("serializer.class", classOf[StringEncoder].getName)
- props
- }
-
-
@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
@@ -731,34 +694,6 @@ object TestUtils extends Logging {
buffer
}
- /**
- * Create a wired format request based on simple basic information
- */
- @deprecated("This method has been deprecated and it will be removed in a future release", "0.10.0.0")
- def produceRequest(topic: String,
- partition: Int,
- message: ByteBufferMessageSet,
- acks: Int,
- timeout: Int,
- correlationId: Int = 0,
- clientId: String): ProducerRequest = {
- produceRequestWithAcks(Seq(topic), Seq(partition), message, acks, timeout, correlationId, clientId)
- }
-
- @deprecated("This method has been deprecated and it will be removed in a future release", "0.10.0.0")
- def produceRequestWithAcks(topics: Seq[String],
- partitions: Seq[Int],
- message: ByteBufferMessageSet,
- acks: Int,
- timeout: Int,
- correlationId: Int = 0,
- clientId: String): ProducerRequest = {
- val data = topics.flatMap(topic =>
- partitions.map(partition => (TopicAndPartition(topic, partition), message))
- )
- new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*))
- }
-
def makeLeaderForPartition(zkClient: KafkaZkClient,
topic: String,
leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int],
@@ -1040,73 +975,32 @@ object TestUtils extends Logging {
logDirFailureChannel = new LogDirFailureChannel(logDirs.size))
}
- @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")
- def sendMessages(servers: Seq[KafkaServer],
- topic: String,
- numMessages: Int,
- partition: Int = -1,
- compression: CompressionCodec = NoCompressionCodec): List[String] = {
- val header = "test-%d".format(partition)
- val props = new Properties()
- props.put("compression.codec", compression.codec.toString)
- val ms = 0.until(numMessages).map(x => header + "-" + x)
-
- // Specific Partition
- if (partition >= 0) {
- val producer: Producer[Int, String] =
- createProducer(TestUtils.getBrokerListStrFromServers(servers),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[IntEncoder].getName,
- partitioner = classOf[FixedValuePartitioner].getName,
- producerProps = props)
-
- producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)): _*)
- debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
- producer.close()
- ms.toList
- } else {
- // Use topic as the key to determine partition
- val producer: Producer[String, String] = createProducer(
- TestUtils.getBrokerListStrFromServers(servers),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName,
- partitioner = classOf[DefaultPartitioner].getName,
- producerProps = props)
- producer.send(ms.map(m => new KeyedMessage[String, String](topic, topic, m)): _*)
- producer.close()
- debug("Sent %d messages for topic [%s]".format(ms.size, topic))
- ms.toList
- }
- }
-
def produceMessages(servers: Seq[KafkaServer],
- topic: String,
- numMessages: Int,
+ records: Seq[ProducerRecord[Array[Byte], Array[Byte]]],
acks: Int = -1,
- valueBytes: Int = -1): Seq[Array[Byte]] = {
-
- val producer = createNewProducer(
- TestUtils.getBrokerListStrFromServers(servers),
- retries = 5,
- acks = acks
- )
- val values = try {
- val curValues = (0 until numMessages).map(x => valueBytes match {
- case -1 => s"test-$x".getBytes
- case _ => new Array[Byte](valueBytes)
- })
-
- val futures = curValues.map { value =>
- producer.send(new ProducerRecord(topic, value))
- }
+ compressionType: CompressionType = CompressionType.NONE): Unit = {
+ val props = new Properties()
+ props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType.name)
+ val producer = createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = 5, acks = acks)
+ try {
+ val futures = records.map(producer.send)
futures.foreach(_.get)
- curValues
} finally {
producer.close()
}
- debug(s"Sent ${values.size} messages for topic [$topic]")
+ val topics = records.map(_.topic).distinct
+ debug(s"Sent ${records.size} messages for topics ${topics.mkString(",")}")
+ }
+ def generateAndProduceMessages(servers: Seq[KafkaServer],
+ topic: String,
+ numMessages: Int,
+ acks: Int = -1,
+ compressionType: CompressionType = CompressionType.NONE): Seq[String] = {
+ val values = (0 until numMessages).map(x => s"test-$x")
+ val records = values.map(v => new ProducerRecord[Array[Byte], Array[Byte]](topic, v.getBytes))
+ produceMessages(servers, records, acks, compressionType)
values
}
@@ -1130,7 +1024,7 @@ object TestUtils extends Logging {
*/
@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
def getMessages(topicMessageStreams: Map[String, List[KafkaStream[String, String]]],
- nMessagesPerThread: Int = -1): List[String] = {
+ nMessagesPerThread: Int = -1): List[String] = {
var messages: List[String] = Nil
val shouldGetAllMessages = nMessagesPerThread < 0
@@ -1544,19 +1438,3 @@ object TestUtils extends Logging {
}
}
-
-class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
- override def toBytes(n: Int) = n.toString.getBytes
-}
-
-@deprecated("This class is deprecated and it will be removed in a future release.", "0.10.0.0")
-class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner {
- def partition(data: Any, numPartitions: Int): Int = {
- data.asInstanceOf[String].length % numPartitions
- }
-}
-
-@deprecated("This class has been deprecated and it will be removed in a future release.", "0.10.0.0")
-class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner {
- def partition(data: Any, numPartitions: Int): Int = data.asInstanceOf[Int]
-}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 7ae69ce..c7b8aaa 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -74,6 +74,11 @@
JMX monitoring tools that do not automatically aggregate. To get the total count for a specific request type, the tool needs to be
updated to aggregate across different versions.
</li>
+ <li>The Scala producers, which have been deprecated since 0.10.0.0, have been removed. The Java producer has been the recommended option
+ since 0.9.0.0. Note that the behaviour of the default partitioner in the Java producer differs from the default partitioner
+ in the Scala producers. Users migrating should consider configuring a custom partitioner that retains the previous behaviour.</li>
+ <li>The ConsoleProducer no longer supports the Scala producer.</li>
+ <li>The deprecated kafka.tools.ProducerPerformance has been removed, please use org.apache.kafka.tools.ProducerPerformance.</li>
<li>New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from older version. </li>
<li><a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for Kafka Streams repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.</li>
<li>Updated <code>ProcessorStateManager</code> APIs in Kafka Streams for registering state stores to the processor topology. For more details please read the Streams <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_200">Upgrade Guide</a>.</li>
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index e8b9f0f..ffae666 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -19,7 +19,6 @@
*/
package org.apache.kafka.streams.scala
-import java.util.Properties
import java.util.regex.Pattern
import org.scalatest.junit.JUnitSuite
@@ -28,11 +27,9 @@ import org.junit._
import org.apache.kafka.streams.scala.kstream._
-import org.apache.kafka.common.serialization._
-
import ImplicitConversions._
-import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ, _}
+import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _}
import collection.JavaConverters._
@@ -53,7 +50,6 @@ class TopologyTest extends JUnitSuite {
def getTopologyScala(): TopologyDescription = {
import Serdes._
- import collection.JavaConverters._
val streamBuilder = new StreamsBuilder
val textLines = streamBuilder.stream[String, String](inputTopic)
@@ -88,7 +84,6 @@ class TopologyTest extends JUnitSuite {
def getTopologyScala(): TopologyDescription = {
import Serdes._
- import collection.JavaConverters._
val streamBuilder = new StreamsBuilder
val textLines = streamBuilder.stream[String, String](inputTopic)
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index 18790a7..9cd56ec 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -88,7 +88,6 @@ class ProducerPerformanceService(HttpMetricsCollector, PerformanceService):
# tool from the development branch
tools_jar = self.path.jar(TOOLS_JAR_NAME, DEV_BRANCH)
tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
- tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
for jar in (tools_jar, tools_dependant_libs_jar):
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % jar
--
To stop receiving notification emails like this one, please contact
lindong@apache.org.