Posted to commits@samza.apache.org by jm...@apache.org on 2017/09/20 19:40:59 UTC

samza git commit: SAMZA-1392: KafkaSystemProducer performance and correctness with concurrent sends and flushes

Repository: samza
Updated Branches:
  refs/heads/master 89f8ca4ec -> 3773ec699


SAMZA-1392: KafkaSystemProducer performance and correctness with concurrent sends and flushes

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>, Yi Pan (Data Infrastructure) <ni...@gmail.com>

Closes #272 from jmakes/samza-1392


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3773ec69
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3773ec69
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3773ec69

Branch: refs/heads/master
Commit: 3773ec699ef07fa421edbb72d3fd83d7b2273144
Parents: 89f8ca4
Author: Jacob Maes <jm...@linkedin.com>
Authored: Wed Sep 20 12:40:47 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Wed Sep 20 12:40:47 2017 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  18 +
 .../samza/system/SystemProducerException.java   |  38 ++
 .../org/apache/samza/config/TaskConfig.scala    |   3 +
 .../samza/container/TestTaskInstance.scala      |  35 ++
 .../org/apache/samza/config/KafkaConfig.scala   |  34 +-
 .../samza/system/kafka/KafkaSystemFactory.scala |   5 +-
 .../system/kafka/KafkaSystemProducer.scala      | 256 ++++++-----
 .../samza/system/kafka/MockKafkaProducer.java   |  87 ++--
 .../kafka/TestKafkaSystemProducerJava.java      |   2 +-
 .../apache/samza/config/TestKafkaConfig.scala   |  24 +-
 .../system/kafka/TestKafkaSystemProducer.scala  | 427 ++++++++++++++++---
 .../storage/kv/TestKeyValueStorageEngine.scala  |  14 +-
 .../test/integration/StreamTaskTestUtil.scala   |   1 +
 .../test/integration/TestStatefulTask.scala     |   2 +-
 14 files changed, 705 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 9b4e279..d736d6b 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -675,6 +675,24 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="task-drop-producer-errors">task.drop.producer.errors</td>
+                    <td class="default">false</td>
+                    <td class="description">
+                        If true, producer errors will be logged and ignored. The only exceptions that will be thrown are
+                        those which are likely caused by the application itself (e.g. serialization errors). If false,
+                        the producer will be closed and producer errors will be propagated upward until the container
+                        ultimately fails. Failing the container is a safety precaution to ensure the latest checkpoints
+                        only reflect the events that have been completely and successfully processed. However, some
+                        applications prefer to remain running at all costs, even if that means lost messages. Setting
+                        this property to true will enable applications to recover from producer errors at the expense of
+                        one or many (in the case of batching producers) dropped messages. If you enable this, it is highly
+                        recommended that you also configure alerting on the 'producer-send-failed' metric, since the
+                        producer might drop messages indefinitely. The handling of this property is specific to each
+                        SystemProducer implementation; it has no effect on SystemProducers that do not support it.
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" id="task-log4j-system">task.log4j.system</td>
                     <td class="default"></td>
                     <td class="description">

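For reference, a minimal sketch of how a job enables this behavior and how the flag is read through the TaskConfig accessor added later in this patch (the config map here is illustrative):

    import org.apache.samza.config.{MapConfig, TaskConfig}
    import scala.collection.JavaConverters._

    // Hypothetical job config enabling producer-error dropping.
    val config = new MapConfig(Map("task.drop.producer.errors" -> "true").asJava)
    val taskConfig = new TaskConfig(config)
    assert(taskConfig.getDropProducerError) // defaults to false when the property is unset
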
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-api/src/main/java/org/apache/samza/system/SystemProducerException.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemProducerException.java b/samza-api/src/main/java/org/apache/samza/system/SystemProducerException.java
new file mode 100644
index 0000000..6e663b1
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemProducerException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import org.apache.samza.SamzaException;
+
+
+public class SystemProducerException extends SamzaException {
+  private static final long serialVersionUID = 1L;
+
+  public SystemProducerException(String s) {
+    super(s);
+  }
+
+  public SystemProducerException(Throwable t) {
+    super(t);
+  }
+
+  public SystemProducerException(String s, Throwable t) {
+    super(s, t);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index e49d74b..2ee0032 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -36,6 +36,7 @@ object TaskConfig {
   val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
   val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails
   val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
+  val DROP_PRODUCER_ERROR = "task.drop.producer.errors" // whether to ignore producer errors and drop the messages that failed to send
   val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore in process and window
   val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task grouper
   val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent process for a AsyncStreamTask
@@ -107,6 +108,8 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR)
 
+  def getDropProducerError = getBoolean(TaskConfig.DROP_PRODUCER_ERROR, false)
+
   def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS)
 
   def getIgnoredExceptions = getOption(TaskConfig.IGNORED_EXCEPTIONS)

http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 4958a57..d9eb7ef 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -391,6 +391,41 @@ class TestTaskInstance {
     // Finally, checkpoint the inputs with the snapshotted checkpoint captured at the beginning of commit
     mockOrder.verify(offsetManager).writeCheckpoint(taskName, checkpoint)
   }
+
+  @Test(expected = classOf[SystemProducerException])
+  def testProducerExceptionsIsPropagated {
+    // Simple objects
+    val partition = new Partition(0)
+    val taskName = new TaskName("taskName")
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+
+    // Mocks
+    val collector = Mockito.mock(classOf[TaskInstanceCollector])
+    when(collector.flush).thenThrow(new SystemProducerException("Test"))
+    val storageManager = Mockito.mock(classOf[TaskStorageManager])
+    val offsetManager = Mockito.mock(classOf[OffsetManager])
+
+    val taskInstance: TaskInstance = new TaskInstance(
+      Mockito.mock(classOf[StreamTask]).asInstanceOf[StreamTask],
+      taskName,
+      new MapConfig,
+      new TaskInstanceMetrics,
+      null,
+      Mockito.mock(classOf[SystemConsumers]),
+      collector,
+      Mockito.mock(classOf[SamzaContainerContext]),
+      offsetManager,
+      storageManager,
+      systemStreamPartitions = Set(systemStreamPartition))
+
+    try {
+      taskInstance.commit // Should not swallow the SystemProducerException
+    } finally {
+      Mockito.verify(offsetManager, times(0)).writeCheckpoint(any(classOf[TaskName]), any(classOf[Checkpoint]))
+    }
+  }
+
 }
 
 class MockSystemAdmin extends SystemAdmin {

http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 9ac21ef..79fd4f3 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -20,24 +20,19 @@
 package org.apache.samza.config
 
 
+import java.util
 import java.util.regex.Pattern
-
-import org.apache.samza.util.Util
-import org.apache.samza.util.Logging
-
-import scala.collection.JavaConverters._
-import kafka.consumer.ConsumerConfig
 import java.util.{Properties, UUID}
 
+import kafka.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.samza.SamzaException
-import java.util
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.system.kafka.KafkaSystemFactory
+import org.apache.samza.util.{Logging, Util}
 
 import scala.collection.JavaConverters._
-import org.apache.samza.system.kafka.KafkaSystemFactory
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.kafka.common.serialization.ByteArraySerializer
 
 object KafkaConfig {
   val TOPIC_REPLICATION_FACTOR = "replication.factor"
@@ -296,8 +291,10 @@ class KafkaProducerConfig(val systemName: String,
   //Overrides specific to samza-kafka (these are considered as defaults in Samza & can be overridden by user
   val MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT: java.lang.Integer = 1.asInstanceOf[Integer]
   val RETRIES_DEFAULT: java.lang.Integer = Integer.MAX_VALUE
+  val LINGER_MS_DEFAULT: java.lang.Integer = 10
 
   def getProducerProperties = {
+
     val byteArraySerializerClassName = classOf[ByteArraySerializer].getCanonicalName
     val producerProperties: java.util.Map[String, Object] = new util.HashMap[String, Object]()
     producerProperties.putAll(properties)
@@ -320,14 +317,17 @@ class KafkaProducerConfig(val systemName: String,
       producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)
     }
 
-    if (producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG)
-      && producerProperties.get(ProducerConfig.RETRIES_CONFIG).asInstanceOf[String].toInt < RETRIES_DEFAULT) {
-      warn("Samza does not provide producer failure handling. Consider setting '%s' to a large value, like Int.MAX." format ProducerConfig.RETRIES_CONFIG)
-    } else {
-      // Retries config is set to Max so that when all attempts fail, Samza also fails the send. We do not have any special handler
-      // for producer failure
+    if (!producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG)) {
+      debug("%s undefined. Defaulting to %s." format(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT))
       producerProperties.put(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT)
     }
+    producerProperties.get(ProducerConfig.RETRIES_CONFIG).toString.toInt // Verify int
+
+    if (!producerProperties.containsKey(ProducerConfig.LINGER_MS_CONFIG)) {
+      debug("%s undefined. Defaulting to %s." format(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS_DEFAULT))
+      producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS_DEFAULT)
+    }
+    producerProperties.get(ProducerConfig.LINGER_MS_CONFIG).toString.toInt // Verify int
 
     producerProperties
   }

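To make the defaulting above concrete, a hedged sketch of what getProducerProperties yields when the user overrides neither property (`kafkaProducerConfig` stands for a KafkaProducerConfig built from a job config, as in the tests below):

    import org.apache.kafka.clients.producer.ProducerConfig

    // retries defaults to Int.MaxValue and linger.ms to 10 unless overridden.
    // Both are round-tripped through toString.toInt, so a non-numeric override
    // (e.g. "Samza") fails fast with a NumberFormatException (see TestKafkaConfig).
    val producerProps = kafkaProducerConfig.getProducerProperties
    assert(producerProps.get(ProducerConfig.RETRIES_CONFIG) == Integer.MAX_VALUE)
    assert(producerProps.get(ProducerConfig.LINGER_MS_CONFIG) == 10)
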
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 638806b..e0b5540 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -26,7 +26,7 @@ import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, Clie
 import org.apache.samza.config.Config
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.config.StorageConfig._
@@ -97,7 +97,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       systemName,
       new ExponentialSleepStrategy(initialDelayMs = producerConfig.reconnectIntervalMs),
       getProducer,
-      metrics)
+      metrics,
+      dropProducerExceptions = config.getDropProducerError)
   }
 
   def getAdmin(systemName: String, config: Config): SystemAdmin = {

http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index b680ed4..5e83666 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -21,100 +21,61 @@ package org.apache.samza.system.kafka
 
 
 import java.util.concurrent.atomic.AtomicReference
-import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Future}
+import java.util.concurrent.TimeUnit
 
 import org.apache.kafka.clients.producer.Callback
 import org.apache.kafka.clients.producer.Producer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.PartitionInfo
-import org.apache.samza.SamzaException
+import org.apache.kafka.common.errors.SerializationException
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemProducerException
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.KafkaUtil
 import org.apache.samza.util.Logging
 import org.apache.samza.util.TimerUtils
 
-import scala.collection.JavaConverters._
-
 class KafkaSystemProducer(systemName: String,
                           retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
                           getProducer: () => Producer[Array[Byte], Array[Byte]],
                           metrics: KafkaSystemProducerMetrics,
-                          val clock: () => Long = () => System.nanoTime) extends SystemProducer with Logging with TimerUtils
-{
-
-  class SourceData {
-    /**
-     * lock to make send() and store its future atomic
-     */
-    val sendLock: Object = new Object
-    /**
-     * The most recent send's Future handle
-     */
-    @volatile
-    var latestFuture: Future[RecordMetadata] = null
-    /**
-     * exceptionInCallback: to store the exception in case of any "ultimate" send failure (ie. failure
-     * after exhausting max_retries in Kafka producer) in the I/O thread, we do not continue to queue up more send
-     * requests from the samza thread. It helps the samza thread identify if the failure happened in I/O thread or not.
-     *
-     * In cases of multiple exceptions in the callbacks, we keep the first one before throwing.
-     */
-    var exceptionInCallback: AtomicReference[SamzaException] = new AtomicReference[SamzaException]()
-  }
+                          val clock: () => Long = () => System.nanoTime,
+                          val dropProducerExceptions: Boolean = false) extends SystemProducer with Logging with TimerUtils {
 
+  // Represents a fatal error that caused the producer to close.
+  val fatalException: AtomicReference[SystemProducerException] = new AtomicReference[SystemProducerException]()
   @volatile var producer: Producer[Array[Byte], Array[Byte]] = null
-  var producerLock: Object = new Object
-  val StreamNameNullOrEmptyErrorMsg = "Stream Name should be specified in the stream configuration file."
-  val sources: ConcurrentHashMap[String, SourceData] = new ConcurrentHashMap[String, SourceData]
+  val producerLock: Object = new Object
 
   def start(): Unit = {
+    producer = getProducer()
   }
 
   def stop() {
-    try {
-      val currentProducer = producer
-      if (currentProducer != null) {
-        producerLock.synchronized {
-          if (currentProducer == producer) {
-            // only nullify the member producer if it is still the same object, no point nullifying new producer
-            producer = null
-          }
+    info("Stopping producer for system: " + this.systemName)
+
+    // stop() should not happen often so no need to optimize locking
+    producerLock.synchronized {
+      try {
+        if (producer != null) {
+          producer.close  // Also performs the equivalent of a flush()
         }
-        currentProducer.close
 
-        sources.asScala.foreach {p =>
-          if (p._2.exceptionInCallback.get() == null) {
-            flush(p._1)
-          }
+        val exception = fatalException.get()
+        if (exception != null) {
+          error("Observed an earlier send() error while closing producer", exception)
         }
+      } catch {
+        case e: Exception => error("Error while closing producer for system: " + systemName, e)
+      } finally {
+        producer = null
       }
-    } catch {
-      case e: Exception => error(e.getMessage, e)
     }
   }
 
   def register(source: String) {
-    if(sources.putIfAbsent(source, new SourceData) != null) {
-      throw new SamzaException("%s is already registered with the %s system producer" format (source, systemName))
-    }
-  }
-
-  def closeAndNullifyCurrentProducer(currentProducer: Producer[Array[Byte], Array[Byte]]) {
-    try {
-      // TODO: we should use timeout close() to make sure we fail all waiting messages in kafka 0.9+
-      currentProducer.close()
-    } catch {
-      case e: Exception => error("producer close failed", e)
-    }
-    producerLock.synchronized {
-      if (currentProducer == producer) {
-        // only nullify the member producer if it is still the same object, no point nullifying new producer
-        producer = null
-      }
-    }
   }
 
   def send(source: String, envelope: OutgoingMessageEnvelope) {
@@ -122,33 +83,18 @@ class KafkaSystemProducer(systemName: String,
 
     val topicName = envelope.getSystemStream.getStream
     if (topicName == null || topicName == "") {
-      throw new IllegalArgumentException(StreamNameNullOrEmptyErrorMsg)
-    }
-
-    val sourceData = sources.get(source)
-    if (sourceData == null) {
-      throw new IllegalArgumentException("Source %s must be registered first before send." format source)
+      throw new IllegalArgumentException("Invalid system stream: " + envelope.getSystemStream)
     }
 
-    val exception = sourceData.exceptionInCallback.getAndSet(null)
-    if (exception != null) {
+    val globalProducerException = fatalException.get()
+    if (globalProducerException != null) {
       metrics.sendFailed.inc
-      throw exception  // in case the caller catches all exceptions and will try again
-    }
-
-    // lazy initialization of the producer
-    if (producer == null) {
-      producerLock.synchronized {
-        if (producer == null) {
-          info("Creating a new producer for system %s." format systemName)
-          producer = getProducer()
-        }
-      }
+      throw new SystemProducerException("Producer was unable to recover from previous exception.", globalProducerException)
     }
 
     val currentProducer = producer
     if (currentProducer == null) {
-      throw new SamzaException("Kafka system producer is not available.")
+      throw new SystemProducerException("Kafka producer is null.")
     }
 
     // Java-based Kafka producer API requires an "Integer" type partitionKey and does not allow custom overriding of Partitioners
@@ -161,72 +107,114 @@ class KafkaSystemProducer(systemName: String,
                                     envelope.getMessage.asInstanceOf[Array[Byte]])
 
     try {
-      sourceData.sendLock.synchronized {
-        val futureRef: Future[RecordMetadata] =
-          currentProducer.send(record, new Callback {
-            def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
-              if (exception == null) {
-                //send was successful.
-                metrics.sendSuccess.inc
-              }
-              else {
-                error("Closing the producer because of an exception in callback: ", exception)
-                //If there is an exception in the callback, close producer.
-                closeAndNullifyCurrentProducer(currentProducer)
-
-                // we keep the exception and will throw the exception in the next producer.send()
-                // so the user can handle the exception and decide to fail or ignore
-                sourceData.exceptionInCallback.compareAndSet(
-                  null,
-                  new SamzaException("Unable to send message from %s to system %s." format(source, systemName),
-                    exception))
-
-                metrics.sendFailed.inc
-                error("Unable to send message on Topic:%s Partition:%s" format(topicName, partitionKey),
-                             exception)
-              }
-            }
-          })
-        sourceData.latestFuture = futureRef
-      }
+      currentProducer.send(record, new Callback {
+        def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
+          if (exception == null) {
+            metrics.sendSuccess.inc
+          } else {
+            val producerException = new SystemProducerException("Failed to send message for Source: %s on System:%s Topic:%s Partition:%s"
+              .format(source, systemName, topicName, partitionKey), exception)
+
+            handleSendException(currentProducer, producerException, true)
+          }
+        }
+      })
       metrics.sends.inc
     } catch {
-      case e: Exception => {
-        error("Closing the producer because of an exception in send: ", e)
-
-        closeAndNullifyCurrentProducer(currentProducer)
+      case originalException : Exception =>
+        val producerException = new SystemProducerException("Failed to send message for Source: %s on System:%s Topic:%s Partition:%s"
+          .format(source, systemName, topicName, partitionKey), originalException)
 
-        metrics.sendFailed.inc
-        throw new SamzaException(("Failed to send message on Topic:%s Partition:%s Exception:\n %s,")
-          .format(topicName, partitionKey, e))
-      }
+        handleSendException(currentProducer, producerException, isFatalException(originalException))
+        throw producerException
     }
   }
 
+
   def flush(source: String) {
     updateTimer(metrics.flushNs) {
       metrics.flushes.inc
 
-      val sourceData = sources.get(source)
-      //if latestFuture is null, it probably means that there has been no calls to "send" messages
-      //Hence, nothing to do in flush
-      if(sourceData.latestFuture != null) {
-        while(!sourceData.latestFuture.isDone && sourceData.exceptionInCallback.get() == null) {
-          try {
-            sourceData.latestFuture.get()
-          } catch {
-            case t: Throwable => error(t.getMessage, t)
+      val currentProducer = producer
+      if (currentProducer == null) {
+        throw new SystemProducerException("Kafka producer is null.")
+      }
+
+      // Flush only throws InterruptedException, all other errors are handled in send() callbacks
+      currentProducer.flush()
+
+      // Invariant: At this point either
+      // 1. The producer is fine and there are no exceptions to handle   OR
+      // 2. The producer is closed and one or more sources have exceptions to handle
+      //   2a. All new sends get a ProducerClosedException or IllegalStateException (depending on kafka version)
+      //   2b. There are no messages in flight because the producer is closed
+
+      // We must check for an exception AFTER flush() because when flush() returns all callbacks for messages sent
+      // in that flush() are guaranteed to have completed and we update the exception in the callback.
+      // If there is an exception, we rethrow it here to prevent the checkpoint.
+      val exception = fatalException.get()
+      if (exception != null) {
+        metrics.flushFailed.inc
+        throw new SystemProducerException("Flush failed. One or more batches of messages were not sent!", exception)
+      }
+      trace("Flushed %s." format source)
+    }
+  }
+
+
+  private def handleSendException(currentProducer: Producer[Array[Byte], Array[Byte]], producerException: SystemProducerException, isFatalException: Boolean) = {
+    metrics.sendFailed.inc
+    error(producerException)
+    // The SystemProducer API is synchronous, so there's no way for us to guarantee that an exception will
+    // be handled by the Task before we recreate the producer, and if it isn't handled, a concurrent send() from another
+    // Task could send on the new producer before the first Task properly handled the exception, producing out-of-order messages.
+    // So we have to handle it right here in the SystemProducer.
+    if (dropProducerExceptions) {
+      warn("Ignoring producer exception. All messages in the failed producer request will be dropped!")
+
+      if (isFatalException) {
+        producerLock.synchronized {
+          // Prevent each callback from recreating producer for the same failure.
+          if (currentProducer == producer) {
+            info("Creating a new producer for system %s." format systemName)
+            try {
+              currentProducer.close(0, TimeUnit.MILLISECONDS)
+            } catch {
+              case exception: Exception => error("Exception while closing producer.", exception)
+            }
+            producer = getProducer()
           }
         }
-
-        //if there is an exception thrown from the previous callbacks just before flush, we have to fail the container
-        if (sourceData.exceptionInCallback.get() != null) {
-          metrics.flushFailed.inc
-          throw sourceData.exceptionInCallback.get()
-        } else {
-          trace("Flushed %s." format (source))
+      }
+    } else {
+      // If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries
+      // Close producer to ensure messages queued in-flight are not sent and hence, avoid re-ordering
+      // This works because there is only 1 callback thread and no sends can complete until the callback returns.
+      if (isFatalException) {
+        fatalException.compareAndSet(null, producerException)
+        try {
+          currentProducer.close(0, TimeUnit.MILLISECONDS)
+        } catch {
+          case exception: Exception => error("Exception while closing producer.", exception)
         }
       }
     }
   }
+
+  /**
+    * A fatal exception is one that corrupts the producer or otherwise makes it unusable.
+    * We handle non-fatal exceptions differently because they can often be handled by the user,
+    * which is preferable: it gives users who drop exceptions a way to do so with less
+    * data loss (no collateral damage from batches of messages getting dropped).
+    *
+    * @param exception  the exception to check
+    * @return           true if the exception is unrecoverable.
+    */
+  private def isFatalException(exception: Exception): Boolean = {
+    exception match {
+      case _: SerializationException => false
+      case _: ClassCastException => false
+      case _ => true
+    }
+  }
 }
\ No newline at end of file

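To illustrate the resulting caller-facing contract (a sketch, not part of the patch; `producer` stands for a started KafkaSystemProducer): once a callback records a fatal exception, the producer is closed and every later send() or flush() rethrows that failure wrapped in a SystemProducerException, so no checkpoint can cover the failed offsets:

    import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducerException, SystemStream}

    val envelope = new OutgoingMessageEnvelope(new SystemStream("kafka", "output-topic"), "payload".getBytes)
    try {
      producer.send("my-source", envelope)
      producer.flush("my-source") // rethrows any fatal exception recorded by a send() callback
    } catch {
      case e: SystemProducerException =>
        // With task.drop.producer.errors=false (the default), the only safe
        // recovery is to let the exception propagate and fail the container.
        throw e
    }
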
http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
index aaa949d..024c6e6 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -31,15 +31,16 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReferenceArray;
+import kafka.producer.ProducerClosedException;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.test.TestUtils;
 
@@ -49,8 +50,11 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
   private List<FutureTask<RecordMetadata>> _callbacksList = new ArrayList<FutureTask<RecordMetadata>>();
   private boolean shouldBuffer = false;
   private boolean errorNext = false;
+  private boolean errorInCallback = true;
   private Exception exception = null;
   private AtomicInteger msgsSent = new AtomicInteger(0);
+  private boolean closed = false;
+  private int openCount = 0;
 
   /*
    * Helps mock out buffered behavior seen in KafkaProducer. This MockKafkaProducer enables you to:
@@ -72,8 +76,9 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
     this.shouldBuffer = shouldBuffer;
   }
 
-  public void setErrorNext(boolean errorNext, Exception exception) {
+  public void setErrorNext(boolean errorNext, boolean errorInCallback, Exception exception) {
     this.errorNext = errorNext;
+    this.errorInCallback = errorInCallback;
     this.exception = exception;
   }
 
@@ -82,27 +87,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
   }
 
   public Thread startDelayedSendThread(final int sleepTime) {
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        FutureTask[] callbackArray = new FutureTask[_callbacksList.size()];
-        AtomicReferenceArray<FutureTask> _bufferList = new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray));
-        ExecutorService executor = Executors.newFixedThreadPool(10);
-        try {
-          for(int i = 0; i < _bufferList.length(); i++) {
-            Thread.sleep(sleepTime);
-            FutureTask f = _bufferList.get(i);
-            if(!f.isDone()) {
-              executor.submit(f).get();
-            }
-          }
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        } catch (ExecutionException ee) {
-          ee.printStackTrace();
-        }
-      }
-    });
+    Thread t = new Thread(new FlushRunnable(sleepTime));
     t.start();
     return t;
   }
@@ -118,7 +103,14 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
 
   @Override
   public Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+    if (closed) {
+      throw new ProducerClosedException();
+    }
     if (errorNext) {
+      if (!errorInCallback) {
+        this.errorNext = false;
+        throw (RuntimeException)exception;
+      }
       if (shouldBuffer) {
         FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
           @Override
@@ -171,16 +163,31 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
 
   @Override
   public void close() {
-
+    close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
   }
 
   @Override
   public void close(long timeout, TimeUnit timeUnit) {
+    closed = true;
+    // The real producer will flush messages as part of closing. We'll invoke flush here to approximate that behavior.
+    new FlushRunnable(0).run();
+  }
 
+  public void open() {
+    this.closed = false;
+    openCount++;
   }
 
-  public synchronized void flush () {
+  public boolean isClosed() {
+    return closed;
+  }
 
+  public int getOpenCount() {
+    return openCount;
+  }
+
+  public synchronized void flush () {
+    new FlushRunnable(0).run();
   }
 
 
@@ -253,4 +260,34 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
       return true;
     }
   }
+
+  private class FlushRunnable implements Runnable {
+    private final int _sleepTime;
+
+    public FlushRunnable(int sleepTime) {
+      _sleepTime = sleepTime;
+    }
+
+    public void run() {
+      FutureTask[] callbackArray = new FutureTask[_callbacksList.size()];
+      AtomicReferenceArray<FutureTask> _bufferList =
+          new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray));
+      ExecutorService executor = Executors.newFixedThreadPool(10);
+      try {
+        for (int i = 0; i < _bufferList.length(); i++) {
+          Thread.sleep(_sleepTime);
+          FutureTask f = _bufferList.get(i);
+          if (!f.isDone()) {
+            executor.submit(f).get();
+          }
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (ExecutionException ee) {
+        ee.printStackTrace();
+      } finally {
+        executor.shutdownNow();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
index 224ca2f..7fc450d 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
@@ -48,7 +48,7 @@ public class TestKafkaSystemProducerJava {
       public Object apply() {
         return System.currentTimeMillis();
       }
-    });
+    }, false);
 
     long now = System.currentTimeMillis();
     assertTrue((Long)ksp.clock().apply() >= now);

http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index d4b8150..9d1e99b 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -31,7 +31,7 @@ import org.junit.Before
 class TestKafkaConfig {
 
   var props : Properties = new Properties
-  val SYSTEM_NAME = "kafka";
+  val SYSTEM_NAME = "kafka"
   val KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer."
   val TEST_CLIENT_ID = "TestClientId"
   val TEST_GROUP_ID = "TestGroupId"
@@ -147,9 +147,9 @@ class TestKafkaConfig {
 
   @Test
   def testMaxInFlightRequestsPerConnectionOverride() {
-    val expectedValue = "200";
+    val expectedValue = "200"
 
-    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, expectedValue);
+    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, expectedValue)
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
@@ -161,9 +161,9 @@ class TestKafkaConfig {
 
   @Test
   def testRetriesOverride() {
-    val expectedValue = "200";
+    val expectedValue = "200"
 
-    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, expectedValue);
+    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, expectedValue)
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
@@ -175,7 +175,7 @@ class TestKafkaConfig {
 
   @Test(expected = classOf[NumberFormatException])
   def testMaxInFlightRequestsPerConnectionWrongNumberFormat() {
-    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "Samza");
+    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "Samza")
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
@@ -185,7 +185,17 @@ class TestKafkaConfig {
 
   @Test(expected = classOf[NumberFormatException])
   def testRetriesWrongNumberFormat() {
-    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, "Samza");
+    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, "Samza")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
+    kafkaProducerConfig.getProducerProperties
+  }
+
+  @Test(expected = classOf[NumberFormatException])
+  def testLingerWrongNumberFormat() {
+    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.LINGER_MS_CONFIG, "Samza")
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)

http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index 7331611..9117be5 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -19,22 +19,18 @@
 
 package org.apache.samza.system.kafka
 
-import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream}
-import org.junit.Test
-
 import org.apache.kafka.clients.producer._
-import java.util
+import org.apache.kafka.common.errors.{RecordTooLargeException, SerializationException, TimeoutException}
+import org.apache.kafka.test.MockSerializer
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducerException, SystemStream}
 import org.junit.Assert._
+import org.junit.Test
 import org.scalatest.Assertions.intercept
-import org.apache.kafka.common.errors.{TimeoutException, RecordTooLargeException}
-import org.apache.kafka.test.MockSerializer
-import org.apache.samza.SamzaException
 
 
 class TestKafkaSystemProducer {
-
-  val someMessage = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "test".getBytes)
-  val StreamNameNullOrEmptyErrorMsg = "Stream Name should be specified in the stream configuration file.";
+  val systemStream = new SystemStream("testSystem", "testStream")
+  val someMessage = new OutgoingMessageEnvelope(systemStream, "test".getBytes)
 
   @Test
   def testKafkaProducer {
@@ -64,9 +60,9 @@ class TestKafkaSystemProducer {
 
   @Test
   def testKafkaProducerBufferedSend {
-    val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
-    val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
-    val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
 
     val mockProducer = new MockKafkaProducer(1, "test", 1)
     val producerMetrics = new KafkaSystemProducerMetrics
@@ -91,9 +87,9 @@ class TestKafkaSystemProducer {
 
   @Test
   def testKafkaProducerFlushSuccessful {
-    val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
-    val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
-    val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
 
     val mockProducer = new MockKafkaProducer(1, "test", 1)
     val systemProducer = new KafkaSystemProducer(systemName = "test",
@@ -115,10 +111,10 @@ class TestKafkaSystemProducer {
 
   @Test
   def testKafkaProducerFlushWithException {
-    val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
-    val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
-    val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
-    val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "d".getBytes)
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
 
     val mockProducer = new MockKafkaProducer(1, "test", 1)
     val systemProducer = new KafkaSystemProducer(systemName = "test",
@@ -130,27 +126,27 @@ class TestKafkaSystemProducer {
 
     mockProducer.setShouldBuffer(true)
     systemProducer.send("test", msg2)
-    mockProducer.setErrorNext(true, new RecordTooLargeException())
+    mockProducer.setErrorNext(true, true, new RecordTooLargeException())
     systemProducer.send("test", msg3)
     systemProducer.send("test", msg4)
 
     assertEquals(1, mockProducer.getMsgsSent)
 
     mockProducer.startDelayedSendThread(2000)
-    val thrown = intercept[SamzaException] {
+    val thrown = intercept[SystemProducerException] {
       systemProducer.flush("test")
     }
-    assertTrue(thrown.isInstanceOf[SamzaException])
+    assertTrue(thrown.isInstanceOf[SystemProducerException])
     assertEquals(3, mockProducer.getMsgsSent) // msg1, msg2 and msg4 will be sent
     systemProducer.stop()
   }
 
   @Test
   def testKafkaProducerWithRetriableException {
-    val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
-    val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
-    val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
-    val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "d".getBytes)
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
 
     val mockProducer = new MockKafkaProducer(1, "test", 1)
     val producerMetrics = new KafkaSystemProducerMetrics()
@@ -165,52 +161,383 @@ class TestKafkaSystemProducer {
     producer.send("test", msg3)
     producer.flush("test")
 
-    mockProducer.setErrorNext(true, new TimeoutException())
+    mockProducer.setErrorNext(true, true, new TimeoutException())
 
     producer.send("test", msg4)
-    val thrown = intercept[SamzaException] {
+    val thrown = intercept[SystemProducerException] {
       producer.flush("test")
     }
-    assertTrue(thrown.isInstanceOf[SamzaException])
-    assertTrue(thrown.getCause.isInstanceOf[TimeoutException])
+    assertTrue(thrown.isInstanceOf[SystemProducerException])
+    assertTrue(thrown.getCause.getCause.isInstanceOf[TimeoutException])
     assertEquals(3, mockProducer.getMsgsSent)
     producer.stop()
   }
 
+  /**
+    * If there's an exception, we should:
+    * 1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order.
+    * 2. Record the original exception
+    * 3. Throw the exception every time a KafkaSystemProducer method is invoked until the container fails.
+    *
+    * Assumptions:
+    * 1. SystemProducer.flush() can happen concurrently with SystemProducer.send() for a particular TaskInstance (task.async.commit)
+    * 2. SystemProducer.flush() cannot happen concurrently with itself for a particular task instance
+    * 3. Any exception thrown from SystemProducer.flush() will prevent the checkpointing and fail the container
+    * 4. A single KafkaProducer is shared by all the tasks so any failure from one task can affect the others.
+    *
+    * Conclusions:
+    * It is only safe to handle the async exceptions by closing the producer and failing the container.
+    * This prevents race conditions with setting/clearing exceptions and recreating the producer that could cause data
+    * loss by checkpointing a failed offset.
+    *
+    * Inaccuracies:
+    * A real kafka producer succeeds or fails all the messages in a batch. In other words, the messages of a batch all
+    * fail or they all succeed together. This test, however, fails individual callbacks in order to test boundary
+    * conditions where the batches align perfectly around the failed send().
+    */
   @Test
-  def testKafkaProducerWithNonRetriableExceptions {
-    val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
-    val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
-    val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
-    val msg4 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "d".getBytes)
+  def testKafkaProducerWithFatalExceptions {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
     val producerMetrics = new KafkaSystemProducerMetrics()
 
     val mockProducer = new MockKafkaProducer(1, "test", 1)
     val producer = new KafkaSystemProducer(systemName =  "test",
-                                           getProducer = () => mockProducer,
+                                           getProducer = () => {
+                                             mockProducer.open() // A new producer is never closed
+                                             mockProducer
+                                           },
                                            metrics = producerMetrics)
     producer.register("test")
     producer.start()
+
     producer.send("test", msg1)
     producer.send("test", msg2)
-    producer.send("test", msg3)
-    mockProducer.setErrorNext(true, new RecordTooLargeException())
+    mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+    producer.send("test", msg3) // Callback exception
+    assertTrue(mockProducer.isClosed)
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
 
-    producer.send("test", msg4)
-    val thrown = intercept[SamzaException] {
-       producer.flush("test")
+    val senderException = intercept[SystemProducerException] {
+      producer.send("test", msg4) // Should fail because the producer is closed.
     }
-    assertTrue(thrown.isInstanceOf[SamzaException])
-    assertTrue(thrown.getCause.isInstanceOf[RecordTooLargeException])
-    assertEquals(3, mockProducer.getMsgsSent)
+    assertTrue(senderException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    val callbackException = intercept[SystemProducerException] {
+       producer.flush("test") // Should throw the callback exception
+    }
+    assertTrue(callbackException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    val postFlushException = intercept[SystemProducerException] {
+      producer.send("test", msg5) // Should not be able to send again after flush
+    }
+    assertTrue(postFlushException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    val callbackException2 = intercept[SystemProducerException] {
+      producer.flush("test") // Should rethrow the exception
+    }
+    assertTrue(callbackException2.getCause.getCause.isInstanceOf[RecordTooLargeException])
+    assertEquals(2, mockProducer.getMsgsSent) // only the messages before the error get sent
+    producer.stop()
+  }
+
+  /**
+    * Recapping from [[testKafkaProducerWithFatalExceptions]]:
+    *
+    * If there's an exception, we should:
+    * 1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order.
+    * 2. Record the original exception
+    * 3. Throw the exception every time a KafkaSystemProducer method is invoked until the container fails.
+    *
+    * This test focuses on point 3. Particularly it ensures that the failures are handled properly across multiple sources
+    * which share the same producer.
+    */
+  @Test
+  def testKafkaProducerWithFatalExceptionsMultipleSources {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val msg6 = new OutgoingMessageEnvelope(systemStream, "f".getBytes)
+    val msg7 = new OutgoingMessageEnvelope(systemStream, "g".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName =  "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer is never closed
+        mockProducer
+      },
+      metrics = producerMetrics)
+    producer.register("test1")
+    producer.register("test2")
+
+    producer.start()
+
+    // Initial sends
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+
+    // Inject error for next send
+    mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+    producer.send("test1", msg3) // Callback exception
+    assertTrue(mockProducer.isClosed)
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+    // Subsequent sends
+    val senderException = intercept[SystemProducerException] {
+      producer.send("test1", msg4) // Should fail because the producer is closed.
+    }
+    assertTrue(senderException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    val callbackException = intercept[SystemProducerException] {
+      producer.send("test2", msg4) // First send from separate source gets a producer closed exception
+    }
+    assertTrue(callbackException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    val callbackException2 = intercept[SystemProducerException] {
+      producer.send("test2", msg5) // Second send should still get the error
+    }
+    assertTrue(callbackException2.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    // Flushes
+    val callbackException3 = intercept[SystemProducerException] {
+      producer.flush("test2") // Should rethrow the closed exception in flush
+    }
+    assertTrue(callbackException3.isInstanceOf[SystemProducerException])
+    assertTrue(callbackException3.getCause.getCause.isInstanceOf[RecordTooLargeException])
+    intercept[SystemProducerException] {
+      producer.send("test2", msg6) // Should still not be able to send after flush
+    }
+
+    val thrown3 = intercept[SystemProducerException] {
+      producer.flush("test1") // Should throw the callback exception
+    }
+    assertTrue(thrown3.isInstanceOf[SystemProducerException])
+    assertTrue(thrown3.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    intercept[SystemProducerException] {
+      producer.send("test1", msg7) // Should still not be able to send after flush
+    }
+
+    intercept[SystemProducerException] {
+      producer.flush("test1") // Should throw the callback exception
+    }
+    assertEquals(2, mockProducer.getMsgsSent)
+    producer.stop()
+  }
+
+  @Test
+  def testKafkaProducerWithNonFatalExceptionsMultipleSources {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName = "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer is never closed
+        mockProducer
+      },
+      metrics = producerMetrics)
+    producer.register("test1")
+    producer.register("test2")
+    producer.start()
+
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+    mockProducer.setErrorNext(true, false, new SerializationException())
+    val sendException = intercept[SystemProducerException] {
+      producer.send("test1", msg3) // User-thread exception
+    }
+    assertTrue(sendException.getCause.isInstanceOf[SerializationException])
+    assertFalse(mockProducer.isClosed)
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
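+    // A synchronous (user-thread) failure like SerializationException leaves the shared producer
+    // open, so the failed source can simply retry and other sources are unaffected.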
+    producer.send("test1", msg3) // Should be able to resend msg3
+    producer.send("test2", msg4) // Second source should not be affected
+
+    producer.flush("test1") // Flush should be unaffected
+
+    producer.send("test1", msg5) // Should be able to send again after flush
+
+    assertEquals(5, mockProducer.getMsgsSent) // the failed attempt never reaches the producer; every other send, including the resend of msg3, does
+    producer.stop()
+  }
+
+  /**
+    * If there's an exception and the user configured task.drop.producer.errors, we should:
+    * 1. Close the producer (from the one-and-only Kafka send thread) to prevent subsequent sends from going out of order.
+    * 2. Recreate the producer.
+    * 3. Ignore any messages that were dropped (the user knows they're signing up for this if they enable the option).
+    *
+    * Assumptions:
+    * 1. SystemProducer.flush() can happen concurrently with SystemProducer.send() for a particular TaskInstance (task.async.commit).
+    * 2. SystemProducer.flush() cannot happen concurrently with itself for a particular task instance.
+    * 3. Any exception thrown from SystemProducer.flush() will prevent checkpointing and fail the container.
+    * 4. A single KafkaProducer is shared by all the tasks, so a failure in one task can affect the others.
+    *
+    * Conclusions:
+    * If the user is OK with dropping messages for the sake of availability, we will swallow all exceptions and
+    * recreate the producer to recover. There are no guarantees about how many messages are lost, but the
+    * send-failed metric should be accurate, so users should alert on it.
+    *
+    * Inaccuracies:
+    * A real Kafka producer succeeds or fails all the messages in a batch together. This test, however, fails
+    * individual callbacks in order to exercise the boundary conditions where batches align perfectly around
+    * the failed send().
+    */
+  @Test
+  def testKafkaProducerWithFatalExceptionsDroppingExceptions {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName = "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer is never closed
+        mockProducer
+      },
+      metrics = producerMetrics,
+      dropProducerExceptions = true)  // Here's where we enable exception dropping.
+    producer.register("test")
+    producer.start()
+
+    producer.send("test", msg1)
+    producer.send("test", msg2)
+    mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+    producer.send("test", msg3) // Callback exception
+    assertFalse(mockProducer.isClosed)
+    assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount)
+
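+    // The failed producer was closed and replaced behind the scenes; the error is swallowed and only
+    // the dropped message (msg3) is lost, as the send count at the end of the test confirms.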
+    producer.send("test", msg4) // Should succeed because the producer recovered.
+    producer.flush("test") // Should not throw
+
+    producer.send("test", msg5) // Should be able to send again after flush
+    producer.flush("test")
+
+    assertEquals(4, mockProducer.getMsgsSent) // every message except the one with the error should get sent
+    producer.stop()
+  }
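+
+  // For reference, a job opts into the drop-and-recover behavior exercised above via configuration,
+  // e.g. (hypothetical minimal properties snippet): task.drop.producer.errors=true
+  // Per the configuration table, the default is false, which favors failing the container over losing messages.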
+
+  /**
+    * Recapping from [[testKafkaProducerWithFatalExceptionsDroppingExceptions]]:
+    *
+    * If there's an exception, we should:
+    * 1. Close the producer (from the one-and-only Kafka send thread) to prevent subsequent sends from going out of order.
+    * 2. Recreate the producer.
+    * 3. Ignore any messages that were dropped (the user knows they're signing up for this if they enable the option).
+    *
+    * This test ensures that failures are handled properly across multiple sources that share the same producer.
+    */
+  @Test
+  def testKafkaProducerWithFatalExceptionsMultipleSourcesDroppingExceptions {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val msg6 = new OutgoingMessageEnvelope(systemStream, "f".getBytes)
+    val msg7 = new OutgoingMessageEnvelope(systemStream, "g".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName = "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer is never closed
+        mockProducer
+      },
+      metrics = producerMetrics,
+      dropProducerExceptions = true)  // Here's where we enable exception dropping.
+    producer.register("test1")
+    producer.register("test2")
+
+    producer.start()
+
+    // Initial sends
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+
+    // Inject error for next send
+    mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+    producer.send("test1", msg3) // Callback exception
+    assertFalse(mockProducer.isClosed)
+    assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount)
+
+    // Subsequent sends
+    producer.send("test1", msg4) // Should succeed because the producer recovered.
+    producer.send("test2", msg5) // Second source should also not have any error.
+
+    // Flushes
+    producer.flush("test2") // Should not throw for test2
+    producer.send("test2", msg6) // Should still work after flush
+
+    producer.flush("test1") // Should not throw for test1 either
+    producer.send("test1", msg7)
+
+    assertEquals(6, mockProducer.getMsgsSent) // every message except the one with the error should get sent
+    producer.stop()
+  }
+
+  @Test
+  def testKafkaProducerWithNonFatalExceptionsMultipleSourcesDroppingExceptions {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName = "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer is never closed
+        mockProducer
+      },
+      metrics = producerMetrics,
+      dropProducerExceptions = true)  // Here's where we enable exception dropping.
+    producer.register("test1")
+    producer.register("test2")
+    producer.start()
+
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+    mockProducer.setErrorNext(true, false, new SerializationException())
+    val sendException = intercept[SystemProducerException] {
+      producer.send("test1", msg3) // User-thread exception
+    }
+    assertTrue(sendException.getCause.isInstanceOf[SerializationException])
+    assertFalse(mockProducer.isClosed)
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+    producer.send("test1", msg3) // Should be able to resend msg3
+    producer.send("test2", msg4) // Second source should not be affected
+
+    producer.flush("test1") // Flush should be unaffected
+
+    producer.send("test1", msg5) // Should be able to send again after flush
+
+    assertEquals(5, mockProducer.getMsgsSent) // the failed attempt never reaches the producer; every other send, including the resend of msg3, does
     producer.stop()
   }
 
   @Test
   def testKafkaProducerFlushMsgsWhenStop {
-    val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a".getBytes)
-    val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b".getBytes)
-    val msg3 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "c".getBytes)
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
     val msg4 = new OutgoingMessageEnvelope(new SystemStream("test2", "test"), "d".getBytes)
 
     val mockProducer = new MockKafkaProducer(1, "test", 1)
@@ -253,7 +580,7 @@ class TestKafkaSystemProducer {
       producer.send("testSrc2", omeStreamNameEmpty)
       assertEquals(0, mockProducer.getMsgsSent)
     }
-    assertTrue(thrownNull.getMessage() == StreamNameNullOrEmptyErrorMsg)
-    assertTrue(thrownEmpty.getMessage() == StreamNameNullOrEmptyErrorMsg)
+    assertEquals("Invalid system stream: " + omeStreamNameNull.getSystemStream, thrownNull.getMessage)
+    assertEquals("Invalid system stream: " + omeStreamNameEmpty.getSystemStream, thrownEmpty.getMessage)
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
index 8276c18..f674685 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
@@ -28,7 +28,8 @@ import org.mockito.Mockito._
 
 class TestKeyValueStorageEngine {
   var engine: KeyValueStorageEngine[String, String] = null
-  var metrics: KeyValueStorageEngineMetrics = null;
+  var metrics: KeyValueStorageEngineMetrics = null
+  var now: Long = 0L
 
   @Before
   def setup() {
@@ -36,7 +37,7 @@ class TestKeyValueStorageEngine {
     val rawKv = mock(classOf[KeyValueStore[Array[Byte], Array[Byte]]])
     val properties = mock(classOf[StoreProperties])
     metrics = new KeyValueStorageEngineMetrics
-    engine = new KeyValueStorageEngine[String, String](properties, wrapperKv, rawKv, metrics)
+    engine = new KeyValueStorageEngine[String, String](properties, wrapperKv, rawKv, metrics, clock = () => { getNextTimestamp() })
   }
 
   @After
@@ -48,8 +49,8 @@ class TestKeyValueStorageEngine {
   def testGetAndPut(): Unit = {
     var prevGets = metrics.gets.getCount
     var prevGetNsSnapshotSize = metrics.getNs.getSnapshot.getSize
-    var valueForK1 = engine.get("k1");
-    assertNull("k1 is not existing before put", valueForK1);
+    var valueForK1 = engine.get("k1")
+    assertNull("k1 is not existing before put", valueForK1)
     assertEquals("get counter increments by 1", 1, metrics.gets.getCount - prevGets)
     assertEquals("get timer has 1 additional data point" , 1,  metrics.getNs.getSnapshot.getSize - prevGetNsSnapshotSize)
 
@@ -127,4 +128,9 @@ class TestKeyValueStorageEngine {
     }
     assertFalse("no next after iterating 2 keys in the range", iter.hasNext)
   }
+
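+  // Fake clock passed to the engine (see setup above): each call advances the timestamp by one
+  // unit, making the timer metrics (e.g. getNs) deterministic and nonzero in these tests.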
+  def getNextTimestamp(): Long = {
+    now += 1
+    now
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 59e9a89..a007c77 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -132,6 +132,7 @@ object StreamTaskTestUtil {
     config.put("serializer.class", "kafka.serializer.StringEncoder")
     config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
     config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(Integer.MAX_VALUE-1)).toString())
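+    // Disable the batching delay so the producer sends records as soon as possible in tests.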
+    config.put(ProducerConfig.LINGER_MS_CONFIG, "0")
     val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
 
     producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)

http://git-wip-us.apache.org/repos/asf/samza/blob/3773ec69/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 734487b..3ef08e5 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -97,7 +97,7 @@ class TestStatefulTask extends StreamTaskTestUtil {
     TestStatefulTask.MESSAGES_SEND_1.foreach(m => send(task, m))
 
     // Validate that messages appear in store stream.
-    val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, 5, "testShouldStartTaskForFirstTime")
+    val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, TestStatefulTask.MESSAGES_RECV_1.length-1, "testShouldStartTaskForFirstTime")
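+    // Bound derived from MESSAGES_RECV_1 (presumably the last 0-based offset) instead of the previously
+    // hardcoded 5, so the test stays correct if the expected message set changes.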
 
     assertEquals(TestStatefulTask.MESSAGES_RECV_1, messages)