You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/03/18 02:12:46 UTC
kafka git commit: KAFKA-2982;
Mark the old Scala producer and related classes as deprecated
Repository: kafka
Updated Branches:
refs/heads/trunk dea0719e9 -> e89a9ce1a
KAFKA-2982; Mark the old Scala producer and related classes as deprecated
Also update server tests to always use new producer.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Gwen Shapira
Closes #1092 from ijuma/kafka-2982-deprecate-old-producers
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e89a9ce1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e89a9ce1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e89a9ce1
Branch: refs/heads/trunk
Commit: e89a9ce1a4383af32435c7f4ee04361b1b65797d
Parents: dea0719
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu Mar 17 18:12:40 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Mar 17 18:12:40 2016 -0700
----------------------------------------------------------------------
.../main/scala/kafka/client/ClientUtils.scala | 1 +
.../scala/kafka/javaapi/producer/Producer.scala | 2 +
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 1 +
.../scala/kafka/producer/BaseProducer.scala | 6 ++
.../kafka/producer/BrokerPartitionInfo.scala | 3 +-
.../kafka/producer/ByteArrayPartitioner.scala | 2 +
.../kafka/producer/DefaultPartitioner.scala | 2 +
.../scala/kafka/producer/KeyedMessage.scala | 2 +
.../main/scala/kafka/producer/Partitioner.scala | 2 +
.../main/scala/kafka/producer/Producer.scala | 3 +-
.../producer/ProducerClosedException.scala | 1 +
.../scala/kafka/producer/ProducerConfig.scala | 4 +
.../scala/kafka/producer/ProducerPool.scala | 3 +-
.../kafka/producer/ProducerRequestStats.scala | 3 +
.../scala/kafka/producer/ProducerStats.scala | 2 +
.../kafka/producer/ProducerTopicStats.scala | 4 +-
.../scala/kafka/producer/SyncProducer.scala | 4 +
.../kafka/producer/SyncProducerConfig.scala | 6 ++
.../producer/async/AsyncProducerConfig.scala | 2 +
.../producer/async/DefaultEventHandler.scala | 1 +
.../kafka/producer/async/EventHandler.scala | 1 +
.../async/IllegalQueueStateException.scala | 1 +
.../producer/async/MissingConfigException.scala | 1 +
.../producer/async/ProducerSendThread.scala | 1 +
.../scala/kafka/tools/KafkaMigrationTool.java | 2 +-
.../kafka/api/BaseProducerSendTest.scala | 22 +----
.../kafka/api/PlaintextProducerSendTest.scala | 24 ++++-
.../kafka/api/ProducerFailureHandlingTest.scala | 9 +-
.../scala/kafka/tools/ConsoleProducerTest.scala | 3 +-
.../scala/unit/kafka/common/ConfigTest.scala | 1 +
.../ZookeeperConsumerConnectorTest.scala | 1 +
.../kafka/integration/AutoOffsetResetTest.scala | 1 +
.../unit/kafka/integration/FetcherTest.scala | 4 +-
.../kafka/integration/PrimitiveApiTest.scala | 1 +
.../ProducerConsumerTestHarness.scala | 1 +
.../integration/UncleanLeaderElectionTest.scala | 23 +++--
.../ZookeeperConsumerConnectorTest.scala | 2 +-
.../scala/unit/kafka/metrics/MetricsTest.scala | 6 +-
.../unit/kafka/network/SocketServerTest.scala | 14 ++-
.../unit/kafka/producer/AsyncProducerTest.scala | 21 +++--
.../unit/kafka/producer/ProducerTest.scala | 1 +
.../unit/kafka/producer/SyncProducerTest.scala | 34 ++++---
.../kafka/server/BaseReplicaFetchTest.scala | 22 +++--
.../unit/kafka/server/LogRecoveryTest.scala | 25 ++---
.../unit/kafka/server/ServerShutdownTest.scala | 36 +++----
.../test/scala/unit/kafka/utils/TestUtils.scala | 99 ++++++++++++--------
46 files changed, 263 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 2093749..fd1fc26 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -42,6 +42,7 @@ object ClientUtils extends Logging{
* @param producerConfig The producer's config
* @return topic metadata response
*/
+ @deprecated("This method has been deprecated and will be removed in a future release.", "0.10.0.0")
def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
var fetchMetaDataSucceeded: Boolean = false
var i: Int = 0
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/javaapi/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index c465da5..44f9245 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -21,6 +21,8 @@ import kafka.producer.ProducerConfig
import kafka.producer.KeyedMessage
import scala.collection.mutable
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
{
def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config))
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 72ecae1..12dfeb1 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -192,6 +192,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
removeAllMetricsInList(KafkaMetricsGroup.consumerMetricNameList, clientId)
}
+ @deprecated("This method has been deprecated and will be removed in a future release.", "0.10.0.0")
def removeAllProducerMetrics(clientId: String) {
ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
ProducerTopicStatsRegistry.removeProducerTopicStats(clientId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/BaseProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala
index 9d0976f..83d9aa7 100644
--- a/core/src/main/scala/kafka/producer/BaseProducer.scala
+++ b/core/src/main/scala/kafka/producer/BaseProducer.scala
@@ -21,11 +21,15 @@ import java.util.Properties
// A base producer used whenever we need to have options for both old and new producers;
// this class will be removed once we fully rolled out 0.9
+@deprecated("This trait has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
trait BaseProducer {
def send(topic: String, key: Array[Byte], value: Array[Byte])
def close()
}
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
class NewShinyProducer(producerProps: Properties) extends BaseProducer {
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
@@ -50,6 +54,8 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer {
}
}
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
class OldProducer(producerProps: Properties) extends BaseProducer {
// default to byte array partitioner
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 6fa00dd..4616c7e 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -24,7 +24,7 @@ import kafka.common.KafkaException
import kafka.utils.Logging
import kafka.client.ClientUtils
-
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class BrokerPartitionInfo(producerConfig: ProducerConfig,
producerPool: ProducerPool,
topicPartitionInfo: HashMap[String, TopicMetadata])
@@ -101,4 +101,5 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
}
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
index e6b100e..7848456 100755
--- a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
@@ -21,6 +21,8 @@ package kafka.producer
import kafka.utils._
import org.apache.kafka.common.utils.Utils
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+ "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner {
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]])) % numPartitions
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index 1141ed1..6b10e51 100755
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -21,6 +21,8 @@ package kafka.producer
import kafka.utils._
import org.apache.kafka.common.utils.Utils
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+ "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
private val random = new java.util.Random
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/KeyedMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KeyedMessage.scala b/core/src/main/scala/kafka/producer/KeyedMessage.scala
index dbcf295..84ea232 100644
--- a/core/src/main/scala/kafka/producer/KeyedMessage.scala
+++ b/core/src/main/scala/kafka/producer/KeyedMessage.scala
@@ -21,6 +21,8 @@ package kafka.producer
* A topic, key, and value.
* If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
*/
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.ProducerRecord instead.", "0.10.0.0")
case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {
if(topic == null)
throw new IllegalArgumentException("Topic cannot be null.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Partitioner.scala b/core/src/main/scala/kafka/producer/Partitioner.scala
index efe6d6d..5d24692 100644
--- a/core/src/main/scala/kafka/producer/Partitioner.scala
+++ b/core/src/main/scala/kafka/producer/Partitioner.scala
@@ -23,6 +23,8 @@ package kafka.producer
* Implementations will be constructed via reflection and are required to have a constructor that takes a single
* VerifiableProperties instance--this allows passing configuration properties into the partitioner implementation.
*/
+@deprecated("This trait has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.Partitioner instead.", "0.10.0.0")
trait Partitioner {
/**
* Uses the key to calculate a partition bucket id for routing
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 4be06c8..c11ad21 100755
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -25,7 +25,8 @@ import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThre
import kafka.serializer.Encoder
import kafka.utils._
-
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
class Producer[K,V](val config: ProducerConfig,
private val eventHandler: EventHandler[K,V]) // only for unit testing
extends Logging {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerClosedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerClosedException.scala b/core/src/main/scala/kafka/producer/ProducerClosedException.scala
index 27a5293..4f2f731 100644
--- a/core/src/main/scala/kafka/producer/ProducerClosedException.scala
+++ b/core/src/main/scala/kafka/producer/ProducerClosedException.scala
@@ -17,5 +17,6 @@
package kafka.producer
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerClosedException() extends RuntimeException("producer already closed") {
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
index 08a4e51..c2715d0 100755
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -23,6 +23,8 @@ import kafka.utils.{CoreUtils, VerifiableProperties}
import kafka.message.NoCompressionCodec
import kafka.common.{InvalidConfigException, Config}
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
object ProducerConfig extends Config {
def validate(config: ProducerConfig) {
validateClientId(config.clientId)
@@ -48,6 +50,8 @@ object ProducerConfig extends Config {
}
}
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
class ProducerConfig private (val props: VerifiableProperties)
extends AsyncProducerConfig with SyncProducerConfigShared {
import ProducerConfig._
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
index 5ad6812..60cef63 100644
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ b/core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -26,7 +26,7 @@ import kafka.utils.Logging
import scala.collection.mutable.HashMap
-
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
object ProducerPool {
/**
* Used in ProducerPool to initiate a SyncProducer connection with a broker.
@@ -40,6 +40,7 @@ object ProducerPool {
}
}
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerPool(val config: ProducerConfig) extends Logging {
private val syncProducers = new HashMap[Int, SyncProducer]
private val lock = new Object()
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index b453f63..8ab948a 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
import kafka.utils.Pool
import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
val tags = metricId match {
case ClientIdAndBroker(clientId, brokerHost, brokerPort) => Map("clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort.toString)
@@ -36,6 +37,7 @@ class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup
* Tracks metrics of requests made by a given producer client to all brokers.
* @param clientId ClientId of the given producer
*/
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerRequestStats(clientId: String) {
private val valueFactory = (k: ClientIdBroker) => new ProducerRequestMetrics(k)
private val stats = new Pool[ClientIdBroker, ProducerRequestMetrics](Some(valueFactory))
@@ -51,6 +53,7 @@ class ProducerRequestStats(clientId: String) {
/**
* Stores the request stats information of each producer client in a (clientId -> ProducerRequestStats) map.
*/
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
object ProducerRequestStatsRegistry {
private val valueFactory = (k: String) => new ProducerRequestStats(k)
private val globalStats = new Pool[String, ProducerRequestStats](Some(valueFactory))
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index 1d0fa88..9466f26 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -20,6 +20,7 @@ import kafka.metrics.KafkaMetricsGroup
import java.util.concurrent.TimeUnit
import kafka.utils.Pool
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerStats(clientId: String) extends KafkaMetricsGroup {
val tags: Map[String, String] = Map("clientId" -> clientId)
val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, tags)
@@ -30,6 +31,7 @@ class ProducerStats(clientId: String) extends KafkaMetricsGroup {
/**
* Stores metrics of serialization and message sending activity of each producer client in a (clientId -> ProducerStats) map.
*/
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
object ProducerStatsRegistry {
private val valueFactory = (k: String) => new ProducerStats(k)
private val statsRegistry = new Pool[String, ProducerStats](Some(valueFactory))
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index 97594c8..7bb9610 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -21,7 +21,7 @@ import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
import kafka.utils.{Pool, threadsafe}
import java.util.concurrent.TimeUnit
-
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
@threadsafe
class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
val tags = metricId match {
@@ -38,6 +38,7 @@ class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
* Tracks metrics for each topic the given producer client has produced data to.
* @param clientId The clientId of the given producer client.
*/
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerTopicStats(clientId: String) {
private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k)
private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory))
@@ -53,6 +54,7 @@ class ProducerTopicStats(clientId: String) {
/**
* Stores the topic stats information of each producer client in a (clientId -> ProducerTopicStats) map.
*/
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0")
object ProducerTopicStatsRegistry {
private val valueFactory = (k: String) => new ProducerTopicStats(k)
private val globalStats = new Pool[String, ProducerTopicStats](Some(valueFactory))
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index ec3c4ab..de4f4ad 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -27,6 +27,8 @@ import org.apache.kafka.common.network.NetworkReceive
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.utils.Utils._
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
object SyncProducer {
val RequestKey: Short = 0
val randomGenerator = new Random
@@ -36,6 +38,8 @@ object SyncProducer {
* Send a message set.
*/
@threadsafe
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
class SyncProducer(val config: SyncProducerConfig) extends Logging {
private val lock = new Object()
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
index a08ce00..207779c 100644
--- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -20,6 +20,8 @@ package kafka.producer
import java.util.Properties
import kafka.utils.VerifiableProperties
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
class SyncProducerConfig private (val props: VerifiableProperties) extends SyncProducerConfigShared {
def this(originalProps: Properties) {
this(new VerifiableProperties(originalProps))
@@ -33,6 +35,8 @@ class SyncProducerConfig private (val props: VerifiableProperties) extends SyncP
val port = props.getInt("port")
}
+@deprecated("This trait has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
trait SyncProducerConfigShared {
val props: VerifiableProperties
@@ -59,6 +63,8 @@ trait SyncProducerConfigShared {
(1, Integer.MAX_VALUE))
}
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
object SyncProducerConfig {
val DefaultClientId = ""
val DefaultRequiredAcks : Short = 0
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
index dd39de5..cc3a79d 100644
--- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
@@ -18,6 +18,8 @@ package kafka.producer.async
import kafka.utils.VerifiableProperties
+@deprecated("This trait has been deprecated and will be removed in a future release. " +
+ "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0")
trait AsyncProducerConfig {
val props: VerifiableProperties
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 7abe48a..b79e64b 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic._
import kafka.api.{TopicMetadata, ProducerRequest}
import org.apache.kafka.common.utils.Utils
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class DefaultEventHandler[K,V](config: ProducerConfig,
private val partitioner: Partitioner,
private val encoder: Encoder[V],
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/EventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/EventHandler.scala b/core/src/main/scala/kafka/producer/async/EventHandler.scala
index e724000..3a17bfb 100644
--- a/core/src/main/scala/kafka/producer/async/EventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/EventHandler.scala
@@ -21,6 +21,7 @@ import kafka.producer.KeyedMessage
/**
* Handler that dispatches the batched data from the queue.
*/
+@deprecated("This trait has been deprecated and will be removed in a future release.", "0.10.0.0")
trait EventHandler[K,V] {
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala b/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala
index 9ecdf76..7779715 100644
--- a/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala
+++ b/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala
@@ -20,6 +20,7 @@ package kafka.producer.async
/**
* Indicates that the given config parameter has invalid value
*/
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class IllegalQueueStateException(message: String) extends RuntimeException(message) {
def this() = this(null)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/MissingConfigException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/MissingConfigException.scala b/core/src/main/scala/kafka/producer/async/MissingConfigException.scala
index 304e0b2..a42678b 100644
--- a/core/src/main/scala/kafka/producer/async/MissingConfigException.scala
+++ b/core/src/main/scala/kafka/producer/async/MissingConfigException.scala
@@ -18,6 +18,7 @@
package kafka.producer.async
/* Indicates any missing configuration parameter */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class MissingConfigException(message: String) extends RuntimeException(message) {
def this() = this(null)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 8a903f3..d423757 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -24,6 +24,7 @@ import kafka.producer.KeyedMessage
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerSendThread[K,V](val threadName: String,
val queue: BlockingQueue[KeyedMessage[K,V]],
val handler: EventHandler[K,V],
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index b1ab649..0b94902 100755
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* The user need to provide the configuration file for 0.7 consumer and 0.8 producer. For 0.8 producer,
* the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code.
*/
-@SuppressWarnings({"unchecked", "rawtypes"})
+@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
public class KafkaMigrationTool {
private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 2d89bf8..49ce748 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -48,7 +48,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
private var consumer2: SimpleConsumer = null
private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
- private val topic = "topic"
+ protected val topic = "topic"
private val numRecords = 100
@Before
@@ -227,26 +227,6 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
}
- @Test
- def testWrongSerializer() {
- // send a record with a wrong type should receive a serialization exception
- try {
- val producer = createProducerWithWrongSerializer(brokerList)
- val record5 = new ProducerRecord[Array[Byte], Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes)
- producer.send(record5)
- fail("Should have gotten a SerializationException")
- } catch {
- case se: SerializationException => // this is ok
- }
- }
-
- private def createProducerWithWrongSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
- val producerProps = new Properties()
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
- createProducer(brokerList, props = Some(producerProps))
- }
-
/**
* testClose checks the closing behavior
*
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index d017d13..111bc15 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -19,8 +19,9 @@ package kafka.api
import java.util.Properties
-import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.junit.Test
@@ -51,4 +52,25 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer, new ByteArraySerializer)
}
+ @Test
+ def testWrongSerializer() {
+ // send a record with a wrong type should receive a serialization exception
+ try {
+ val producer = createProducerWithWrongSerializer(brokerList)
+ val record5 = new ProducerRecord[Array[Byte], Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes)
+ producer.send(record5)
+ fail("Should have gotten a SerializationException")
+ } catch {
+ case se: SerializationException => // this is ok
+ }
+ }
+
+ private def createProducerWithWrongSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
+ val producerProps = new Properties()
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 2bb203d..7a22c73 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -63,9 +63,12 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
override def setUp() {
super.setUp()
- producer1 = TestUtils.createNewProducer(brokerList, acks = 0, maxBlockMs = 10000L, bufferSize = producerBufferSize)
- producer2 = TestUtils.createNewProducer(brokerList, acks = 1, maxBlockMs = 10000L, bufferSize = producerBufferSize)
- producer3 = TestUtils.createNewProducer(brokerList, acks = -1, maxBlockMs = 10000L, bufferSize = producerBufferSize)
+ producer1 = TestUtils.createNewProducer(brokerList, acks = 0, requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+ bufferSize = producerBufferSize)
+ producer2 = TestUtils.createNewProducer(brokerList, acks = 1, requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+ bufferSize = producerBufferSize)
+ producer3 = TestUtils.createNewProducer(brokerList, acks = -1, requestTimeoutMs = 30000L, maxBlockMs = 10000L,
+ bufferSize = producerBufferSize)
}
@After
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala
index 7e211b7..4ddc7fe 100644
--- a/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala
@@ -51,9 +51,10 @@ class ConsoleProducerTest {
}
@Test
+ @deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
def testValidConfigsOldProducer() {
val config = new ConsoleProducer.ProducerConfig(validArgs)
- new producer.ProducerConfig(ConsoleProducer.getOldProducerProps(config));
+ new producer.ProducerConfig(ConsoleProducer.getOldProducerProps(config))
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/common/ConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ConfigTest.scala b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
index a42836c..26154f2 100644
--- a/core/src/test/scala/unit/kafka/common/ConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
@@ -26,6 +26,7 @@ import kafka.consumer.ConsumerConfig
class ConfigTest {
@Test
+ @deprecated("This test is deprecated and it will be removed in a future release.", "0.10.0.0")
def testInvalidClientIds() {
val invalidClientIds = new ArrayBuffer[String]()
val badChars = Array('/', '\\', ',', '\u0000', ':', "\"", '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '=')
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 28b1dd5..a69fba1 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -34,6 +34,7 @@ import org.junit.{Test, After, Before}
import scala.collection._
+@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging {
val RebalanceBackoffMs = 5000
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index a71ddf1..4515b94 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -28,6 +28,7 @@ import org.junit.{After, Before, Test}
import org.apache.log4j.{Level, Logger}
import org.junit.Assert._
+@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 5af5d1a..3dd0454 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -68,11 +68,11 @@ class FetcherTest extends KafkaServerTestHarness {
@Test
def testFetcher() {
val perNode = 2
- var count = TestUtils.sendMessages(servers, topic, perNode).size
+ var count = TestUtils.produceMessages(servers, topic, perNode).size
fetch(count)
assertQueueEmpty()
- count = TestUtils.sendMessages(servers, topic, perNode).size
+ count = TestUtils.produceMessages(servers, topic, perNode).size
fetch(count)
assertQueueEmpty()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index df752db..beb5d0e 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -34,6 +34,7 @@ import java.util.Properties
/**
* End to end tests of the primitive apis against a local server
*/
+@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index cc5954d..2fdfc48 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -23,6 +23,7 @@ import kafka.producer.Producer
import kafka.utils.{StaticPartitioner, TestUtils}
import kafka.serializer.StringEncoder
+@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
trait ProducerConsumerTestHarness extends KafkaServerTestHarness {
val host = "localhost"
var producer: Producer[String, String] = null
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 8e72ad3..b725d8b 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -23,6 +23,8 @@ import org.junit.{Test, After, Before}
import scala.util.Random
import org.apache.log4j.{Level, Logger}
import java.util.Properties
+import java.util.concurrent.ExecutionException
+
import kafka.admin.AdminUtils
import kafka.common.FailedToSendMessageException
import kafka.consumer.{Consumer, ConsumerConfig}
@@ -31,6 +33,7 @@ import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.CoreUtils
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.errors.TimeoutException
import org.junit.Assert._
class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@@ -180,14 +183,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
debug("Follower for " + topic + " is: %s".format(followerId))
- sendMessage(servers, topic, "first")
+ produceMessage(servers, topic, "first")
waitUntilMetadataIsPropagated(servers, topic, partitionId)
assertEquals(List("first"), consumeAllMessages(topic))
// shutdown follower server
servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
- sendMessage(servers, topic, "second")
+ produceMessage(servers, topic, "second")
assertEquals(List("first", "second"), consumeAllMessages(topic))
// shutdown leader and then restart follower
@@ -197,7 +200,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
// wait until new leader is (uncleanly) elected
waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(followerId))
- sendMessage(servers, topic, "third")
+ produceMessage(servers, topic, "third")
// second message was lost due to unclean election
assertEquals(List("first", "third"), consumeAllMessages(topic))
@@ -215,14 +218,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
debug("Follower for " + topic + " is: %s".format(followerId))
- sendMessage(servers, topic, "first")
+ produceMessage(servers, topic, "first")
waitUntilMetadataIsPropagated(servers, topic, partitionId)
assertEquals(List("first"), consumeAllMessages(topic))
// shutdown follower server
servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
- sendMessage(servers, topic, "second")
+ produceMessage(servers, topic, "second")
assertEquals(List("first", "second"), consumeAllMessages(topic))
// shutdown leader and then restart follower
@@ -233,16 +236,20 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(-1))
// message production and consumption should both fail while leader is down
- intercept[FailedToSendMessageException] {
- sendMessage(servers, topic, "third")
+ try {
+ produceMessage(servers, topic, "third")
+ fail("Message produced while leader is down should fail, but it succeeded")
+ } catch {
+ case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected
}
+
assertEquals(List.empty[String], consumeAllMessages(topic))
// restart leader temporarily to send a successfully replicated message
servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(leaderId))
- sendMessage(servers, topic, "third")
+ produceMessage(servers, topic, "third")
waitUntilMetadataIsPropagated(servers, topic, partitionId)
servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 88d95e8..e4c4697 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -36,7 +36,7 @@ import scala.collection.JavaConversions
import org.apache.log4j.{Level, Logger}
import org.junit.Assert._
-
+@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
val numNodes = 2
val numParts = 2
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index ee41fd7..3707deb 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -52,6 +52,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
}
@Test
+ @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
def testMetricsLeak() {
// create topic topic1 with 1 partition on broker 0
createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
@@ -78,13 +79,14 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic))
}
+ @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
- val sentMessages1 = sendMessages(servers, topic, nMessages)
+ sendMessages(servers, topic, nMessages)
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
- val receivedMessages1 = getMessages(topicMessageStreams1, nMessages)
+ getMessages(topicMessageStreams1, nMessages)
zkConsumerConnector1.shutdown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index d94c314..5d28894 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -17,7 +17,6 @@
package kafka.network;
-
import java.net._
import javax.net.ssl._
import java.io._
@@ -33,7 +32,6 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}
import org.apache.kafka.common.utils.SystemTime
-import kafka.producer.SyncProducerConfig
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
@@ -103,9 +101,9 @@ class SocketServerTest extends JUnitSuite {
private def producerRequestBytes: Array[Byte] = {
val apiKey: Short = 0
val correlationId = -1
- val clientId = SyncProducerConfig.DefaultClientId
- val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
- val ack = SyncProducerConfig.DefaultRequiredAcks
+ val clientId = ""
+ val ackTimeoutMs = 10000
+ val ack = 0: Short
val emptyHeader = new RequestHeader(apiKey, clientId, correlationId)
val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, ByteBuffer]())
@@ -249,9 +247,9 @@ class SocketServerTest extends JUnitSuite {
val apiKey = ApiKeys.PRODUCE.id
val correlationId = -1
- val clientId = SyncProducerConfig.DefaultClientId
- val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
- val ack = SyncProducerConfig.DefaultRequiredAcks
+ val clientId = ""
+ val ackTimeoutMs = 10000
+ val ack = 0: Short
val emptyHeader = new RequestHeader(apiKey, clientId, correlationId)
val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, ByteBuffer]())
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index f711ca4..3088199 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -35,7 +35,13 @@ import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import kafka.utils._
+@deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
class AsyncProducerTest {
+
+ class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner {
+ def partition(data: Any, numPartitions: Int): Int = -1
+ }
+
// One of the few cases we can just set a fixed port because the producer is mocked out here since this uses mocks
val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port=65534))
val configs = props.map(KafkaConfig.fromProps)
@@ -373,15 +379,20 @@ class AsyncProducerTest {
val msgs = TestUtils.getMsgStrings(2)
+ import SyncProducerConfig.{DefaultAckTimeoutMs, DefaultClientId}
+
// produce request for topic1 and partitions 0 and 1. Let the first request fail
// entirely. The second request will succeed for partition 1 but fail for partition 0.
// On the third try for partition 0, let it succeed.
- val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 11)
- val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 17)
+ val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1,
+ correlationId = 11, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
+ val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1,
+ correlationId = 17, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
val response1 = ProducerResponse(0,
Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NOT_LEADER_FOR_PARTITION.code, 0L)),
(TopicAndPartition("topic1", 1), ProducerResponseStatus(Errors.NONE.code, 0L))))
- val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21)
+ val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21,
+ timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
val response2 = ProducerResponse(0,
Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NONE.code, 0L))))
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
@@ -480,7 +491,3 @@ class AsyncProducerTest {
messages.map(m => new Message(key = key, bytes = m, timestamp = 0L, magicValue = Message.MagicValue_V1)): _*)
}
}
-
-class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner {
- def partition(data: Any, numPartitions: Int): Int = -1
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index de19f6f..4a1ad5a 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -37,6 +37,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.exceptions.TestFailedException
+@deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
class ProducerTest extends ZooKeeperTestHarness with Logging{
private val brokerId1 = 0
private val brokerId2 = 1
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index c1034fe..8e234d2 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -21,7 +21,7 @@ import java.net.SocketTimeoutException
import java.util.Properties
import kafka.admin.AdminUtils
-import kafka.api.ProducerResponseStatus
+import kafka.api.{ProducerRequest, ProducerResponseStatus}
import kafka.common.TopicAndPartition
import kafka.integration.KafkaServerTestHarness
import kafka.message._
@@ -31,11 +31,22 @@ import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.junit.Test
import org.junit.Assert._
+@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
class SyncProducerTest extends KafkaServerTestHarness {
private val messageBytes = new Array[Byte](2)
// turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
+ private def produceRequest(topic: String,
+ partition: Int,
+ message: ByteBufferMessageSet,
+ acks: Int,
+ timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
+ correlationId: Int = 0,
+ clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = {
+ TestUtils.produceRequest(topic, partition, message, acks, timeout, correlationId, clientId)
+ }
+
@Test
def testReachableServer() {
val server = servers.head
@@ -46,7 +57,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
val producer = new SyncProducer(new SyncProducerConfig(props))
val firstStart = SystemTime.milliseconds
try {
- val response = producer.send(TestUtils.produceRequest("test", 0,
+ val response = producer.send(produceRequest("test", 0,
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
assertNotNull(response)
} catch {
@@ -56,7 +67,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
assertTrue((firstEnd-firstStart) < 500)
val secondStart = SystemTime.milliseconds
try {
- val response = producer.send(TestUtils.produceRequest("test", 0,
+ val response = producer.send(produceRequest("test", 0,
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
assertNotNull(response)
} catch {
@@ -65,7 +76,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
val secondEnd = SystemTime.milliseconds
assertTrue((secondEnd-secondStart) < 500)
try {
- val response = producer.send(TestUtils.produceRequest("test", 0,
+ val response = producer.send(produceRequest("test", 0,
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
assertNotNull(response)
} catch {
@@ -101,7 +112,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))
val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
- val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1, acks = 1))
+ val response1 = producer.send(produceRequest("test", 0, messageSet1, acks = 1))
assertEquals(1, response1.status.count(_._2.error != Errors.NONE.code))
assertEquals(Errors.MESSAGE_TOO_LARGE.code, response1.status(TopicAndPartition("test", 0)).error)
@@ -110,7 +121,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
val safeSize = configs(0).messageMaxBytes - Message.MinMessageOverhead - Message.TimestampLength - MessageSet.LogOverhead - 1
val message2 = new Message(new Array[Byte](safeSize))
val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
- val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2, acks = 1))
+ val response2 = producer.send(produceRequest("test", 0, messageSet2, acks = 1))
assertEquals(1, response1.status.count(_._2.error != Errors.NONE.code))
assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("test", 0)).error)
@@ -130,14 +141,14 @@ class SyncProducerTest extends KafkaServerTestHarness {
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "test", 0)
// This message will be dropped silently since message size too large.
- producer.send(TestUtils.produceRequest("test", 0,
+ producer.send(produceRequest("test", 0,
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
// Send another message whose size is large enough to exceed the buffer size so
// the socket buffer will be flushed immediately;
// this send should fail since the socket has been closed
try {
- producer.send(TestUtils.produceRequest("test", 0,
+ producer.send(produceRequest("test", 0,
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
} catch {
case e : java.io.IOException => // success
@@ -154,7 +165,8 @@ class SyncProducerTest extends KafkaServerTestHarness {
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
// #1 - test that we get an error when partition does not belong to broker in response
- val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1)
+ val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1,
+ timeout = SyncProducerConfig.DefaultAckTimeoutMs, clientId = SyncProducerConfig.DefaultClientId)
val response = producer.send(request)
assertNotNull(response)
@@ -199,7 +211,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
val producer = new SyncProducer(new SyncProducerConfig(props))
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
- val request = TestUtils.produceRequest("topic1", 0, messages, acks = 1)
+ val request = produceRequest("topic1", 0, messages, acks = 1)
// stop IO threads and request handling, but leave networking operational
// any requests should be accepted and queue up, but not handled
@@ -248,7 +260,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
AdminUtils.createTopic(zkUtils, topicName, 1, 1,topicProps)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topicName, 0)
- val response = producer.send(TestUtils.produceRequest(topicName, 0,
+ val response = producer.send(produceRequest(topicName, 0,
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),-1))
assertEquals(Errors.NOT_ENOUGH_REPLICAS.code, response.status(TopicAndPartition(topicName, 0)).error)
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
index 5ecc2c0..c5b61de 100644
--- a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
@@ -20,13 +20,13 @@ package kafka.server
import java.io.File
import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{Test, After, Before}
+import org.junit.{After, Before, Test}
import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.TestUtils._
-import kafka.producer.KeyedMessage
-import kafka.serializer.StringEncoder
-import kafka.utils.{TestUtils}
+import kafka.utils.TestUtils
+import TestUtils._
import kafka.common._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.StringSerializer
abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness {
var brokers: Seq[KafkaServer] = null
@@ -63,11 +63,13 @@ abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness {
}
// send test messages to leader
- val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(brokers),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName)
- val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
- producer.send(messages:_*)
+ val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers),
+ retries = 5,
+ keySerializer = new StringSerializer,
+ valueSerializer = new StringSerializer)
+ val records = testMessageList1.map(m => new ProducerRecord(topic1, m, m)) ++
+ testMessageList2.map(m => new ProducerRecord(topic2, m, m))
+ records.map(producer.send).foreach(_.get)
producer.close()
def logsMatch(): Boolean = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index d11c40f..e13bfd9 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -19,15 +19,14 @@ package kafka.server
import java.util.Properties
import kafka.utils.TestUtils._
-import kafka.utils.{IntEncoder, CoreUtils, TestUtils}
+import kafka.utils.{CoreUtils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import kafka.common._
-import kafka.producer.{KeyedMessage, Producer}
-import kafka.serializer.StringEncoder
-
import java.io.File
-import org.junit.{Test, After, Before}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
+import org.junit.{After, Before, Test}
import org.junit.Assert._
class LogRecoveryTest extends ZooKeeperTestHarness {
@@ -54,7 +53,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
val message = "hello"
- var producer: Producer[Int, String] = null
+ var producer: KafkaProducer[Integer, String] = null
def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
var servers = Seq.empty[KafkaServer]
@@ -64,16 +63,19 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
def updateProducer() = {
if (producer != null)
producer.close()
- producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(servers),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[IntEncoder].getName)
+ producer = TestUtils.createNewProducer(
+ TestUtils.getBrokerListStrFromServers(servers),
+ retries = 5,
+ keySerializer = new IntegerSerializer,
+ valueSerializer = new StringSerializer
+ )
}
@Before
override def setUp() {
super.setUp()
- configs = TestUtils.createBrokerConfigs(2, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps))
+ configs = TestUtils.createBrokerConfigs(2, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
// start both servers
server1 = TestUtils.createServer(configProps1)
@@ -230,7 +232,6 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
}
private def sendMessages(n: Int = 1) {
- for(i <- 0 until n)
- producer.send(new KeyedMessage[Int, String](topic, 0, message))
+ (0 until n).map(_ => producer.send(new ProducerRecord(topic, 0, message))).foreach(_.get)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 8f081b9..67f62d9 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -18,15 +18,14 @@ package kafka.server
import kafka.zk.ZooKeeperTestHarness
import kafka.consumer.SimpleConsumer
-import kafka.producer._
-import kafka.utils.{IntEncoder, TestUtils, CoreUtils}
+import kafka.utils.{CoreUtils, TestUtils}
import kafka.utils.TestUtils._
import kafka.api.FetchRequestBuilder
import kafka.message.ByteBufferMessageSet
-import kafka.serializer.StringEncoder
-
import java.io.File
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
import org.junit.{Before, Test}
import org.junit.Assert._
@@ -46,27 +45,34 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
@Test
def testCleanShutdown() {
+
+ def createProducer(server: KafkaServer): KafkaProducer[Integer, String] =
+ TestUtils.createNewProducer(
+ TestUtils.getBrokerListStrFromServers(Seq(server)),
+ retries = 5,
+ keySerializer = new IntegerSerializer,
+ valueSerializer = new StringSerializer
+ )
+
var server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
server.startup()
- var producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(Seq(server)),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[IntEncoder].getName)
+ var producer = createProducer(server)
// create topic
createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
// send some messages
- producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
+ sent1.map(value => producer.send(new ProducerRecord(topic, 0, value))).foreach(_.get)
// do a clean shutdown and check that offset checkpoint file exists
server.shutdown()
- for(logDir <- config.logDirs) {
+ for (logDir <- config.logDirs) {
val OffsetCheckpointFile = new File(logDir, server.logManager.RecoveryPointCheckpointFile)
assertTrue(OffsetCheckpointFile.exists)
assertTrue(OffsetCheckpointFile.length() > 0)
}
producer.close()
-
+
/* now restart the server and check that the written data is still readable and everything still works */
server = new KafkaServer(config)
server.startup()
@@ -74,13 +80,11 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
// wait for the broker to receive the update metadata request after startup
TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0)
- producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(Seq(server)),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[IntEncoder].getName)
+ producer = createProducer(server)
val consumer = new SimpleConsumer(host, server.boundPort(), 1000000, 64*1024, "")
var fetchedMessage: ByteBufferMessageSet = null
- while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+ while (fetchedMessage == null || fetchedMessage.validBytes == 0) {
val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build())
fetchedMessage = fetched.messageSet(topic, 0)
}
@@ -88,10 +92,10 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
val newOffset = fetchedMessage.last.nextOffset
// send some more messages
- producer.send(sent2.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
+ sent2.map(value => producer.send(new ProducerRecord(topic, 0, value))).foreach(_.get)
fetchedMessage = null
- while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+ while (fetchedMessage == null || fetchedMessage.validBytes == 0) {
val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
fetchedMessage = fetched.messageSet(topic, 0)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 49fb85f..7b3e955 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -42,14 +42,15 @@ import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream}
import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
import kafka.common.TopicAndPartition
import kafka.admin.AdminUtils
-import kafka.producer.ProducerConfig
import kafka.log._
import kafka.utils.ZkUtils._
import org.junit.Assert._
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer, RangeAssignor}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.network.Mode
+import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}
import scala.collection.Map
import scala.collection.JavaConversions._
@@ -342,7 +343,7 @@ object TestUtils extends Logging {
// check if the actual iterator was longer
if (actual.hasNext) {
- var length2 = length;
+ var length2 = length
while (actual.hasNext) {
actual.next
length2 += 1
@@ -419,6 +420,7 @@ object TestUtils extends Logging {
* Create a producer with a few pre-configured properties.
* If certain properties need to be overridden, they can be provided in producerProps.
*/
+ @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")
def createProducer[K, V](brokerList: String,
encoder: String = classOf[DefaultEncoder].getName,
keyEncoder: String = classOf[DefaultEncoder].getName,
@@ -433,7 +435,7 @@ object TestUtils extends Logging {
props.put("serializer.class", encoder)
props.put("key.serializer.class", keyEncoder)
props.put("partitioner.class", partitioner)
- new Producer[K, V](new ProducerConfig(props))
+ new Producer[K, V](new kafka.producer.ProducerConfig(props))
}
private def securityConfigs(mode: Mode,
@@ -453,16 +455,18 @@ object TestUtils extends Logging {
/**
* Create a (new) producer with a few pre-configured properties.
*/
- def createNewProducer(brokerList: String,
+ def createNewProducer[K, V](brokerList: String,
acks: Int = -1,
maxBlockMs: Long = 60 * 1000L,
bufferSize: Long = 1024L * 1024L,
retries: Int = 0,
lingerMs: Long = 0,
+ requestTimeoutMs: Long = 10 * 1024L,
securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
trustStoreFile: Option[File] = None,
- props: Option[Properties] = None): KafkaProducer[Array[Byte], Array[Byte]] = {
- import org.apache.kafka.clients.producer.ProducerConfig
+ keySerializer: Serializer[K] = new ByteArraySerializer,
+ valueSerializer: Serializer[V] = new ByteArraySerializer,
+ props: Option[Properties] = None): KafkaProducer[K, V] = {
val producerProps = props.getOrElse(new Properties)
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
@@ -470,15 +474,15 @@ object TestUtils extends Logging {
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString)
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
+ producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString)
/* Only use these if not already set */
val defaultProps = Map(
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> "100",
ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG -> "200",
- ProducerConfig.LINGER_MS_CONFIG -> lingerMs.toString,
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer",
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer"
+ ProducerConfig.LINGER_MS_CONFIG -> lingerMs.toString
)
+
defaultProps.foreach { case (key, value) =>
if (!producerProps.containsKey(key)) producerProps.put(key, value)
}
@@ -489,10 +493,10 @@ object TestUtils extends Logging {
* invoke it before this call in IntegrationTestHarness, otherwise the
* SSL client auth fails.
*/
- if(!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
+ if (!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
producerProps.putAll(producerSecurityConfigs(securityProtocol, trustStoreFile))
- new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+ new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer)
}
private def usesSslTransportLayer(securityProtocol: SecurityProtocol): Boolean = securityProtocol match {
@@ -558,8 +562,6 @@ object TestUtils extends Logging {
props.put("request.timeout.ms", "2000")
props.put("request.required.acks", "-1")
props.put("send.buffer.bytes", "65536")
- props.put("connect.timeout.ms", "100000")
- props.put("reconnect.interval", "10000")
props
}
@@ -620,23 +622,25 @@ object TestUtils extends Logging {
/**
* Create a wired format request based on simple basic information
*/
+ @deprecated("This method has been deprecated and it will be removed in a future release", "0.10.0.0")
def produceRequest(topic: String,
partition: Int,
message: ByteBufferMessageSet,
- acks: Int = SyncProducerConfig.DefaultRequiredAcks,
- timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
+ acks: Int,
+ timeout: Int,
correlationId: Int = 0,
- clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = {
+ clientId: String): ProducerRequest = {
produceRequestWithAcks(Seq(topic), Seq(partition), message, acks, timeout, correlationId, clientId)
}
+ @deprecated("This method has been deprecated and it will be removed in a future release", "0.10.0.0")
def produceRequestWithAcks(topics: Seq[String],
partitions: Seq[Int],
message: ByteBufferMessageSet,
- acks: Int = SyncProducerConfig.DefaultRequiredAcks,
- timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
+ acks: Int,
+ timeout: Int,
correlationId: Int = 0,
- clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = {
+ clientId: String): ProducerRequest = {
val data = topics.flatMap(topic =>
partitions.map(partition => (TopicAndPartition(topic, partition), message))
)
@@ -889,6 +893,8 @@ object TestUtils extends Logging {
time = time,
brokerState = new BrokerState())
}
+
+ @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")
def sendMessages(servers: Seq[KafkaServer],
topic: String,
numMessages: Int,
@@ -908,7 +914,7 @@ object TestUtils extends Logging {
partitioner = classOf[FixedValuePartitioner].getName,
producerProps = props)
- producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
+ producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)): _*)
debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
producer.close()
ms.toList
@@ -920,24 +926,43 @@ object TestUtils extends Logging {
keyEncoder = classOf[StringEncoder].getName,
partitioner = classOf[DefaultPartitioner].getName,
producerProps = props)
- producer.send(ms.map(m => new KeyedMessage[String, String](topic, topic, m)):_*)
+ producer.send(ms.map(m => new KeyedMessage[String, String](topic, topic, m)): _*)
producer.close()
debug("Sent %d messages for topic [%s]".format(ms.size, topic))
ms.toList
}
-
}
- def sendMessage(servers: Seq[KafkaServer],
- topic: String,
- message: String) = {
+ def produceMessages(servers: Seq[KafkaServer],
+ topic: String,
+ numMessages: Int): Seq[String] = {
+
+ val producer = createNewProducer(
+ TestUtils.getBrokerListStrFromServers(servers),
+ retries = 5,
+ requestTimeoutMs = 2000
+ )
+
+ val values = (0 until numMessages).map(x => s"test-$x")
+
+ val futures = values.map { value =>
+ producer.send(new ProducerRecord(topic, null, null, value.getBytes))
+ }
+ futures.foreach(_.get)
+ producer.close()
+
+ debug(s"Sent ${values.size} messages for topic [$topic]")
- val producer: Producer[String, String] =
- createProducer(TestUtils.getBrokerListStrFromServers(servers),
- encoder = classOf[StringEncoder].getName(),
- keyEncoder = classOf[StringEncoder].getName())
+ values
+ }
- producer.send(new KeyedMessage[String, String](topic, topic, message))
+ def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) {
+ val producer = createNewProducer(
+ TestUtils.getBrokerListStrFromServers(servers),
+ retries = 5,
+ requestTimeoutMs = 2000
+ )
+ producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
producer.close()
}
@@ -1056,18 +1081,14 @@ class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
override def toBytes(n: Int) = n.toString.getBytes
}
-class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner{
+@deprecated("This class is deprecated and it will be removed in a future release.", "0.10.0.0")
+class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner {
def partition(data: Any, numPartitions: Int): Int = {
(data.asInstanceOf[String].length % numPartitions)
}
}
-class HashPartitioner(props: VerifiableProperties = null) extends Partitioner {
- def partition(data: Any, numPartitions: Int): Int = {
- (data.hashCode % numPartitions)
- }
-}
-
+@deprecated("This class has been deprecated and it will be removed in a future release.", "0.10.0.0")
class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner {
def partition(data: Any, numPartitions: Int): Int = data.asInstanceOf[Int]
}