You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/09/20 19:40:59 UTC
samza git commit: SAMZA-1392: KafkaSystemProducer performance and correctness with concurrent sends and flushes
Repository: samza
Updated Branches:
refs/heads/master 89f8ca4ec -> 3773ec699
SAMZA-1392: KafkaSystemProducer performance and correctness with concurrent sends and flushes
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>, Yi Pan (Data Infrastructure) <ni...@gmail.com>
Closes #272 from jmakes/samza-1392
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3773ec69
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3773ec69
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3773ec69
Branch: refs/heads/master
Commit: 3773ec699ef07fa421edbb72d3fd83d7b2273144
Parents: 89f8ca4
Author: Jacob Maes <jm...@linkedin.com>
Authored: Wed Sep 20 12:40:47 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Wed Sep 20 12:40:47 2017 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 18 +
.../samza/system/SystemProducerException.java | 38 ++
.../org/apache/samza/config/TaskConfig.scala | 3 +
.../samza/container/TestTaskInstance.scala | 35 ++
.../org/apache/samza/config/KafkaConfig.scala | 34 +-
.../samza/system/kafka/KafkaSystemFactory.scala | 5 +-
.../system/kafka/KafkaSystemProducer.scala | 256 ++++++-----
.../samza/system/kafka/MockKafkaProducer.java | 87 ++--
.../kafka/TestKafkaSystemProducerJava.java | 2 +-
.../apache/samza/config/TestKafkaConfig.scala | 24 +-
.../system/kafka/TestKafkaSystemProducer.scala | 427 ++++++++++++++++---
.../storage/kv/TestKeyValueStorageEngine.scala | 14 +-
.../test/integration/StreamTaskTestUtil.scala | 1 +
.../test/integration/TestStatefulTask.scala | 2 +-
14 files changed, 705 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 9b4e279..d736d6b 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -675,6 +675,24 @@
</tr>
<tr>
+ <td class="property" id="task-drop-producer-errors">task.drop.producer.errors</td>
+ <td class="default">false</td>
+ <td class="description">
+ If true, producer errors will be logged and ignored. The only exceptions that will be thrown are
+ those which are likely caused by the application itself (e.g. serialization errors). If false,
+ the producer will be closed and producer errors will be propagated upward until the container
+ ultimately fails. Failing the container is a safety precaution to ensure the latest checkpoints
+ only reflect the events that have been completely and successfully processed. However, some
+ applications prefer to remain running at all costs, even if that means lost messages. Setting
+ this property to true will enable applications to recover from producer errors at the expense of
+ one or many (in the case of batching producers) dropped messages. If you enable this, it is highly
+ recommended that you also configure alerting on the 'producer-send-failed' metric, since the
+ producer might drop messages indefinitely. The logic for this property is specific to each
+ SystemProducer implementation. It will have no effect for SystemProducers that ignore the property.
+ </td>
+ </tr>
+
+ <tr>
<td class="property" id="task-log4j-system">task.log4j.system</td>
<td class="default"></td>
<td class="description">
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-api/src/main/java/org/apache/samza/system/SystemProducerException.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemProducerException.java b/samza-api/src/main/java/org/apache/samza/system/SystemProducerException.java
new file mode 100644
index 0000000..6e663b1
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemProducerException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import org.apache.samza.SamzaException;
+
+
+public class SystemProducerException extends SamzaException {
+ private static final long serialVersionUID = 1L;
+
+ public SystemProducerException(String s) {
+ super(s);
+ }
+
+ public SystemProducerException(Throwable t) {
+ super(t);
+ }
+
+ public SystemProducerException(String s, Throwable t) {
+ super(s, t);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index e49d74b..2ee0032 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -36,6 +36,7 @@ object TaskConfig {
val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails
val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
+ val DROP_PRODUCER_ERROR = "task.drop.producer.errors" // whether to ignore producer errors and drop the messages that failed to send
val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore in process and window
val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task grouper
val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent process for a AsyncStreamTask
@@ -107,6 +108,8 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR)
+ def getDropProducerError = getBoolean(TaskConfig.DROP_PRODUCER_ERROR, false)
+
def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS)
def getIgnoredExceptions = getOption(TaskConfig.IGNORED_EXCEPTIONS)
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 4958a57..d9eb7ef 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -391,6 +391,41 @@ class TestTaskInstance {
// Finally, checkpoint the inputs with the snapshotted checkpoint captured at the beginning of commit
mockOrder.verify(offsetManager).writeCheckpoint(taskName, checkpoint)
}
+
+ @Test(expected = classOf[SystemProducerException])
+ def testProducerExceptionsIsPropagated {
+ // Simple objects
+ val partition = new Partition(0)
+ val taskName = new TaskName("taskName")
+ val systemStream = new SystemStream("test-system", "test-stream")
+ val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+
+ // Mocks
+ val collector = Mockito.mock(classOf[TaskInstanceCollector])
+ when(collector.flush).thenThrow(new SystemProducerException("Test"))
+ val storageManager = Mockito.mock(classOf[TaskStorageManager])
+ val offsetManager = Mockito.mock(classOf[OffsetManager])
+
+ val taskInstance: TaskInstance = new TaskInstance(
+ Mockito.mock(classOf[StreamTask]).asInstanceOf[StreamTask],
+ taskName,
+ new MapConfig,
+ new TaskInstanceMetrics,
+ null,
+ Mockito.mock(classOf[SystemConsumers]),
+ collector,
+ Mockito.mock(classOf[SamzaContainerContext]),
+ offsetManager,
+ storageManager,
+ systemStreamPartitions = Set(systemStreamPartition))
+
+ try {
+ taskInstance.commit // Should not swallow the SystemProducerException
+ } finally {
+ Mockito.verify(offsetManager, times(0)).writeCheckpoint(any(classOf[TaskName]), any(classOf[Checkpoint]))
+ }
+ }
+
}
class MockSystemAdmin extends SystemAdmin {
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 9ac21ef..79fd4f3 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -20,24 +20,19 @@
package org.apache.samza.config
+import java.util
import java.util.regex.Pattern
-
-import org.apache.samza.util.Util
-import org.apache.samza.util.Logging
-
-import scala.collection.JavaConverters._
-import kafka.consumer.ConsumerConfig
import java.util.{Properties, UUID}
+import kafka.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.samza.SamzaException
-import java.util
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.system.kafka.KafkaSystemFactory
+import org.apache.samza.util.{Logging, Util}
import scala.collection.JavaConverters._
-import org.apache.samza.system.kafka.KafkaSystemFactory
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.kafka.common.serialization.ByteArraySerializer
object KafkaConfig {
val TOPIC_REPLICATION_FACTOR = "replication.factor"
@@ -296,8 +291,10 @@ class KafkaProducerConfig(val systemName: String,
//Overrides specific to samza-kafka (these are considered as defaults in Samza & can be overridden by user
val MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT: java.lang.Integer = 1.asInstanceOf[Integer]
val RETRIES_DEFAULT: java.lang.Integer = Integer.MAX_VALUE
+ val LINGER_MS_DEFAULT: java.lang.Integer = 10
def getProducerProperties = {
+
val byteArraySerializerClassName = classOf[ByteArraySerializer].getCanonicalName
val producerProperties: java.util.Map[String, Object] = new util.HashMap[String, Object]()
producerProperties.putAll(properties)
@@ -320,14 +317,17 @@ class KafkaProducerConfig(val systemName: String,
producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)
}
- if (producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG)
- && producerProperties.get(ProducerConfig.RETRIES_CONFIG).asInstanceOf[String].toInt < RETRIES_DEFAULT) {
- warn("Samza does not provide producer failure handling. Consider setting '%s' to a large value, like Int.MAX." format ProducerConfig.RETRIES_CONFIG)
- } else {
- // Retries config is set to Max so that when all attempts fail, Samza also fails the send. We do not have any special handler
- // for producer failure
+ if (!producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG)) {
+ debug("%s undefined. Defaulting to %s." format(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT))
producerProperties.put(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT)
}
+ producerProperties.get(ProducerConfig.RETRIES_CONFIG).toString.toInt // Verify int
+
+ if (!producerProperties.containsKey(ProducerConfig.LINGER_MS_CONFIG)) {
+ debug("%s undefined. Defaulting to %s." format(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS_DEFAULT))
+ producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS_DEFAULT)
+ }
+ producerProperties.get(ProducerConfig.LINGER_MS_CONFIG).toString.toInt // Verify int
producerProperties
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 638806b..e0b5540 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -26,7 +26,7 @@ import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, Clie
import org.apache.samza.config.Config
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.samza.system.SystemFactory
import org.apache.samza.config.StorageConfig._
@@ -97,7 +97,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
systemName,
new ExponentialSleepStrategy(initialDelayMs = producerConfig.reconnectIntervalMs),
getProducer,
- metrics)
+ metrics,
+ dropProducerExceptions = config.getDropProducerError)
}
def getAdmin(systemName: String, config: Config): SystemAdmin = {
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index b680ed4..5e83666 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -21,100 +21,61 @@ package org.apache.samza.system.kafka
import java.util.concurrent.atomic.AtomicReference
-import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Future}
+import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.PartitionInfo
-import org.apache.samza.SamzaException
+import org.apache.kafka.common.errors.SerializationException
import org.apache.samza.system.OutgoingMessageEnvelope
import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemProducerException
import org.apache.samza.util.ExponentialSleepStrategy
import org.apache.samza.util.KafkaUtil
import org.apache.samza.util.Logging
import org.apache.samza.util.TimerUtils
-import scala.collection.JavaConverters._
-
class KafkaSystemProducer(systemName: String,
retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
getProducer: () => Producer[Array[Byte], Array[Byte]],
metrics: KafkaSystemProducerMetrics,
- val clock: () => Long = () => System.nanoTime) extends SystemProducer with Logging with TimerUtils
-{
-
- class SourceData {
- /**
- * lock to make send() and store its future atomic
- */
- val sendLock: Object = new Object
- /**
- * The most recent send's Future handle
- */
- @volatile
- var latestFuture: Future[RecordMetadata] = null
- /**
- * exceptionInCallback: to store the exception in case of any "ultimate" send failure (ie. failure
- * after exhausting max_retries in Kafka producer) in the I/O thread, we do not continue to queue up more send
- * requests from the samza thread. It helps the samza thread identify if the failure happened in I/O thread or not.
- *
- * In cases of multiple exceptions in the callbacks, we keep the first one before throwing.
- */
- var exceptionInCallback: AtomicReference[SamzaException] = new AtomicReference[SamzaException]()
- }
+ val clock: () => Long = () => System.nanoTime,
+ val dropProducerExceptions: Boolean = false) extends SystemProducer with Logging with TimerUtils {
+ // Represents a fatal error that caused the producer to close.
+ val fatalException: AtomicReference[SystemProducerException] = new AtomicReference[SystemProducerException]()
@volatile var producer: Producer[Array[Byte], Array[Byte]] = null
- var producerLock: Object = new Object
- val StreamNameNullOrEmptyErrorMsg = "Stream Name should be specified in the stream configuration file."
- val sources: ConcurrentHashMap[String, SourceData] = new ConcurrentHashMap[String, SourceData]
+ val producerLock: Object = new Object
def start(): Unit = {
+ producer = getProducer()
}
def stop() {
- try {
- val currentProducer = producer
- if (currentProducer != null) {
- producerLock.synchronized {
- if (currentProducer == producer) {
- // only nullify the member producer if it is still the same object, no point nullifying new producer
- producer = null
- }
+ info("Stopping producer for system: " + this.systemName)
+
+ // stop() should not happen often so no need to optimize locking
+ producerLock.synchronized {
+ try {
+ if (producer != null) {
+ producer.close // Also performs the equivalent of a flush()
}
- currentProducer.close
- sources.asScala.foreach {p =>
- if (p._2.exceptionInCallback.get() == null) {
- flush(p._1)
- }
+ val exception = fatalException.get()
+ if (exception != null) {
+ error("Observed an earlier send() error while closing producer", exception)
}
+ } catch {
+ case e: Exception => error("Error while closing producer for system: " + systemName, e)
+ } finally {
+ producer = null
}
- } catch {
- case e: Exception => error(e.getMessage, e)
}
}
def register(source: String) {
- if(sources.putIfAbsent(source, new SourceData) != null) {
- throw new SamzaException("%s is already registered with the %s system producer" format (source, systemName))
- }
- }
-
- def closeAndNullifyCurrentProducer(currentProducer: Producer[Array[Byte], Array[Byte]]) {
- try {
- // TODO: we should use timeout close() to make sure we fail all waiting messages in kafka 0.9+
- currentProducer.close()
- } catch {
- case e: Exception => error("producer close failed", e)
- }
- producerLock.synchronized {
- if (currentProducer == producer) {
- // only nullify the member producer if it is still the same object, no point nullifying new producer
- producer = null
- }
- }
}
def send(source: String, envelope: OutgoingMessageEnvelope) {
@@ -122,33 +83,18 @@ class KafkaSystemProducer(systemName: String,
val topicName = envelope.getSystemStream.getStream
if (topicName == null || topicName == "") {
- throw new IllegalArgumentException(StreamNameNullOrEmptyErrorMsg)
- }
-
- val sourceData = sources.get(source)
- if (sourceData == null) {
- throw new IllegalArgumentException("Source %s must be registered first before send." format source)
+ throw new IllegalArgumentException("Invalid system stream: " + envelope.getSystemStream)
}
- val exception = sourceData.exceptionInCallback.getAndSet(null)
- if (exception != null) {
+ val globalProducerException = fatalException.get()
+ if (globalProducerException != null) {
metrics.sendFailed.inc
- throw exception // in case the caller catches all exceptions and will try again
- }
-
- // lazy initialization of the producer
- if (producer == null) {
- producerLock.synchronized {
- if (producer == null) {
- info("Creating a new producer for system %s." format systemName)
- producer = getProducer()
- }
- }
+ throw new SystemProducerException("Producer was unable to recover from previous exception.", globalProducerException)
}
val currentProducer = producer
if (currentProducer == null) {
- throw new SamzaException("Kafka system producer is not available.")
+ throw new SystemProducerException("Kafka producer is null.")
}
// Java-based Kafka producer API requires an "Integer" type partitionKey and does not allow custom overriding of Partitioners
@@ -161,72 +107,114 @@ class KafkaSystemProducer(systemName: String,
envelope.getMessage.asInstanceOf[Array[Byte]])
try {
- sourceData.sendLock.synchronized {
- val futureRef: Future[RecordMetadata] =
- currentProducer.send(record, new Callback {
- def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
- if (exception == null) {
- //send was successful.
- metrics.sendSuccess.inc
- }
- else {
- error("Closing the producer because of an exception in callback: ", exception)
- //If there is an exception in the callback, close producer.
- closeAndNullifyCurrentProducer(currentProducer)
-
- // we keep the exception and will throw the exception in the next producer.send()
- // so the user can handle the exception and decide to fail or ignore
- sourceData.exceptionInCallback.compareAndSet(
- null,
- new SamzaException("Unable to send message from %s to system %s." format(source, systemName),
- exception))
-
- metrics.sendFailed.inc
- error("Unable to send message on Topic:%s Partition:%s" format(topicName, partitionKey),
- exception)
- }
- }
- })
- sourceData.latestFuture = futureRef
- }
+ currentProducer.send(record, new Callback {
+ def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
+ if (exception == null) {
+ metrics.sendSuccess.inc
+ } else {
+ val producerException = new SystemProducerException("Failed to send message for Source: %s on System:%s Topic:%s Partition:%s"
+ .format(source, systemName, topicName, partitionKey), exception)
+
+ handleSendException(currentProducer, producerException, true)
+ }
+ }
+ })
metrics.sends.inc
} catch {
- case e: Exception => {
- error("Closing the producer because of an exception in send: ", e)
-
- closeAndNullifyCurrentProducer(currentProducer)
+ case originalException : Exception =>
+ val producerException = new SystemProducerException("Failed to send message for Source: %s on System:%s Topic:%s Partition:%s"
+ .format(source, systemName, topicName, partitionKey), originalException)
- metrics.sendFailed.inc
- throw new SamzaException(("Failed to send message on Topic:%s Partition:%s Exception:\n %s,")
- .format(topicName, partitionKey, e))
- }
+ handleSendException(currentProducer, producerException, isFatalException(originalException))
+ throw producerException
}
}
+
def flush(source: String) {
updateTimer(metrics.flushNs) {
metrics.flushes.inc
- val sourceData = sources.get(source)
- //if latestFuture is null, it probably means that there has been no calls to "send" messages
- //Hence, nothing to do in flush
- if(sourceData.latestFuture != null) {
- while(!sourceData.latestFuture.isDone && sourceData.exceptionInCallback.get() == null) {
- try {
- sourceData.latestFuture.get()
- } catch {
- case t: Throwable => error(t.getMessage, t)
+ val currentProducer = producer
+ if (currentProducer == null) {
+ throw new SystemProducerException("Kafka producer is null.")
+ }
+
+ // Flush only throws InterruptedException, all other errors are handled in send() callbacks
+ currentProducer.flush()
+
+ // Invariant: At this point either
+ // 1. The producer is fine and there are no exceptions to handle OR
+ // 2. The producer is closed and one or more sources have exceptions to handle
+ // 2a. All new sends get a ProducerClosedException or IllegalStateException (depending on kafka version)
+ // 2b. There are no messages in flight because the producer is closed
+
+ // We must check for an exception AFTER flush() because when flush() returns all callbacks for messages sent
+ // in that flush() are guaranteed to have completed and we update the exception in the callback.
+ // If there is an exception, we rethrow it here to prevent the checkpoint.
+ val exception = fatalException.get()
+ if (exception != null) {
+ metrics.flushFailed.inc
+ throw new SystemProducerException("Flush failed. One or more batches of messages were not sent!", exception)
+ }
+ trace("Flushed %s." format source)
+ }
+ }
+
+
+ private def handleSendException(currentProducer: Producer[Array[Byte], Array[Byte]], producerException: SystemProducerException, isFatalException: Boolean) = {
+ metrics.sendFailed.inc
+ error(producerException)
+ // The SystemProducer API is synchronous, so there's no way for us to guarantee that an exception will
+ // be handled by the Task before we recreate the producer, and if it isn't handled, a concurrent send() from another
+ // Task could send on the new producer before the first Task properly handled the exception and produce out of order messages.
+ // So we have to handle it right here in the SystemProducer.
+ if (dropProducerExceptions) {
+ warn("Ignoring producer exception. All messages in the failed producer request will be dropped!")
+
+ if (isFatalException) {
+ producerLock.synchronized {
+ // Prevent each callback from recreating producer for the same failure.
+ if (currentProducer == producer) {
+ info("Creating a new producer for system %s." format systemName)
+ try {
+ currentProducer.close(0, TimeUnit.MILLISECONDS)
+ } catch {
+ case exception: Exception => error("Exception while closing producer.", exception)
+ }
+ producer = getProducer()
}
}
-
- //if there is an exception thrown from the previous callbacks just before flush, we have to fail the container
- if (sourceData.exceptionInCallback.get() != null) {
- metrics.flushFailed.inc
- throw sourceData.exceptionInCallback.get()
- } else {
- trace("Flushed %s." format (source))
+ }
+ } else {
+ // If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries
+ // Close producer to ensure messages queued in-flight are not sent and hence, avoid re-ordering
+ // This works because there is only 1 callback thread and no sends can complete until the callback returns.
+ if (isFatalException) {
+ fatalException.compareAndSet(null, producerException)
+ try {
+ currentProducer.close(0, TimeUnit.MILLISECONDS)
+ } catch {
+ case exception: Exception => error("Exception while closing producer.", exception)
}
}
}
}
+
+ /**
+ * A fatal exception is one that corrupts the producer or otherwise makes it unusable.
+ * We want to handle non-fatal exceptions differently because they can often be handled by the user
+ * and that's preferable because it gives users who drop exceptions a way to do so with less
+ * data loss (no collateral damage from batches of messages getting dropped)
+ *
+ * @param exception the exception to check
+ * @return true if the exception is unrecoverable.
+ */
+ private def isFatalException(exception: Exception): Boolean = {
+ exception match {
+ case _: SerializationException => false
+ case _: ClassCastException => false
+ case _ => true
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
index aaa949d..024c6e6 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -31,15 +31,16 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
+import kafka.producer.ProducerClosedException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.test.TestUtils;
@@ -49,8 +50,11 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
private List<FutureTask<RecordMetadata>> _callbacksList = new ArrayList<FutureTask<RecordMetadata>>();
private boolean shouldBuffer = false;
private boolean errorNext = false;
+ private boolean errorInCallback = true;
private Exception exception = null;
private AtomicInteger msgsSent = new AtomicInteger(0);
+ private boolean closed = false;
+ private int openCount = 0;
/*
* Helps mock out buffered behavior seen in KafkaProducer. This MockKafkaProducer enables you to:
@@ -72,8 +76,9 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
this.shouldBuffer = shouldBuffer;
}
- public void setErrorNext(boolean errorNext, Exception exception) {
+ public void setErrorNext(boolean errorNext, boolean errorInCallback, Exception exception) {
this.errorNext = errorNext;
+ this.errorInCallback = errorInCallback;
this.exception = exception;
}
@@ -82,27 +87,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
}
public Thread startDelayedSendThread(final int sleepTime) {
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- FutureTask[] callbackArray = new FutureTask[_callbacksList.size()];
- AtomicReferenceArray<FutureTask> _bufferList = new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray));
- ExecutorService executor = Executors.newFixedThreadPool(10);
- try {
- for(int i = 0; i < _bufferList.length(); i++) {
- Thread.sleep(sleepTime);
- FutureTask f = _bufferList.get(i);
- if(!f.isDone()) {
- executor.submit(f).get();
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException ee) {
- ee.printStackTrace();
- }
- }
- });
+ Thread t = new Thread(new FlushRunnable(sleepTime));
t.start();
return t;
}
@@ -118,7 +103,14 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
@Override
public Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+ if (closed) {
+ throw new ProducerClosedException();
+ }
if (errorNext) {
+ if (!errorInCallback) {
+ this.errorNext = false;
+ throw (RuntimeException)exception;
+ }
if (shouldBuffer) {
FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
@Override
@@ -171,16 +163,31 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
@Override
public void close() {
-
+ close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
@Override
public void close(long timeout, TimeUnit timeUnit) {
+ closed = true;
+ // The real producer will flush messages as part of closing. We'll invoke flush here to approximate that behavior.
+ new FlushRunnable(0).run();
+ }
+ public void open() {
+ this.closed = false;
+ openCount++;
}
- public synchronized void flush () {
+ public boolean isClosed() {
+ return closed;
+ }
+ public int getOpenCount() {
+ return openCount;
+ }
+
+ public synchronized void flush () {
+ new FlushRunnable(0).run();
}
@@ -253,4 +260,34 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
return true;
}
}
+
+ private class FlushRunnable implements Runnable {
+ private final int _sleepTime;
+
+ public FlushRunnable(int sleepTime) {
+ _sleepTime = sleepTime;
+ }
+
+ public void run() {
+ FutureTask[] callbackArray = new FutureTask[_callbacksList.size()];
+ AtomicReferenceArray<FutureTask> _bufferList =
+ new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray));
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ try {
+ for (int i = 0; i < _bufferList.length(); i++) {
+ Thread.sleep(_sleepTime);
+ FutureTask f = _bufferList.get(i);
+ if (!f.isDone()) {
+ executor.submit(f).get();
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException ee) {
+ ee.printStackTrace();
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
index 224ca2f..7fc450d 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
@@ -48,7 +48,7 @@ public class TestKafkaSystemProducerJava {
public Object apply() {
return System.currentTimeMillis();
}
- });
+ }, false);
long now = System.currentTimeMillis();
assertTrue((Long)ksp.clock().apply() >= now);
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index d4b8150..9d1e99b 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -31,7 +31,7 @@ import org.junit.Before
class TestKafkaConfig {
var props : Properties = new Properties
- val SYSTEM_NAME = "kafka";
+ val SYSTEM_NAME = "kafka"
val KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer."
val TEST_CLIENT_ID = "TestClientId"
val TEST_GROUP_ID = "TestGroupId"
@@ -147,9 +147,9 @@ class TestKafkaConfig {
@Test
def testMaxInFlightRequestsPerConnectionOverride() {
- val expectedValue = "200";
+ val expectedValue = "200"
- props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, expectedValue);
+ props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, expectedValue)
val mapConfig = new MapConfig(props.asScala.asJava)
val kafkaConfig = new KafkaConfig(mapConfig)
@@ -161,9 +161,9 @@ class TestKafkaConfig {
@Test
def testRetriesOverride() {
- val expectedValue = "200";
+ val expectedValue = "200"
- props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, expectedValue);
+ props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, expectedValue)
val mapConfig = new MapConfig(props.asScala.asJava)
val kafkaConfig = new KafkaConfig(mapConfig)
@@ -175,7 +175,7 @@ class TestKafkaConfig {
@Test(expected = classOf[NumberFormatException])
def testMaxInFlightRequestsPerConnectionWrongNumberFormat() {
- props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "Samza");
+ props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "Samza")
val mapConfig = new MapConfig(props.asScala.asJava)
val kafkaConfig = new KafkaConfig(mapConfig)
@@ -185,7 +185,17 @@ class TestKafkaConfig {
@Test(expected = classOf[NumberFormatException])
def testRetriesWrongNumberFormat() {
- props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, "Samza");
+ props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, "Samza")
+
+ val mapConfig = new MapConfig(props.asScala.asJava)
+ val kafkaConfig = new KafkaConfig(mapConfig)
+ val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
+ kafkaProducerConfig.getProducerProperties
+ }
+
+ @Test(expected = classOf[NumberFormatException])
+ def testLingerWrongNumberFormat() {
+ props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.LINGER_MS_CONFIG, "Samza")
val mapConfig = new MapConfig(props.asScala.asJava)
val kafkaConfig = new KafkaConfig(mapConfig)
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index 7331611..9117be5 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -19,22 +19,18 @@
package org.apache.samza.system.kafka
-import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream}
-import org.junit.Test
-
import org.apache.kafka.clients.producer._
-import java.util
+import org.apache.kafka.common.errors.{RecordTooLargeException, SerializationException, TimeoutException}
+import org.apache.kafka.test.MockSerializer
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducerException, SystemStream}
import org.junit.Assert._
+import org.junit.Test
import org.scalatest.Assertions.intercept
-import org.apache.kafka.common.errors.{TimeoutException, RecordTooLargeException}
-import org.apache.kafka.test.MockSerializer
-import org.apache.samza.SamzaException
class TestKafkaSystemProducer {
-
- val someMessage = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "test".getBytes)
- val StreamNameNullOrEmptyErrorMsg = "Stream Name should be specified in the stream configuration file.";
+ val systemStream = new SystemStream("testSystem", "testStream")
+ val someMessage = new OutgoingMessageEnvelope(systemStream, "test".getBytes)
@Test
def testKafkaProducer {
@@ -64,9 +60,9 @@ class TestKafkaSystemProducer {
@Test
def testKafkaProducerBufferedSend {
- val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
- val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
- val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
val mockProducer = new MockKafkaProducer(1, "test", 1)
val producerMetrics = new KafkaSystemProducerMetrics
@@ -91,9 +87,9 @@ class TestKafkaSystemProducer {
@Test
def testKafkaProducerFlushSuccessful {
- val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
- val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
- val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
val mockProducer = new MockKafkaProducer(1, "test", 1)
val systemProducer = new KafkaSystemProducer(systemName = "test",
@@ -115,10 +111,10 @@ class TestKafkaSystemProducer {
@Test
def testKafkaProducerFlushWithException {
- val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
- val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
- val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
- val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "d".getBytes)
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
val mockProducer = new MockKafkaProducer(1, "test", 1)
val systemProducer = new KafkaSystemProducer(systemName = "test",
@@ -130,27 +126,27 @@ class TestKafkaSystemProducer {
mockProducer.setShouldBuffer(true)
systemProducer.send("test", msg2)
- mockProducer.setErrorNext(true, new RecordTooLargeException())
+ mockProducer.setErrorNext(true, true, new RecordTooLargeException())
systemProducer.send("test", msg3)
systemProducer.send("test", msg4)
assertEquals(1, mockProducer.getMsgsSent)
mockProducer.startDelayedSendThread(2000)
- val thrown = intercept[SamzaException] {
+ val thrown = intercept[SystemProducerException] {
systemProducer.flush("test")
}
- assertTrue(thrown.isInstanceOf[SamzaException])
+ assertTrue(thrown.isInstanceOf[SystemProducerException])
assertEquals(3, mockProducer.getMsgsSent) // msg1, msg2 and msg4 will be sent
systemProducer.stop()
}
@Test
def testKafkaProducerWithRetriableException {
- val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
- val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
- val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
- val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "d".getBytes)
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
val mockProducer = new MockKafkaProducer(1, "test", 1)
val producerMetrics = new KafkaSystemProducerMetrics()
@@ -165,52 +161,383 @@ class TestKafkaSystemProducer {
producer.send("test", msg3)
producer.flush("test")
- mockProducer.setErrorNext(true, new TimeoutException())
+ mockProducer.setErrorNext(true, true, new TimeoutException())
producer.send("test", msg4)
- val thrown = intercept[SamzaException] {
+ val thrown = intercept[SystemProducerException] {
producer.flush("test")
}
- assertTrue(thrown.isInstanceOf[SamzaException])
- assertTrue(thrown.getCause.isInstanceOf[TimeoutException])
+ assertTrue(thrown.isInstanceOf[SystemProducerException])
+ assertTrue(thrown.getCause.getCause.isInstanceOf[TimeoutException])
assertEquals(3, mockProducer.getMsgsSent)
producer.stop()
}
+ /**
+ * If there's an exception, we should:
+ * 1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order.
+ * 2. Record the original exception
+ * 3. Throw the exception every time a KafkaSystemProducer method is invoked until the container fails.
+ *
+ * Assumptions:
+ * 1. SystemProducer.flush() can happen concurrently with SystemProducer.send() for a particular TaskInstance (task.async.commit)
+ * 2. SystemProducer.flush() cannot happen concurrently with itself for a particular task instance
+ * 3. Any exception thrown from SystemProducer.flush() will prevent the checkpointing and fail the container
+ * 4. A single KafkaProducer is shared by all the tasks so any failure from one task can affect the others.
+ *
+ * Conclusions:
+ * It is only safe to handle the async exceptions by closing the producer and failing the container.
+ * This prevents race conditions with setting/clearing exceptions and recreating the producer that could cause data
+ * loss by checkpointing a failed offset.
+ *
+ * Inaccuracies:
+ * A real kafka producer succeeds or fails all the messages in a batch. In other words, the messages of a batch all
+ * fail or they all succeed together. This test, however, fails individual callbacks in order to test boundary
+ * conditions where the batches align perfectly around the failed send().
+ */
@Test
- def testKafkaProducerWithNonRetriableExceptions {
- val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
- val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
- val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
- val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "d".getBytes)
+ def testKafkaProducerWithFatalExceptions {
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+ val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
val producerMetrics = new KafkaSystemProducerMetrics()
val mockProducer = new MockKafkaProducer(1, "test", 1)
val producer = new KafkaSystemProducer(systemName = "test",
- getProducer = () => mockProducer,
+ getProducer = () => {
+ mockProducer.open() // A new producer is never closed
+ mockProducer
+ },
metrics = producerMetrics)
producer.register("test")
producer.start()
+
producer.send("test", msg1)
producer.send("test", msg2)
- producer.send("test", msg3)
- mockProducer.setErrorNext(true, new RecordTooLargeException())
+ mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+ producer.send("test", msg3) // Callback exception
+ assertTrue(mockProducer.isClosed)
+ assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
- producer.send("test", msg4)
- val thrown = intercept[SamzaException] {
- producer.flush("test")
+ val senderException = intercept[SystemProducerException] {
+ producer.send("test", msg4) // Should fail because the producer is closed.
}
- assertTrue(thrown.isInstanceOf[SamzaException])
- assertTrue(thrown.getCause.isInstanceOf[RecordTooLargeException])
- assertEquals(3, mockProducer.getMsgsSent)
+ assertTrue(senderException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+ val callbackException = intercept[SystemProducerException] {
+ producer.flush("test") // Should throw the callback exception
+ }
+ assertTrue(callbackException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+ val postFlushException = intercept[SystemProducerException] {
+ producer.send("test", msg5) // Should not be able to send again after flush
+ }
+ assertTrue(postFlushException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+ val callbackException2 = intercept[SystemProducerException] {
+ producer.flush("test") // Should rethrow the exception
+ }
+ assertTrue(callbackException2.getCause.getCause.isInstanceOf[RecordTooLargeException])
+ assertEquals(2, mockProducer.getMsgsSent) // only the messages before the error get sent
+ producer.stop()
+ }
+
+ /**
+ * Recapping from [[testKafkaProducerWithFatalExceptions]]:
+ *
+ * If there's an exception, we should:
+ * 1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order.
+ * 2. Record the original exception
+ * 3. Throw the exception every time a KafkaSystemProducer method is invoked until the container fails.
+ *
+ * This test focuses on point 3. Particularly it ensures that the failures are handled properly across multiple sources
+ * which share the same producer.
+ */
+ @Test
+ def testKafkaProducerWithFatalExceptionsMultipleSources {
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+ val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+ val msg6 = new OutgoingMessageEnvelope(systemStream, "f".getBytes)
+ val msg7 = new OutgoingMessageEnvelope(systemStream, "g".getBytes)
+ val producerMetrics = new KafkaSystemProducerMetrics()
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val producer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => {
+ mockProducer.open() // A new producer is never closed
+ mockProducer
+ },
+ metrics = producerMetrics)
+ producer.register("test1")
+ producer.register("test2")
+
+ producer.start()
+
+ // Initial sends
+ producer.send("test1", msg1)
+ producer.send("test2", msg2)
+
+ // Inject error for next send
+ mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+ producer.send("test1", msg3) // Callback exception
+ assertTrue(mockProducer.isClosed)
+ assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+ // Subsequent sends
+ val senderException = intercept[SystemProducerException] {
+ producer.send("test1", msg4) // Should fail because the producer is closed.
+ }
+ assertTrue(senderException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+ val callbackException = intercept[SystemProducerException] {
+ producer.send("test2", msg4) // First send from separate source gets a producer closed exception
+ }
+ assertTrue(callbackException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+ val callbackException2 = intercept[SystemProducerException] {
+ producer.send("test2", msg5) // Second send should still get the error
+ }
+ assertTrue(callbackException2.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+ // Flushes
+ val callbackException3 = intercept[SystemProducerException] {
+ producer.flush("test2") // Should rethrow the closed exception in flush
+ }
+ assertTrue(callbackException3.isInstanceOf[SystemProducerException])
+ assertTrue(callbackException3.getCause.getCause.isInstanceOf[RecordTooLargeException])
+ intercept[SystemProducerException] {
+ producer.send("test2", msg6) // Should still not be able to send after flush
+ }
+
+ val thrown3 = intercept[SystemProducerException] {
+ producer.flush("test1") // Should throw the callback exception
+ }
+ assertTrue(thrown3.isInstanceOf[SystemProducerException])
+ assertTrue(thrown3.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+ intercept[SystemProducerException] {
+ producer.send("test1", msg7) // Should still not be able to send after flush
+ }
+
+ intercept[SystemProducerException] {
+ producer.flush("test1") // Should throw the callback exception
+ }
+ assertEquals(2, mockProducer.getMsgsSent)
+ producer.stop()
+ }
+
+ @Test
+ def testKafkaProducerWithNonFatalExceptionsMultipleSources {
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+ val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+ val producerMetrics = new KafkaSystemProducerMetrics()
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val producer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => {
+ mockProducer.open() // A new producer is never closed
+ mockProducer
+ },
+ metrics = producerMetrics)
+ producer.register("test1")
+ producer.register("test2")
+ producer.start()
+
+ producer.send("test1", msg1)
+ producer.send("test2", msg2)
+ mockProducer.setErrorNext(true, false, new SerializationException())
+ val sendException = intercept[SystemProducerException] {
+ producer.send("test1", msg3) // User-thread exception
+ }
+ assertTrue(sendException.getCause.isInstanceOf[SerializationException])
+ assertFalse(mockProducer.isClosed)
+ assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+ producer.send("test1", msg3) // Should be able to resend msg3
+ producer.send("test2", msg4) // Second source should not be affected
+
+ producer.flush("test1") // Flush should be unaffected
+
+ producer.send("test1", msg5) // Should be able to send again after flush
+
+ assertEquals(5, mockProducer.getMsgsSent) // every message gets sent, including the resent msg3 and those after the error
+ producer.stop()
+ }
+
+ /**
+ * If there's an exception and the user configured task.drop.producer.errors, we should:
+ * 1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order.
+ * 2. Recreate the producer.
+ * 3. Ignore any messages that were dropped (user knows they're signing up for this if they enable the option)
+ *
+ * Assumptions:
+ * 1. SystemProducer.flush() can happen concurrently with SystemProducer.send() for a particular TaskInstance (task.async.commit)
+ * 2. SystemProducer.flush() cannot happen concurrently with itself for a particular task instance
+ * 3. Any exception thrown from SystemProducer.flush() will prevent the checkpointing and fail the container
+ * 4. A single KafkaProducer is shared by all the tasks so any failure from one task can affect the others.
+ *
+ * Conclusions:
+ * If the user is ok with dropping messages for the sake of availability, we will swallow all exceptions and
+ * recreate the producer to recover. There are no guarantees how many messages are lost, but the send-failed metric
+ * should be accurate, so users should alert on that.
+ *
+ * Inaccuracies:
+ * A real kafka producer succeeds or fails all the messages in a batch. In other words, the messages of a batch all
+ * fail or they all succeed together. This test, however, fails individual callbacks in order to test boundary
+ * conditions where the batches align perfectly around the failed send().
+ */
+ @Test
+ def testKafkaProducerWithFatalExceptionsDroppingExceptions {
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+ val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+ val producerMetrics = new KafkaSystemProducerMetrics()
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val producer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => {
+ mockProducer.open() // A new producer is never closed
+ mockProducer
+ },
+ metrics = producerMetrics,
+ dropProducerExceptions = true) // Here's where we enable exception dropping.
+ producer.register("test")
+ producer.start()
+
+ producer.send("test", msg1)
+ producer.send("test", msg2)
+ mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+ producer.send("test", msg3) // Callback exception
+ assertFalse(mockProducer.isClosed)
+ assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount)
+
+ producer.send("test", msg4) // Should succeed because the producer recovered.
+ producer.flush("test") // Should not throw
+
+ producer.send("test", msg5) // Should be able to send again after flush
+ producer.flush("test")
+
+ assertEquals(4, mockProducer.getMsgsSent) // every message except the one with the error should get sent
+ producer.stop()
+ }
+
+ /**
+ * Recapping from [[testKafkaProducerWithFatalExceptionsDroppingExceptions]]:
+ *
+ * If there's an exception, we should:
+ * 1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order.
+ * 2. Recreate the producer.
+ * 3. Ignore any messages that were dropped (user knows they're signing up for this if they enable the option)
+ *
+ * This test ensures that the failures are handled properly across multiple sources
+ * which share the same producer.
+ */
+ @Test
+ def testKafkaProducerWithFatalExceptionsMultipleSourcesDroppingExceptions {
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+ val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+ val msg6 = new OutgoingMessageEnvelope(systemStream, "f".getBytes)
+ val msg7 = new OutgoingMessageEnvelope(systemStream, "g".getBytes)
+ val producerMetrics = new KafkaSystemProducerMetrics()
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val producer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => {
+ mockProducer.open() // A new producer is never closed
+ mockProducer
+ },
+ metrics = producerMetrics,
+ dropProducerExceptions = true) // Here's where we enable exception dropping.
+ producer.register("test1")
+ producer.register("test2")
+
+ producer.start()
+
+ // Initial sends
+ producer.send("test1", msg1)
+ producer.send("test2", msg2)
+
+ // Inject error for next send
+ mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+ producer.send("test1", msg3) // Callback exception
+ assertFalse(mockProducer.isClosed)
+ assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount)
+
+ // Subsequent sends
+ producer.send("test1", msg4) // Should succeed because the producer recovered.
+ producer.send("test2", msg5) // Second source should also not have any error.
+
+ // Flushes
+ producer.flush("test2") // Should not throw for test2
+ producer.send("test2", msg6) // Should still work after flush
+
+ producer.flush("test1") // Should not throw for test1 either
+ producer.send("test1", msg7)
+
+ assertEquals(6, mockProducer.getMsgsSent) // every message except the one with the error should get sent
+ producer.stop()
+ }
+
+ @Test
+ def testKafkaProducerWithNonFatalExceptionsMultipleSourcesDroppingExceptions {
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+ val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+ val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+ val producerMetrics = new KafkaSystemProducerMetrics()
+
+ val mockProducer = new MockKafkaProducer(1, "test", 1)
+ val producer = new KafkaSystemProducer(systemName = "test",
+ getProducer = () => {
+ mockProducer.open() // A new producer is never closed
+ mockProducer
+ },
+ metrics = producerMetrics,
+ dropProducerExceptions = true) // Here's where we enable exception dropping.
+ producer.register("test1")
+ producer.register("test2")
+ producer.start()
+
+ producer.send("test1", msg1)
+ producer.send("test2", msg2)
+ mockProducer.setErrorNext(true, false, new SerializationException())
+ val sendException = intercept[SystemProducerException] {
+ producer.send("test1", msg3) // User-thread exception
+ }
+ assertTrue(sendException.getCause.isInstanceOf[SerializationException])
+ assertFalse(mockProducer.isClosed)
+ assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+ producer.send("test1", msg3) // Should be able to resend msg3
+ producer.send("test2", msg4) // Second source should not be affected
+
+ producer.flush("test1") // Flush should be unaffected
+
+ producer.send("test1", msg5) // Should be able to send again after flush
+
+ assertEquals(5, mockProducer.getMsgsSent) // every message gets sent, including the resent msg3 and those after the error
producer.stop()
}
@Test
def testKafkaProducerFlushMsgsWhenStop {
- val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
- val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
- val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+ val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+ val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+ val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
val msg4 = new OutgoingMessageEnvelope(new SystemStream("test2", "test"), "d".getBytes)
val mockProducer = new MockKafkaProducer(1, "test", 1)
@@ -253,7 +580,7 @@ class TestKafkaSystemProducer {
producer.send("testSrc2", omeStreamNameEmpty)
assertEquals(0, mockProducer.getMsgsSent)
}
- assertTrue(thrownNull.getMessage() == StreamNameNullOrEmptyErrorMsg)
- assertTrue(thrownEmpty.getMessage() == StreamNameNullOrEmptyErrorMsg)
+ assertTrue(thrownNull.getMessage() == "Invalid system stream: " + omeStreamNameNull.getSystemStream)
+ assertTrue(thrownEmpty.getMessage() == "Invalid system stream: " + omeStreamNameEmpty.getSystemStream)
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
index 8276c18..f674685 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
@@ -28,7 +28,8 @@ import org.mockito.Mockito._
class TestKeyValueStorageEngine {
var engine: KeyValueStorageEngine[String, String] = null
- var metrics: KeyValueStorageEngineMetrics = null;
+ var metrics: KeyValueStorageEngineMetrics = null
+ var now: Long = 0L
@Before
def setup() {
@@ -36,7 +37,7 @@ class TestKeyValueStorageEngine {
val rawKv = mock(classOf[KeyValueStore[Array[Byte], Array[Byte]]])
val properties = mock(classOf[StoreProperties])
metrics = new KeyValueStorageEngineMetrics
- engine = new KeyValueStorageEngine[String, String](properties, wrapperKv, rawKv, metrics)
+ engine = new KeyValueStorageEngine[String, String](properties, wrapperKv, rawKv, metrics, clock = () => { getNextTimestamp() })
}
@After
@@ -48,8 +49,8 @@ class TestKeyValueStorageEngine {
def testGetAndPut(): Unit = {
var prevGets = metrics.gets.getCount
var prevGetNsSnapshotSize = metrics.getNs.getSnapshot.getSize
- var valueForK1 = engine.get("k1");
- assertNull("k1 is not existing before put", valueForK1);
+ var valueForK1 = engine.get("k1")
+ assertNull("k1 is not existing before put", valueForK1)
assertEquals("get counter increments by 1", 1, metrics.gets.getCount - prevGets)
assertEquals("get timer has 1 additional data point" , 1, metrics.getNs.getSnapshot.getSize - prevGetNsSnapshotSize)
@@ -127,4 +128,9 @@ class TestKeyValueStorageEngine {
}
assertFalse("no next after iterating 2 keys in the range", iter.hasNext)
}
+
+ def getNextTimestamp(): Long = {
+ now += 1
+ now
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 59e9a89..a007c77 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -132,6 +132,7 @@ object StreamTaskTestUtil {
config.put("serializer.class", "kafka.serializer.StringEncoder")
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(Integer.MAX_VALUE-1)).toString())
+ config.put(ProducerConfig.LINGER_MS_CONFIG, "0")
val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 734487b..3ef08e5 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -97,7 +97,7 @@ class TestStatefulTask extends StreamTaskTestUtil {
TestStatefulTask.MESSAGES_SEND_1.foreach(m => send(task, m))
// Validate that messages appear in store stream.
- val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, 5, "testShouldStartTaskForFirstTime")
+ val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, TestStatefulTask.MESSAGES_RECV_1.length-1, "testShouldStartTaskForFirstTime")
assertEquals(TestStatefulTask.MESSAGES_RECV_1, messages)