Posted to commits@samza.apache.org by cr...@apache.org on 2015/01/28 02:24:35 UTC
samza git commit: SAMZA-227; upgrade kafka producer to new java client
Repository: samza
Updated Branches:
refs/heads/master 8ae92c6bd -> fa2c04f58
SAMZA-227; upgrade kafka producer to new java client
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fa2c04f5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fa2c04f5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fa2c04f5
Branch: refs/heads/master
Commit: fa2c04f581d89c00ccc9fe96b4216bd804c5c403
Parents: 8ae92c6
Author: Navina Ramesh <na...@gmail.com>
Authored: Tue Jan 27 16:53:45 2015 -0800
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Jan 27 16:53:45 2015 -0800
----------------------------------------------------------------------
build.gradle | 2 +
.../versioned/jobs/configuration-table.html | 31 +--
gradle/dependency-versions.gradle | 2 +-
.../kafka/KafkaCheckpointManager.scala | 9 +-
.../kafka/KafkaCheckpointManagerFactory.scala | 20 +-
.../org/apache/samza/config/KafkaConfig.scala | 51 +++-
.../samza/system/kafka/KafkaSystemFactory.scala | 31 +--
.../system/kafka/KafkaSystemProducer.scala | 157 +++++++-----
.../kafka/KafkaSystemProducerMetrics.scala | 17 +-
.../scala/org/apache/samza/util/KafkaUtil.scala | 12 +-
.../kafka/TestKafkaCheckpointManager.scala | 23 +-
.../apache/samza/config/TestKafkaConfig.scala | 4 +-
.../samza/system/kafka/MockKafkaProducer.java | 226 +++++++++++++++++
.../system/kafka/TestKafkaSystemAdmin.scala | 22 +-
.../system/kafka/TestKafkaSystemFactory.scala | 2 +-
.../system/kafka/TestKafkaSystemProducer.scala | 243 ++++++++++---------
.../scala/org/apache/samza/utils/TestUtils.java | 112 +++++++++
.../src/main/config/negate-number.properties | 11 +-
.../test/integration/TestStatefulTask.scala | 28 ++-
19 files changed, 709 insertions(+), 294 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index e119db1..5e64b4a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -155,9 +155,11 @@ project(":samza-kafka_$scalaVersion") {
compile "org.apache.zookeeper:zookeeper:$zookeeperVersion"
compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion"
+ compile "org.apache.kafka:kafka-clients:$kafkaVersion"
testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
+ testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
// Logging in tests is good.
testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 4ccc0e7..008fc78 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -675,9 +675,12 @@
</tr>
<tr>
- <td class="property" id="systems-samza-producer-metadata-broker-list">systems.<span class="system">system-name</span>.<br>producer.metadata.broker.list</td>
+ <td class="property" id="systems-samza-producer-bootstrap-servers">systems.<span class="system">system-name</span>.<br>producer.bootstrap.servers</td>
<td class="default"></td>
<td class="description">
+ <b>Note</b>:
+ <i>This property was previously named "producer.metadata.broker.list", which is deprecated as of this version.</i>
+ <br />
A list of network endpoints where the Kafka brokers are running. This is given as
a comma-separated list of <code>hostname:port</code> pairs, for example
<code>kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092</code>.
@@ -689,34 +692,12 @@
</tr>
<tr>
- <td class="property" id="systems-samza-producer-producer-type">systems.<span class="system">system-name</span>.<br>producer.producer.type</td>
- <td class="default">sync</td>
- <td class="description">
- Controls whether messages emitted from a stream processor should be buffered before
- they are sent to Kafka. The options are:
- <dl>
- <dt><code>sync</code></dt>
- <dd>Any messages sent to output streams are synchronously flushed to the Kafka brokers
- before the next message from an input stream is processed.</dd>
- <dt><code>async</code></dt>
- <dd>Messages sent to output streams are buffered within the Samza container, and published
- to the Kafka brokers as a batch. This setting can increase throughput, but
- risks buffered messages being lost if a container abruptly fails. The maximum
- number of messages to buffer is controlled with
- systems.<span class="system">system-name</span>.producer.batch.num.messages
- and the maximum time (in milliseconds) to wait before flushing the buffer is set with
- systems.<span class="system">system-name</span>.producer.queue.buffering.max.ms.</dd>
- </dl>
- </td>
- </tr>
-
- <tr>
<td class="property" id="systems-samza-producer">systems.<span class="system">system-name</span>.<br>producer.*</td>
<td class="default"></td>
<td class="description">
- Any <a href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka producer configuration</a>
+ Any <a href="http://kafka.apache.org/documentation.html#newproducerconfigs">Kafka producer configuration</a>
can be included here. For example, to change the request timeout, you can set
- systems.<span class="system">system-name</span>.producer.request.timeout.ms.
+ systems.<span class="system">system-name</span>.producer.timeout.ms.
(There is no need to configure <code>client.id</code> as it is automatically
configured by Samza.)
</td>
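For illustration, a minimal sketch of a job config using the new producer setting (the system name "kafka" and the broker endpoints are made up for this example):

    // Hypothetical Samza config using the new bootstrap.servers key in place
    // of the deprecated producer.metadata.broker.list.
    import org.apache.samza.config.MapConfig
    import scala.collection.JavaConversions._

    val config = new MapConfig(Map[String, String](
      "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
      "systems.kafka.producer.bootstrap.servers" -> "kafka1.example.com:9092,kafka2.example.com:9092"))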
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 44dd426..7bbaa41 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -26,7 +26,7 @@
zkClientVersion = "0.3"
zookeeperVersion = "3.3.4"
metricsVersion = "2.2.0"
- kafkaVersion = "0.8.1.1"
+ kafkaVersion = "0.8.2-beta"
commonsHttpClientVersion = "3.1"
leveldbVersion = "1.8"
rocksdbVersion = "3.5.1"
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 1d5627d..c96394d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -31,8 +31,6 @@ import kafka.common.TopicExistsException
import kafka.common.UnknownTopicOrPartitionException
import kafka.consumer.SimpleConsumer
import kafka.message.InvalidMessageException
-import kafka.producer.KeyedMessage
-import kafka.producer.Producer
import kafka.utils.Utils
import org.I0Itec.zkclient.ZkClient
import org.apache.samza.SamzaException
@@ -45,6 +43,7 @@ import org.apache.samza.util.ExponentialSleepStrategy
import org.apache.samza.util.TopicMetadataStore
import scala.collection.mutable
import java.util.Properties
+import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
/**
* Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
@@ -62,7 +61,7 @@ class KafkaCheckpointManager(
bufferSize: Int,
fetchSize: Int,
metadataStore: TopicMetadataStore,
- connectProducer: () => Producer[Array[Byte], Array[Byte]],
+ connectProducer: () => Producer,
connectZk: () => ZkClient,
systemStreamPartitionGrouperFactoryString: String,
retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
@@ -71,7 +70,7 @@ class KafkaCheckpointManager(
import KafkaCheckpointManager._
var taskNames = Set[TaskName]()
- var producer: Producer[Array[Byte], Array[Byte]] = null
+ var producer: Producer = null
var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
@@ -121,7 +120,7 @@ class KafkaCheckpointManager(
producer = connectProducer()
}
- producer.send(new KeyedMessage(checkpointTopic, key, 0, msg))
+ producer.send(new ProducerRecord(checkpointTopic, 0, key, msg)).get()
loop.done
},
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index f2defbd..0f64c08 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -20,7 +20,6 @@
package org.apache.samza.checkpoint.kafka
import org.apache.samza.util.Logging
-import kafka.producer.Producer
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
import org.apache.samza.SamzaException
@@ -33,6 +32,7 @@ import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
import java.util.Properties
import scala.collection.JavaConversions._
+import org.apache.kafka.clients.producer.KafkaProducer
object KafkaCheckpointManagerFactory {
/**
@@ -41,17 +41,14 @@ object KafkaCheckpointManagerFactory {
val CHECKPOINT_LOG_VERSION_NUMBER = 1
val INJECTED_PRODUCER_PROPERTIES = Map(
- "request.required.acks" -> "-1",
+ "acks" -> "all",
// Forcibly disable compression because Kafka doesn't support compression
// on log compacted topics. Details in SAMZA-393.
- "compression.codec" -> "none",
- "producer.type" -> "sync",
- // Subtract one here, because DefaultEventHandler calls messageSendMaxRetries + 1.
- "message.send.max.retries" -> (Integer.MAX_VALUE - 1).toString)
+ "compression.codec" -> "none")
// Set the checkpoint topic configs to have a very small segment size and
- // enable log compaction. This keeps job startup time small since there
- // are fewer useless (overwritten) messages to read from the checkpoint
+ // enable log compaction. This keeps job startup time small since there
+ // are fewer useless (overwritten) messages to read from the checkpoint
// topic.
def getCheckpointTopicProperties(config: Config) = {
val segmentBytes = config
@@ -83,7 +80,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
val fetchSize = consumerConfig.fetchMessageMaxBytes // must be > buffer size
val connectProducer = () => {
- new Producer[Array[Byte], Array[Byte]](producerConfig)
+ new KafkaProducer(producerConfig.getProducerProperties)
}
val zkConnect = Option(consumerConfig.zkConnect)
.getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
@@ -92,9 +89,8 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
}
val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
val jobId = config.getJobId.getOrElse("1")
- val brokersListString = Option(producerConfig.brokerList)
- .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
- val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId, socketTimeout)
+ val bootstrapServers = producerConfig.bootstrapServers
+ val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, socketTimeout)
val checkpointTopic = getTopic(jobName, jobId)
// Find out the SSPGrouperFactory class so it can be included/verified in the key
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index e57b8ba..6d54350 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -22,14 +22,15 @@ package org.apache.samza.config
import java.util.regex.Pattern
-import org.apache.samza.SamzaException
import org.apache.samza.util.Util
+import org.apache.samza.util.Logging
import scala.collection.JavaConversions._
import kafka.consumer.ConsumerConfig
-import java.util.Properties
-import kafka.producer.ProducerConfig
-import java.util.UUID
+import java.util.{Properties, UUID}
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.samza.SamzaException
+import java.util
import scala.collection.JavaConverters._
import org.apache.samza.system.kafka.KafkaSystemFactory
import org.apache.samza.config.SystemConfig.Config2System
@@ -153,10 +154,48 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
injectedProps: Map[String, String] = Map()) = {
val subConf = config.subset("systems.%s.producer." format systemName, true)
- val producerProps = new Properties()
+ val producerProps = new util.HashMap[String, Object]()
producerProps.putAll(subConf)
producerProps.put("client.id", clientId)
producerProps.putAll(injectedProps)
- new ProducerConfig(producerProps)
+ new KafkaProducerConfig(systemName, clientId, producerProps)
+ }
+}
+
+class KafkaProducerConfig(val systemName: String,
+ val clientId: String = "",
+ properties: java.util.Map[String, Object] = new util.HashMap[String, Object]()) extends Logging {
+
+ // Copied from new Kafka API - Workaround until KAFKA-1794 is resolved
+ val RECONNECT_BACKOFF_MS_DEFAULT = 10L
+
+ // Overrides specific to samza-kafka (these are treated as defaults in Samza and can be overridden by the user)
+ val MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT: java.lang.Integer = 1.asInstanceOf[Integer]
+ val RETRIES_DEFAULT: java.lang.Integer = Integer.MAX_VALUE
+
+ def getProducerProperties = {
+ val producerProperties: java.util.Map[String, Object] = new util.HashMap[String, Object]()
+ producerProperties.putAll(properties)
+
+ // Always set the new producer's max.in.flight.requests.per.connection to 1 and retries to Integer.MAX_VALUE,
+ // so that the producer does not optimistically send batches asynchronously and thereby reorder outgoing messages.
+ // Retries is set to the maximum so that Samza only fails the send once all attempts are exhausted; we do not have
+ // any special handler for producer failure.
+ // Do NOT let callers override these two configs for Kafka.
+ producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)
+ producerProperties.put(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT)
+ producerProperties
+ }
+
+ val reconnectIntervalMs = Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG))
+ .getOrElse(RECONNECT_BACKOFF_MS_DEFAULT).asInstanceOf[Long]
+
+ val bootstrapServers = {
+ if(properties.containsKey("metadata.broker.list"))
+ warn("Kafka producer configuration contains 'metadata.broker.list'. This configuration is deprecated. Samza has been upgraded " +
+ "to use Kafka's new producer API. Please update your configurations based on the documentation at http://kafka.apache.org/documentation.html#newproducerconfigs")
+ Option(properties.get("bootstrap.servers"))
+ .getOrElse(throw new SamzaException("No bootstrap servers defined in config for %s." format systemName))
+ .asInstanceOf[String]
}
}
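As a quick sketch of the behavior above (class and method names as in the diff; the property values are illustrative): getProducerProperties always overwrites the two ordering-related configs, regardless of what the caller supplies.

    import java.util
    import org.apache.samza.config.KafkaProducerConfig

    val props = new util.HashMap[String, Object]()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("max.in.flight.requests.per.connection", "50") // user-supplied value, will be reset
    val producerConfig = new KafkaProducerConfig("kafka", "client-1", props)
    val effective = producerConfig.getProducerProperties
    // effective now maps max.in.flight.requests.per.connection -> 1
    // and retries -> Integer.MAX_VALUE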
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 4506ea3..38e3815 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -19,18 +19,14 @@
package org.apache.samza.system.kafka
-
-import java.util.Properties
-
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.ZkClient
import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore}
import org.apache.samza.config.Config
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.SamzaException
-import kafka.producer.Producer
+import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.samza.system.SystemFactory
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZKStringSerializer
class KafkaSystemFactory extends SystemFactory with Logging {
@@ -40,8 +36,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
// Kind of goofy to need a producer config for consumers, but we need metadata.
val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
- val brokerListString = Option(producerConfig.brokerList)
- .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
+ val bootstrapServers = producerConfig.bootstrapServers
val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
val timeout = consumerConfig.socketTimeoutMs
@@ -53,7 +48,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
- val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
+ val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, timeout)
new KafkaSystemConsumer(
systemName = systemName,
@@ -72,11 +67,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
val clientId = KafkaUtil.getClientId("samza-producer", config)
val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
- val batchSize = Option(producerConfig.batchNumMessages)
- .getOrElse(1000)
- val reconnectIntervalMs = Option(producerConfig.retryBackoffMs)
- .getOrElse(1000)
- val getProducer = () => { new Producer[Object, Object](producerConfig) }
+ val getProducer = () => { new KafkaProducer(producerConfig.getProducerProperties) }
val metrics = new KafkaSystemProducerMetrics(systemName, registry)
// Unlike consumer, no need to use encoders here, since they come for free
@@ -85,8 +76,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
new KafkaSystemProducer(
systemName,
- batchSize,
- new ExponentialSleepStrategy(initialDelayMs = reconnectIntervalMs),
+ new ExponentialSleepStrategy(initialDelayMs = producerConfig.reconnectIntervalMs),
getProducer,
metrics)
}
@@ -94,8 +84,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
def getAdmin(systemName: String, config: Config) = {
val clientId = KafkaUtil.getClientId("samza-admin", config)
val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
- val brokerListString = Option(producerConfig.brokerList)
- .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
+ val bootstrapServers = producerConfig.bootstrapServers
val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
val timeout = consumerConfig.socketTimeoutMs
val bufferSize = consumerConfig.socketReceiveBufferBytes
@@ -106,13 +95,13 @@ class KafkaSystemFactory extends SystemFactory with Logging {
{
val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).getOrElse("2").toInt
val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName))
- info("Creating topic meta information for topic: " + topicName + " with replication factor: " + replicationFactor)
+ info("Creating topic meta information for topic: %s with replication factor: %s" format (topicName, replicationFactor))
(topicName, changelogInfo)
}}.toMap
new KafkaSystemAdmin(
systemName,
- brokerListString,
+ bootstrapServers,
timeout,
bufferSize,
clientId,
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index a0e1ccb..cd1fced 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -19,25 +19,32 @@
package org.apache.samza.system.kafka
-import scala.collection.mutable.ArrayBuffer
import org.apache.samza.util.Logging
-import kafka.producer.KeyedMessage
-import kafka.producer.Producer
+import org.apache.kafka.clients.producer.{RecordMetadata, Callback, ProducerRecord, Producer}
import org.apache.samza.system.SystemProducer
import org.apache.samza.system.OutgoingMessageEnvelope
import org.apache.samza.util.ExponentialSleepStrategy
import org.apache.samza.util.TimerUtils
-
-class KafkaSystemProducer(
- systemName: String,
- batchSize: Int,
- retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
- getProducer: () => Producer[Object, Object],
- metrics: KafkaSystemProducerMetrics,
- val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtils {
-
- var sourceBuffers = Map[String, ArrayBuffer[KeyedMessage[Object, Object]]]()
- var producer: Producer[Object, Object] = null
+import org.apache.samza.util.KafkaUtil
+import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
+import java.util.{Map => javaMap}
+import org.apache.samza.SamzaException
+import org.apache.kafka.common.errors.RetriableException
+import org.apache.kafka.common.PartitionInfo
+import java.util
+import java.util.concurrent.Future
+
+
+class KafkaSystemProducer(systemName: String,
+ retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
+ getProducer: () => Producer,
+ metrics: KafkaSystemProducerMetrics,
+ val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtils
+{
+ var producer: Producer = null
+ val latestFuture: javaMap[String, Future[RecordMetadata]] = new util.HashMap[String, Future[RecordMetadata]]()
+ val sendFailed: AtomicBoolean = new AtomicBoolean(false)
+ var exceptionThrown: AtomicReference[Exception] = new AtomicReference[Exception]()
def start() {
}
@@ -49,59 +56,95 @@ class KafkaSystemProducer(
}
def register(source: String) {
- sourceBuffers += source -> ArrayBuffer()
-
- metrics.setBufferSize(source, () => sourceBuffers(source).size)
+ if(latestFuture.containsKey(source)) {
+ throw new SamzaException("%s is already registered with the %s system producer" format (source, systemName))
+ }
+ latestFuture.put(source, null)
}
def send(source: String, envelope: OutgoingMessageEnvelope) {
trace("Enqueueing message: %s, %s." format (source, envelope))
-
- metrics.sends.inc
-
- sourceBuffers(source) += new KeyedMessage[Object, Object](
- envelope.getSystemStream.getStream,
- envelope.getKey,
- envelope.getPartitionKey,
- envelope.getMessage)
-
- if (sourceBuffers(source).size >= batchSize) {
- flush(source)
+ if(producer == null) {
+ info("Creating a new producer for system %s." format systemName)
+ producer = getProducer()
+ debug("Created a new producer for system %s." format systemName)
}
+ // The Java-based Kafka producer API requires an Integer partition key and does not allow custom partitioners,
+ // so any kind of custom partitioning has to be done on the client side.
+ val topicName = envelope.getSystemStream.getStream
+ val partitions: java.util.List[PartitionInfo] = producer.partitionsFor(topicName)
+ val partitionKey = if(envelope.getPartitionKey != null) KafkaUtil.getIntegerPartitionKey(envelope, partitions) else null
+ val record = new ProducerRecord(envelope.getSystemStream.getStream,
+ partitionKey,
+ envelope.getKey.asInstanceOf[Array[Byte]],
+ envelope.getMessage.asInstanceOf[Array[Byte]])
+
+ sendFailed.set(false)
+
+ retryBackoff.run(
+ loop => {
+ if(sendFailed.get()) {
+ throw exceptionThrown.get()
+ }
+ val futureRef: Future[RecordMetadata] =
+ producer.send(record, new Callback {
+ def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
+ if (exception == null) {
+ //send was successful. Don't retry
+ metrics.sendSuccess.inc
+ }
+ else {
+ //If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries
+ //Hence, fail container!
+ exceptionThrown.compareAndSet(null, exception)
+ sendFailed.set(true)
+ }
+ }
+ })
+ latestFuture.put(source, futureRef)
+ metrics.sends.inc
+ if(!sendFailed.get())
+ loop.done
+ },
+ (exception, loop) => {
+ if(exception != null && !exception.isInstanceOf[RetriableException]) { // Exception is thrown & not retriable
+ debug("Exception detail : ", exception)
+ //Close producer
+ stop()
+ producer = null
+ //Mark loop as done as we are not going to retry
+ loop.done
+ metrics.sendFailed.inc
+ throw new SamzaException("Failed to send message. Exception:\n %s".format(exception))
+ } else {
+ warn("Retrying send messsage due to RetriableException - %s. Turn on debugging to get a full stack trace".format(exception))
+ debug("Exception detail:", exception)
+ metrics.retries.inc
+ }
+ }
+ )
}
def flush(source: String) {
updateTimer(metrics.flushMs) {
- val buffer = sourceBuffers(source)
- trace("Flushing buffer with size: %s." format buffer.size)
metrics.flushes.inc
-
- retryBackoff.run(
- loop => {
- if (producer == null) {
- info("Creating a new producer for system %s." format systemName)
- producer = getProducer()
- debug("Created a new producer for system %s." format systemName)
- }
-
- producer.send(buffer: _*)
- loop.done
- metrics.flushSizes.inc(buffer.size)
- },
-
- (exception, loop) => {
- warn("Triggering a reconnect for %s because connection failed: %s" format (systemName, exception))
- debug("Exception detail: ", exception)
- metrics.reconnects.inc
-
- if (producer != null) {
- producer.close
- producer = null
- }
- })
-
- buffer.clear
- trace("Flushed buffer.")
+ // If latestFuture is null, there have probably been no calls to send messages,
+ // hence there is nothing to do in flush
+ if(latestFuture.get(source) != null) {
+ while (!latestFuture.get(source).isDone && !sendFailed.get()) {
+ //do nothing
+ }
+ if (sendFailed.get()) {
+ logger.error("Unable to send message from %s to system %s" format(source, systemName))
+ //Close producer
+ stop()
+ producer = null
+ metrics.flushFailed.inc
+ throw new SamzaException("Unable to send message from %s to system %s" format(source, systemName))
+ } else {
+ trace("Flushed %s." format (source))
+ }
+ }
}
}
}
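A minimal usage sketch of the rewritten producer, mirroring the unit tests further down (the system and source names are illustrative, and producerConfig is assumed to be a KafkaProducerConfig as sketched earlier): each send goes straight to the Kafka producer, and flush blocks until the last outstanding future for the source completes.

    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream}

    val systemProducer = new KafkaSystemProducer(
      systemName = "kafka",
      getProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
      metrics = new KafkaSystemProducerMetrics("kafka"))
    systemProducer.register("my-source")
    systemProducer.start()
    systemProducer.send("my-source",
      new OutgoingMessageEnvelope(new SystemStream("kafka", "my-topic"), "msg".getBytes))
    systemProducer.flush("my-source") // waits on the last future recorded for "my-source"
    systemProducer.stop()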
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
index 7e1383f..8aa73ce 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
@@ -28,15 +28,20 @@ import org.apache.samza.metrics.Gauge
import org.apache.samza.metrics.MetricsRegistry
class KafkaSystemProducerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
- val reconnects = newCounter("producer-reconnects")
+ /* Tracks the number of calls made to send in KafkaSystemProducer */
val sends = newCounter("producer-sends")
+ /* Tracks the number of calls made to flush in KafkaSystemProducer */
val flushes = newCounter("flushes")
- val flushSizes = newCounter("flush-sizes")
+ /* Tracks how long the flush call takes to complete */
val flushMs = newTimer("flush-ms")
-
- def setBufferSize(source: String, getValue: () => Int) {
- newGauge("%s-producer-buffer-size" format source, getValue)
- }
+ /* Tracks the number of times the system producer retries a send request (due to RetriableException) */
+ val retries = newCounter("producer-retries")
+ /* Tracks the number of times the flush operation failed */
+ val flushFailed = newCounter("flush-failed")
+ /* Tracks the number of send requests that were failed by the KafkaProducer (due to unrecoverable errors) */
+ val sendFailed = newCounter("producer-send-failed")
+ /* Tracks the number of send requests that were successfully completed by the KafkaProducer */
+ val sendSuccess = newCounter("producer-send-success")
override def getPrefix = systemName + "-"
}
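For reference, a hedged sketch of how the new counters surface (per getPrefix above, counter names are prefixed with the system name):

    val metrics = new KafkaSystemProducerMetrics("kafka")
    metrics.sends.inc
    // The registry now holds a counter named "kafka-producer-sends" with count 1.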
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index f1b7511..2482f23 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -21,7 +21,11 @@ package org.apache.samza.util
import org.apache.samza.config.{Config, ConfigException}
import org.apache.samza.config.JobConfig.Config2Job
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.kafka.common.utils.Utils
+import java.util.Random
+import org.apache.kafka.common.PartitionInfo
object KafkaUtil {
val counter = new AtomicLong(0)
@@ -39,4 +43,10 @@ object KafkaUtil {
System.currentTimeMillis,
counter.getAndIncrement)
+ private def abs(n: Int) = if(n == Integer.MIN_VALUE) 0 else math.abs(n)
+
+ def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope, partitions: java.util.List[PartitionInfo]): Integer = {
+ val numPartitions = partitions.size
+ abs(envelope.getPartitionKey.hashCode()) % numPartitions
+ }
}
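A small sketch of the partitioning helper above (the topic name and key are made up; the envelope constructor argument order is assumed to be systemStream, partitionKey, key, message): the partition is derived from the hash of the envelope's partition key, modulo the topic's partition count.

    // Assuming a producer and a topic with, say, 3 partitions:
    val partitions: java.util.List[PartitionInfo] = producer.partitionsFor("my-topic")
    val envelope = new OutgoingMessageEnvelope(
      new SystemStream("kafka", "my-topic"), "user-42", null, "payload".getBytes)
    // The partitionKey "user-42" hashes to the same partition on every send:
    val partition: Integer = KafkaUtil.getIntegerPartitionKey(envelope, partitions)
    // partition == abs("user-42".hashCode) % partitions.size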
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 553d6b4..7d4bea8 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -23,8 +23,7 @@ import kafka.admin.AdminUtils
import kafka.common.InvalidMessageSizeException
import kafka.common.UnknownTopicOrPartitionException
import kafka.message.InvalidMessageException
-import kafka.producer.Producer
-import kafka.producer.ProducerConfig
+
import kafka.server.KafkaConfig
import kafka.server.KafkaServer
import kafka.utils.TestUtils
@@ -35,7 +34,7 @@ import kafka.zk.EmbeddedZookeeper
import org.I0Itec.zkclient.ZkClient
import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.config.MapConfig
+import org.apache.samza.config.{KafkaProducerConfig, MapConfig}
import org.apache.samza.container.TaskName
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
import org.apache.samza.serializers.CheckpointSerde
@@ -47,6 +46,7 @@ import org.junit.{ AfterClass, BeforeClass, Test }
import scala.collection._
import scala.collection.JavaConversions._
+import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer}
object TestKafkaCheckpointManager {
val checkpointTopic = "checkpoint-topic"
@@ -69,13 +69,14 @@ object TestKafkaCheckpointManager {
val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
props1.put("controlled.shutdown.enable", "true")
- val config = new java.util.Properties()
+ val config = new java.util.HashMap[String, Object]()
val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
- config.put("metadata.broker.list", brokers)
- config.put("producer.type", "sync")
- config.put("request.required.acks", "-1")
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+ config.put("acks", "all")
+ config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, new Integer(1))
+ config.put(ProducerConfig.RETRIES_CONFIG, new Integer(java.lang.Integer.MAX_VALUE-1))
config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
- val producerConfig = new ProducerConfig(config)
+ val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
val partition = new Partition(0)
val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
@@ -163,7 +164,7 @@ class TestKafkaCheckpointManager {
fail("Expected a KafkaCheckpointException.")
} catch {
case e: KafkaCheckpointException => None
- }
+ }
kcm.stop
}
}
@@ -177,7 +178,7 @@ class TestKafkaCheckpointManager {
bufferSize = 64 * 1024,
fetchSize = 300 * 1024,
metadataStore = metadataStore,
- connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
+ connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
@@ -192,7 +193,7 @@ class TestKafkaCheckpointManager {
bufferSize = 64 * 1024,
fetchSize = 300 * 1024,
metadataStore = metadataStore,
- connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
+ connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
serde = new InvalideSerde(exception),
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 0e1c38e..95f76b6 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -39,7 +39,7 @@ class TestKafkaConfig {
val props = new Properties
props.setProperty(" systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
props.setProperty("systems.kafka.consumer.zookeeper.connect", "localhost:2181/")
- props.setProperty("systems.kafka.producer.metadata.broker.list", "localhost:9092")
+ props.setProperty("systems.kafka.producer.bootstrap.servers", "localhost:9092")
val mapConfig = new MapConfig(props.toMap[String, String])
val kafkaConfig = new KafkaConfig(mapConfig)
@@ -82,7 +82,7 @@ class TestKafkaConfig {
def testStreamLevelFetchSizeOverride() {
val props = new Properties
props.setProperty("systems.kafka.consumer.zookeeper.connect", "localhost:2181/")
- props.setProperty("systems.kafka.producer.metadata.broker.list", "localhost:9092")
+ props.setProperty("systems.kafka.producer.bootstrap.servers", "localhost:9092")
val mapConfig = new MapConfig(props.toMap[String, String])
val kafkaConfig = new KafkaConfig(mapConfig)
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
new file mode 100644
index 0000000..6a2edf6
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -0,0 +1,226 @@
+package org.apache.samza.system.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.utils.TestUtils;
+
+
+public class MockKafkaProducer implements Producer {
+
+ private Cluster _cluster;
+ private List<FutureTask<RecordMetadata>> _callbacksList = new ArrayList<FutureTask<RecordMetadata>>();
+ private boolean shouldBuffer = false;
+ private boolean errorNext = false;
+ private Exception exception = null;
+ private AtomicInteger msgsSent = new AtomicInteger(0);
+
+ /*
+ * Mocks the buffered behavior seen in KafkaProducer. This MockKafkaProducer enables you to:
+ * - Perform sends that instantly succeed and return a successful future
+ * - Set an error for the next message that is sent (using errorNext). In this case, the next call to send returns a
+ * future with an exception.
+ * Note that errorNext is reset to false once a message send has failed. This means that errorNext has to be
+ * manually set to true in the unit test before expecting failure for another message.
+ * - "shouldBuffer" can be turned on to start buffering messages. This stores all the callbacks and executes them
+ * at a later point in time on a separate thread. This thread NEEDS to be triggered from the unit test itself
+ * using the "startDelayedSendThread" method.
+ * - "Offset" in RecordMetadata is not guaranteed to be correct
+ */
+ public MockKafkaProducer(int numNodes, String topicName, int numPartitions) {
+ this._cluster = TestUtils.clusterWith(numNodes, topicName, numPartitions);
+ }
+
+ public void setShouldBuffer(boolean shouldBuffer) {
+ this.shouldBuffer = shouldBuffer;
+ }
+
+ public void setErrorNext(boolean errorNext, Exception exception) {
+ this.errorNext = errorNext;
+ this.exception = exception;
+ }
+
+ public int getMsgsSent() {
+ return this.msgsSent.get();
+ }
+
+ public Thread startDelayedSendThread(final int sleepTime) {
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ FutureTask[] callbackArray = new FutureTask[_callbacksList.size()];
+ AtomicReferenceArray<FutureTask> _bufferList = new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray));
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ try {
+ for(int i = 0; i < _bufferList.length(); i++) {
+ Thread.sleep(sleepTime);
+ FutureTask f = _bufferList.get(i);
+ if(!f.isDone()) {
+ executor.submit(f).get();
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException ee) {
+ ee.printStackTrace();
+ }
+ }
+ });
+ t.start();
+ return t;
+ }
+
+ @Override
+ public Future<RecordMetadata> send(ProducerRecord record) {
+ return send(record, null);
+ }
+
+ private RecordMetadata getRecordMetadata(ProducerRecord record) {
+ return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get());
+ }
+
+ @Override
+ public Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+ if (errorNext) {
+ if (shouldBuffer) {
+ FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
+ @Override
+ public RecordMetadata call()
+ throws Exception {
+ callback.onCompletion(null, exception);
+ return getRecordMetadata(record);
+ }
+ });
+ _callbacksList.add(f);
+ this.errorNext = false;
+ return f;
+ } else {
+ callback.onCompletion(null, this.exception);
+ this.errorNext = false;
+ return new FutureFailure(this.exception);
+ }
+ } else {
+ if (shouldBuffer) {
+ FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
+ @Override
+ public RecordMetadata call()
+ throws Exception {
+ msgsSent.incrementAndGet();
+ RecordMetadata metadata = getRecordMetadata(record);
+ callback.onCompletion(metadata, null);
+ return metadata;
+ }
+ });
+ _callbacksList.add(f);
+ return f;
+ } else {
+ int offset = msgsSent.incrementAndGet();
+ final RecordMetadata metadata = getRecordMetadata(record);
+ callback.onCompletion(metadata, null);
+ return new FutureSuccess(record, offset);
+ }
+ }
+ }
+
+ @Override
+ public List<PartitionInfo> partitionsFor(String topic) {
+ return this._cluster.partitionsForTopic(topic);
+ }
+
+ @Override
+ public Map<String, ? extends Metric> metrics() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ private static class FutureFailure implements Future<RecordMetadata> {
+
+ private final ExecutionException exception;
+
+ public FutureFailure(Exception exception) {
+ this.exception = new ExecutionException(exception);
+ }
+
+ @Override
+ public boolean cancel(boolean interrupt) {
+ return false;
+ }
+
+ @Override
+ public RecordMetadata get() throws ExecutionException {
+ throw this.exception;
+ }
+
+ @Override
+ public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
+ throw this.exception;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+ }
+
+ private static class FutureSuccess implements Future<RecordMetadata> {
+
+ private ProducerRecord record;
+ private final RecordMetadata _metadata;
+
+ public FutureSuccess(ProducerRecord record, int offset) {
+ this.record = record;
+ this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset);
+ }
+
+ @Override
+ public boolean cancel(boolean interrupt) {
+ return false;
+ }
+
+ @Override
+ public RecordMetadata get() throws ExecutionException {
+ return this._metadata;
+ }
+
+ @Override
+ public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
+ return this._metadata;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index c759a7b..a055204 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -27,9 +27,6 @@ import kafka.admin.AdminUtils
import kafka.common.ErrorMapping
import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
-import kafka.producer.KeyedMessage
-import kafka.producer.Producer
-import kafka.producer.ProducerConfig
import kafka.server.KafkaConfig
import kafka.server.KafkaServer
import kafka.utils.TestUtils
@@ -50,6 +47,9 @@ import org.junit.Assert._
import org.junit.{Test, BeforeClass, AfterClass}
import scala.collection.JavaConversions._
+import org.apache.samza.config.KafkaProducerConfig
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import java.util
object TestKafkaSystemAdmin {
val TOPIC = "input"
@@ -70,14 +70,13 @@ object TestKafkaSystemAdmin {
val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
- val config = new java.util.Properties()
+ val config = new util.HashMap[String, Object]()
val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
- config.put("metadata.broker.list", brokers)
- config.put("producer.type", "sync")
+ config.put("bootstrap.servers", brokers)
config.put("request.required.acks", "-1")
config.put("serializer.class", "kafka.serializer.StringEncoder")
- val producerConfig = new ProducerConfig(config)
- var producer: Producer[String, String] = null
+ val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+ var producer: KafkaProducer = null
var zookeeper: EmbeddedZookeeper = null
var server1: KafkaServer = null
var server2: KafkaServer = null
@@ -91,7 +90,7 @@ object TestKafkaSystemAdmin {
server2 = TestUtils.createServer(new KafkaConfig(props2))
server3 = TestUtils.createServer(new KafkaConfig(props3))
zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer)
- producer = new Producer(producerConfig)
+ producer = new KafkaProducer(producerConfig.getProducerProperties)
metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
}
@@ -143,6 +142,7 @@ object TestKafkaSystemAdmin {
@AfterClass
def afterCleanLogDirs {
+ producer.close()
server1.shutdown
server1.awaitShutdown()
server2.shutdown
@@ -229,7 +229,7 @@ class TestKafkaSystemAdmin {
// Add a new message to one of the partitions, and verify that it works as
// expected.
- producer.send(new KeyedMessage(TOPIC, "key1", "val1"))
+ producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val1".getBytes)).get()
metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
assertEquals(1, metadata.size)
val streamName = metadata.keySet.head
@@ -245,7 +245,7 @@ class TestKafkaSystemAdmin {
assertEquals("0", sspMetadata.get(new Partition(3)).getUpcomingOffset)
// Add a second message to one of the same partition.
- producer.send(new KeyedMessage(TOPIC, "key1", "val2"))
+ producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val2".getBytes)).get()
metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
assertEquals(1, metadata.size)
assertEquals(TOPIC, streamName)
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
index 8067cbf..5f65144 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
@@ -66,7 +66,7 @@ class TestKafkaSystemFactory {
val producerFactory = new KafkaSystemFactory
val config = new MapConfig(Map[String, String](
"job.name" -> "test",
- "systems.test.producer.metadata.broker.list" -> "",
+ "systems.test.producer.bootstrap.servers" -> "",
"systems.test.samza.key.serde" -> "json",
"systems.test.samza.msg.serde" -> "json",
"serializers.registry.json.class" -> "samza.serializers.JsonSerdeFactory"))
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index 72b36f7..ca10ea5 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -19,141 +19,152 @@
package org.apache.samza.system.kafka
-import java.nio.ByteBuffer
-import java.util.Properties
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-
-import kafka.producer.KeyedMessage
-import kafka.producer.Producer
-import kafka.producer.ProducerConfig
-import kafka.producer.ProducerPool
-import kafka.producer.async.DefaultEventHandler
-import kafka.serializer.Encoder
-
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.system.OutgoingMessageEnvelope
-import org.apache.samza.system.SystemStream
-import org.junit.Assert._
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream}
import org.junit.Test
-import scala.collection.JavaConversions._
+import org.apache.kafka.clients.producer._
+import java.util
+import org.junit.Assert._
+import org.scalatest.Assertions.intercept
+import org.apache.kafka.common.errors.RecordTooLargeException
+import org.apache.samza.SamzaException
-class TestKafkaSystemProducer {
- val someMessage = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "test")
+class TestKafkaSystemProducer {
- def getProps = {
- val props = new Properties
- props.put("broker.list", "")
- props.put("metadata.broker.list", "")
- props
- }
+ val someMessage = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "test".getBytes)
@Test
def testKafkaProducer {
- val props = getProps
- @volatile var msgsSent = new CountDownLatch(1)
-
- val producer = new KafkaSystemProducer(systemName = "test", batchSize = 1, getProducer = (() => {
- new Producer[Object, Object](new ProducerConfig(props)) {
- override def send(messages: KeyedMessage[Object, Object]*) {
- msgsSent.countDown
- }
- }
- }), metrics = new KafkaSystemProducerMetrics)
+ val systemProducer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => { new MockProducer(true) },
+ metrics = new KafkaSystemProducerMetrics)
+ systemProducer.register("test")
+ systemProducer.start
+ systemProducer.send("test", someMessage)
+ assertEquals(1, systemProducer.producer.asInstanceOf[MockProducer].history().size())
+ systemProducer.stop
+ }
- producer.register("test")
- producer.start
- producer.send("test", someMessage)
- producer.stop
- msgsSent.await(120, TimeUnit.SECONDS)
- assertEquals(0, msgsSent.getCount)
+ @Test
+ def testKafkaProducerUsingMockKafkaProducer {
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val systemProducer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => mockProducer,
+ metrics = new KafkaSystemProducerMetrics)
+ systemProducer.register("test")
+ systemProducer.start()
+ systemProducer.send("test", someMessage)
+ assertEquals(1, mockProducer.getMsgsSent)
+ systemProducer.stop()
}
@Test
- def testKafkaProducerBatch {
- val props = getProps
- @volatile var msgsSent = 0
-
- val producer = new KafkaSystemProducer(systemName = "test", batchSize = 2, getProducer = (() => {
- new Producer[Object, Object](new ProducerConfig(props)) {
- override def send(messages: KeyedMessage[Object, Object]*) {
- msgsSent += 1
- }
- }
- }), metrics = new KafkaSystemProducerMetrics)
-
- // second message should trigger the count down
- producer.register("test")
- producer.start
- producer.send("test", someMessage)
- assertEquals(0, msgsSent)
- producer.send("test", someMessage)
- assertEquals(1, msgsSent)
- producer.stop
+ def testKafkaProducerBufferedSend {
+ val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val systemProducer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => mockProducer,
+ metrics = new KafkaSystemProducerMetrics)
+ systemProducer.register("test")
+ systemProducer.start()
+ systemProducer.send("test", msg1)
+
+ mockProducer.setShouldBuffer(true)
+ systemProducer.send("test", msg2)
+ systemProducer.send("test", msg3)
+ assertEquals(1, mockProducer.getMsgsSent)
+
+ val sendThread: Thread = mockProducer.startDelayedSendThread(2000)
+ sendThread.join()
+
+ assertEquals(3, mockProducer.getMsgsSent)
+ systemProducer.stop()
}
@Test
- def testKafkaProducerFlush {
- val props = getProps
- val msgs = scala.collection.mutable.ListBuffer[String]()
- val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a")
- val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b")
-
- val producer = new KafkaSystemProducer(systemName = "test", batchSize = 3, getProducer = (() => {
- new Producer[Object, Object](new ProducerConfig(props)) {
- override def send(messages: KeyedMessage[Object, Object]*) {
- msgs ++= messages.map(_.message.asInstanceOf[String])
- }
- }
- }), metrics = new KafkaSystemProducerMetrics)
-
- // flush should trigger the count down
- producer.register("test")
- producer.start
- producer.send("test", msg1)
- producer.send("test", msg2)
- assertEquals(0, msgs.size)
- producer.flush("test")
- assertEquals(2, msgs.size)
- assertEquals("a", msgs(0))
- assertEquals("b", msgs(1))
- producer.stop
+ def testKafkaProducerFlushSuccessful {
+ val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val systemProducer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => mockProducer,
+ metrics = new KafkaSystemProducerMetrics)
+ systemProducer.register("test")
+ systemProducer.start()
+ systemProducer.send("test", msg1)
+
+ mockProducer.setShouldBuffer(true)
+ systemProducer.send("test", msg2)
+ systemProducer.send("test", msg3)
+ assertEquals(1, mockProducer.getMsgsSent)
+ mockProducer.startDelayedSendThread(2000)
+ systemProducer.flush("test")
+ assertEquals(3, mockProducer.getMsgsSent)
+ systemProducer.stop()
}
@Test
- def testKafkaSyncProducerExceptions {
- var msgsSent = 0
- val props = new Properties
- val out = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a")
- props.put("metadata.broker.list", "")
- props.put("broker.list", "")
- props.put("producer.type", "sync")
-
- var failCount = 0
- val producer = new KafkaSystemProducer(systemName = "test", batchSize = 1, getProducer = (() => {
- failCount += 1
- if (failCount <= 5) {
- throw new RuntimeException("Pretend to fail in factory")
- }
- new Producer[Object, Object](new ProducerConfig(props)) {
- override def send(messages: KeyedMessage[Object, Object]*) {
- assertNotNull(messages)
- assertEquals(1, messages.length)
- assertEquals("a", messages(0).message)
- msgsSent += 1
- if (msgsSent <= 5) {
- throw new RuntimeException("Pretend to fail in send")
- }
- }
- }
- }), metrics = new KafkaSystemProducerMetrics)
+ def testKafkaProducerFlushWithException {
+ val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "d".getBytes)
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val systemProducer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => mockProducer,
+ metrics = new KafkaSystemProducerMetrics)
+ systemProducer.register("test")
+ systemProducer.start()
+ systemProducer.send("test", msg1)
+
+ mockProducer.setShouldBuffer(true)
+ systemProducer.send("test", msg2)
+ mockProducer.setErrorNext(true, new RecordTooLargeException())
+ systemProducer.send("test", msg3)
+ systemProducer.send("test", msg4)
+
+ assertEquals(1, mockProducer.getMsgsSent)
+ mockProducer.startDelayedSendThread(2000)
+ val thrown = intercept[SamzaException] {
+ systemProducer.flush("test")
+ }
+ assertTrue(thrown.isInstanceOf[SamzaException])
+ assertEquals(2, mockProducer.getMsgsSent)
+ systemProducer.stop()
+ }
+ @Test
+ def testKafkaProducerExceptions {
+ val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "d".getBytes)
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val producer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => mockProducer,
+ metrics = new KafkaSystemProducerMetrics)
producer.register("test")
- producer.start
- producer.send("test", out)
- producer.stop
- assertEquals(6, msgsSent)
+ producer.start()
+ producer.send("test", msg1)
+ producer.send("test", msg2)
+ producer.send("test", msg3)
+ mockProducer.setErrorNext(true, new RecordTooLargeException())
+
+ val thrown = intercept[SamzaException] {
+ producer.send("test", msg4)
+ }
+ assertTrue(thrown.isInstanceOf[SamzaException])
+ assertTrue(thrown.getMessage.contains("RecordTooLargeException"))
+ assertEquals(true, producer.sendFailed.get())
+ assertEquals(3, mockProducer.getMsgsSent)
+ producer.stop()
}
}
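
The rewritten tests above all hinge on the new java client's asynchronous contract: send() enqueues the record and returns a Future, and only Future.get() (or a flush) observes the broker's ack or the failure. A minimal sketch of that contract, assuming a broker on localhost:9092 (the sketch itself is not part of this patch):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object AsyncSendSketch {
      def main(args: Array[String]) {
        val props = new Properties
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
        val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
        // send() returns immediately; the record may still sit in the client-side buffer
        val future = producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "a".getBytes))
        // get() blocks until the broker acks -- this is what flush() must wait for, and why the
        // tests use setShouldBuffer/startDelayedSendThread to simulate in-flight records
        println("acked at offset " + future.get().offset)
        producer.close()
      }
    }
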
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java b/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
new file mode 100644
index 0000000..2fa743f
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samza.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+
+import static java.util.Arrays.asList;
+
+
+/**
+ * Helper functions for writing unit tests.
+ * Copied from the :kafka-clients test sources as a workaround until KAFKA-1861 is resolved.
+ */
+public class TestUtils {
+
+ public static final File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
+
+ public static final String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+ public static final String DIGITS = "0123456789";
+ public static final String LETTERS_AND_DIGITS = LETTERS + DIGITS;
+
+ /* A consistent random number generator to make tests repeatable */
+ public static final Random seededRandom = new Random(192348092834L);
+ public static final Random random = new Random();
+
+ public static Cluster singletonCluster(String topic, int partitions) {
+ return clusterWith(1, topic, partitions);
+ }
+
+ public static Cluster clusterWith(int nodes, String topic, int partitions) {
+ Node[] ns = new Node[nodes];
+ for (int i = 0; i < nodes; i++)
+ ns[i] = new Node(i, "localhost", 1969); // distinct ids, or Cluster collapses the nodes into one broker
+ List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
+ for (int i = 0; i < partitions; i++)
+ parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
+ return new Cluster(asList(ns), parts);
+ }
+
+ /**
+ * Choose a number of random available ports
+ */
+ public static int[] choosePorts(int count) {
+ try {
+ ServerSocket[] sockets = new ServerSocket[count];
+ int[] ports = new int[count];
+ for (int i = 0; i < count; i++) {
+ sockets[i] = new ServerSocket(0);
+ ports[i] = sockets[i].getLocalPort();
+ }
+ for (int i = 0; i < count; i++)
+ sockets[i].close();
+ return ports;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Choose an available port
+ */
+ public static int choosePort() {
+ return choosePorts(1)[0];
+ }
+
+ /**
+ * Generate an array of random bytes
+ *
+ * @param size The size of the array
+ */
+ public static byte[] randomBytes(int size) {
+ byte[] bytes = new byte[size];
+ seededRandom.nextBytes(bytes);
+ return bytes;
+ }
+
+ /**
+ * Generate a random string of letters and digits of the given length
+ *
+ * @param len The length of the string
+ * @return The random string
+ */
+ public static String randomString(int len) {
+ StringBuilder b = new StringBuilder();
+ for (int i = 0; i < len; i++)
+ b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length())));
+ return b.toString();
+ }
+
+}
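
The Cluster helpers above let producer tests run against the java client's bundled MockProducer instead of a live broker. A hypothetical usage sketch (MockClusterSketch is illustrative, not part of the patch):

    import org.apache.kafka.clients.producer.{MockProducer, ProducerRecord}
    import org.apache.samza.utils.TestUtils

    object MockClusterSketch {
      def main(args: Array[String]) {
        val cluster = TestUtils.singletonCluster("test", 1) // one node, one partition
        // autoComplete = false leaves each send buffered until completeNext() or errorNext(e)
        // resolves its Future -- the same buffered-ack pattern the tests above exercise
        val mock = new MockProducer(cluster, false)
        mock.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "a".getBytes))
        mock.completeNext() // resolve the pending send as a success
      }
    }
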
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-test/src/main/config/negate-number.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/negate-number.properties b/samza-test/src/main/config/negate-number.properties
index 4989b27..379fa61 100644
--- a/samza-test/src/main/config/negate-number.properties
+++ b/samza-test/src/main/config/negate-number.properties
@@ -43,13 +43,12 @@ systems.kafka.samza.msg.serde=string
systems.kafka.samza.key.serde=string
systems.kafka.samza.offset.default=oldest
systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.producer.compression.codec=gzip
-systems.kafka.producer.metadata.broker.list=localhost:9092
-systems.kafka.producer.request.required.acks=1
-systems.kafka.producer.topic.metadata.refresh.interval.ms=86400000
-systems.kafka.producer.producer.type=sync
+systems.kafka.producer.compression.type=gzip
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.producer.acks=1
+systems.kafka.producer.metadata.max.age.ms=86400000
# Normally, we'd set this much higher, but we want things to look snappy in the demo.
-systems.kafka.producer.batch.num.messages=1
+systems.kafka.producer.buffer.memory=1000000
# negate-number
streams.samza-test-topic.consumer.reset.offset=true
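
Each renamed key above maps one-to-one onto a new-client setting; only the last line differs, since batch.num.messages has no direct equivalent in the java client and the demo bounds client-side buffering with buffer.memory instead. The same settings against the ProducerConfig constants (a sketch for reference, not part of the patch):

    import java.util
    import org.apache.kafka.clients.producer.ProducerConfig

    object DemoProducerConfigSketch {
      def demoConfig: util.Map[String, Object] = {
        val m = new util.HashMap[String, Object]()
        m.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")            // was compression.codec
        m.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // was metadata.broker.list
        m.put(ProducerConfig.ACKS_CONFIG, "1")                           // was request.required.acks
        m.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "86400000")        // was topic.metadata.refresh.interval.ms
        m.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "1000000")            // stands in for batch.num.messages
        m
      }
    }
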
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2c04f5/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index ca25258..6f0eb21 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -22,16 +22,12 @@ package org.apache.samza.test.integration
import java.util.Properties
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
-import java.util.logging.{Level, LogManager, Handler, Logger}
import kafka.admin.AdminUtils
import kafka.common.ErrorMapping
import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
import kafka.message.MessageAndMetadata
-import kafka.producer.KeyedMessage
-import kafka.producer.Producer
-import kafka.producer.ProducerConfig
import kafka.server.KafkaConfig
import kafka.server.KafkaServer
import kafka.utils.TestUtils
@@ -52,6 +48,7 @@ import org.apache.samza.job.StreamJob
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.system.kafka.TopicMetadataCache
import org.apache.samza.system.{SystemStreamPartition, IncomingMessageEnvelope}
+import org.apache.samza.config.KafkaProducerConfig
import org.apache.samza.task.InitableTask
import org.apache.samza.task.MessageCollector
import org.apache.samza.task.StreamTask
@@ -67,6 +64,9 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.SynchronizedMap
+import org.apache.kafka.clients.producer.{ProducerConfig, Producer, ProducerRecord, KafkaProducer}
+import java.util
+
object TestStatefulTask {
val INPUT_TOPIC = "input"
@@ -93,14 +93,15 @@ object TestStatefulTask {
props2.setProperty("auto.create.topics.enable","false")
props3.setProperty("auto.create.topics.enable","false")
- val config = new java.util.Properties()
+ val config = new util.HashMap[String, Object]()
val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
- config.put("metadata.broker.list", brokers)
- config.put("producer.type", "sync")
+ config.put("bootstrap.servers", brokers)
config.put("request.required.acks", "-1")
- config.put("serializer.class", "kafka.serializer.StringEncoder");
- val producerConfig = new ProducerConfig(config)
- var producer: Producer[String, String] = null
+ config.put("serializer.class", "kafka.serializer.StringEncoder")
+ config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, new Integer(1))
+ config.put(ProducerConfig.RETRIES_CONFIG, new Integer(Integer.MAX_VALUE-1))
+ val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+ var producer: Producer = null
val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123"))
val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345"))
var zookeeper: EmbeddedZookeeper = null
@@ -116,7 +117,7 @@ object TestStatefulTask {
server2 = TestUtils.createServer(new KafkaConfig(props2))
server3 = TestUtils.createServer(new KafkaConfig(props3))
zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer)
- producer = new Producer(producerConfig)
+    producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
createTopics
@@ -163,6 +164,7 @@ object TestStatefulTask {
@AfterClass
def afterCleanLogDirs {
+ producer.close()
server1.shutdown
server1.awaitShutdown()
server2.shutdown
@@ -216,7 +218,7 @@ class TestStatefulTask {
"systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic
"systems.kafka.samza.msg.serde" -> "string",
"systems.kafka.consumer.zookeeper.connect" -> zkConnect,
- "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1),
+ "systems.kafka.producer.bootstrap.servers" -> ("localhost:%s" format port1),
// Since using state, need a checkpoint manager
"task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
"task.checkpoint.system" -> "kafka",
@@ -353,7 +355,7 @@ class TestStatefulTask {
* Send a message to the input topic, and validate that it gets to the test task.
*/
def send(task: TestTask, msg: String) {
- producer.send(new KeyedMessage(INPUT_TOPIC, msg))
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](INPUT_TOPIC, msg.getBytes)).get()
task.awaitMessage
assertEquals(msg, task.received.last)
}
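
Together with the producer config above, the blocking send(...).get() idiom reproduces the old sync producer's semantics on the new client: get() rethrows any send failure at the call site, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 1 keeps retried records from being reordered, and the near-infinite retry count rides out transient broker unavailability. As a sketch (sendSync is a hypothetical helper, not part of the patch):

    import org.apache.kafka.clients.producer.{Producer, ProducerRecord}

    object SyncSendSketch {
      // blocks until the broker acks; returns the record's offset, or throws if the send failed
      def sendSync(producer: Producer[Array[Byte], Array[Byte]], topic: String, msg: String): Long =
        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, msg.getBytes)).get().offset
    }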