You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/09/18 00:24:21 UTC

git commit: SAMZA-407; add ability to drop exceptions rather than fail container and add metrics to track

Repository: incubator-samza
Updated Branches:
  refs/heads/master 811f2897c -> 3e0b3a2b6


SAMZA-407; add ability to drop exceptions rather than fail container and add metrics to track


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/3e0b3a2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/3e0b3a2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/3e0b3a2b

Branch: refs/heads/master
Commit: 3e0b3a2b6c146b8fab13aaf4b6798a696b2e52f5
Parents: 811f289
Author: David Chen <dc...@linkedin.com>
Authored: Wed Sep 17 15:24:12 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Sep 17 15:24:12 2014 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  10 ++
 .../java/org/apache/samza/metrics/Gauge.java    |   2 +-
 .../org/apache/samza/config/TaskConfig.scala    |   3 +
 .../apache/samza/container/SamzaContainer.scala |  15 +-
 .../apache/samza/container/TaskInstance.scala   |  27 +--
 .../TaskInstanceExceptionHandler.scala          | 103 ++++++++++++
 .../apache/samza/container/TestRunLoop.scala    |   2 +-
 .../samza/container/TestSamzaContainer.scala    |   4 +-
 .../samza/container/TestTaskInstance.scala      | 163 ++++++++++++++++++-
 .../samza/metrics/TestMetricsHelper.scala       |   2 +-
 10 files changed, 308 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 526ca9f..069babe 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -403,6 +403,16 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="task-ignored-exceptions">task.ignored.exceptions</td>
+                    <td class="default"></td>
+                    <td class="description">
+                      This property specifies which exceptions should be ignored if thrown in a task's <code>process</code> or <code>window</code>
+                      methods. The exceptions to be ignored should be a comma-separated list of fully-qualified class names of the exceptions or
+                      <code>*</code> to ignore all exceptions.
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>
                 </tr>
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
index c37bfbb..be1f01d 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A Gauge is a {@link org.apache.samza.metrics.Metric} that wraps some instance of T in a thread-safe
- * reference and allows it to be set or retrieved.  Gauages record specific values over time.
+ * reference and allows it to be set or retrieved.  Gauges record specific values over time.
  * For example, the current length of a queue or the size of a buffer.
  *
  * @param <T> Instance to be wrapped in the gauge for metering.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 21d8903..d066ed8 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -35,6 +35,7 @@ object TaskConfig {
   val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
   val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails
   val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
+  val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore in process and window
 
   /**
    * Samza's container polls for more messages under two conditions. The first
@@ -95,4 +96,6 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
   def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR)
 
   def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS)
+
+  def getIgnoredExceptions = getOption(TaskConfig.IGNORED_EXCEPTIONS)
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 3288bf7..d91d6d7 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -19,8 +19,8 @@
 
 package org.apache.samza.container
 
-import org.apache.samza.util.Logging
 import java.io.File
+
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager}
@@ -56,11 +56,13 @@ import org.apache.samza.system.chooser.DefaultChooser
 import org.apache.samza.system.chooser.MessageChooserFactory
 import org.apache.samza.system.chooser.RoundRobinChooserFactory
 import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.task.TaskLifecycleListener
 import org.apache.samza.task.TaskLifecycleListenerFactory
+import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
+
 import scala.collection.JavaConversions._
-import org.apache.samza.task.TaskInstanceCollector
 
 object SamzaContainer extends Logging {
 
@@ -86,8 +88,8 @@ object SamzaContainer extends Logging {
   }
 
   def safeMain(jmxServer: JmxServer = new JmxServer) {
-    // Break out the main method to make the JmxServer injectable so we can 
-    // validate that we don't leak JMX non-daemon threads if we have an 
+    // Break out the main method to make the JmxServer injectable so we can
+    // validate that we don't leak JMX non-daemon threads if we have an
     // exception in the main method.
     try {
       val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME)
@@ -374,7 +376,7 @@ object SamzaContainer extends Logging {
       }
     }
 
-    // TODO not sure how we should make this config based, or not. Kind of 
+    // TODO not sure how we should make this config based, or not. Kind of
     // strange, since it has some dynamic directories when used with YARN.
     val storeBaseDir = new File(System.getProperty("user.dir"), "state")
 
@@ -492,7 +494,8 @@ object SamzaContainer extends Logging {
         storageManager = storageManager,
         reporters = reporters,
         listeners = listeners,
-        systemStreamPartitions = systemStreamPartitions)
+        systemStreamPartitions = systemStreamPartitions,
+        exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config))
 
       (taskName, taskInstance)
     }).toMap

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index b86fb0d..66f7dbe 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -19,24 +19,26 @@
 
 package org.apache.samza.container
 
-import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.config.Config
-import org.apache.samza.util.Logging
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.storage.TaskStorageManager
+import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemConsumers
 import org.apache.samza.task.TaskContext
 import org.apache.samza.task.ClosableTask
 import org.apache.samza.task.InitableTask
-import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.task.WindowableTask
 import org.apache.samza.task.TaskLifecycleListener
 import org.apache.samza.task.StreamTask
-import org.apache.samza.system.SystemConsumers
 import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.checkpoint.OffsetManager
-import org.apache.samza.SamzaException
-import scala.collection.JavaConversions._
 import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.util.Logging
+
+import scala.collection.JavaConversions._
 
 class TaskInstance(
   task: StreamTask,
@@ -49,7 +51,8 @@ class TaskInstance(
   storageManager: TaskStorageManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
   listeners: Seq[TaskLifecycleListener] = Seq(),
-  val systemStreamPartitions: Set[SystemStreamPartition] = Set()) extends Logging {
+  val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
+  val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler) extends Logging {
   val isInitableTask = task.isInstanceOf[InitableTask]
   val isWindowableTask = task.isInstanceOf[WindowableTask]
   val isClosableTask = task.isInstanceOf[ClosableTask]
@@ -130,7 +133,9 @@ class TaskInstance(
 
     trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition))
 
-    task.process(envelope, collector, coordinator)
+    exceptionHandler.maybeHandle {
+      task.process(envelope, collector, coordinator)
+    }
 
     listeners.foreach(_.afterProcess(envelope, config, context))
 
@@ -145,7 +150,9 @@ class TaskInstance(
 
       metrics.windows.inc
 
-      task.asInstanceOf[WindowableTask].window(collector, coordinator)
+      exceptionHandler.maybeHandle {
+        task.asInstanceOf[WindowableTask].window(collector, coordinator)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
new file mode 100644
index 0000000..99b729f
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import org.apache.samza.config.Config
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.util.Logging
+
+/**
+ * Handles exceptions thrown in a {@link TaskInstance}'s process or window
+ * methods and provides metrics on the number of times ignored exceptions are
+ * thrown. The exceptions to ignore are specified using the
+ * `task.ignored.exceptions` configuration property.
+ *
+ * @param metrics The {@link TaskInstanceMetrics} used to track exception
+ *        counts.
+ * @param ignoredExceptions Set of string names of exception classes to ignore
+ *        and count. If the set contains the wildcard "*", then all exceptions
+ *        are ignored and counted.
+ */
+class TaskInstanceExceptionHandler(
+  val metrics: MetricsHelper = new TaskInstanceMetrics,
+  val ignoredExceptions: Set[String] = Set[String]()) extends Logging {
+
+  val ignoreAll: Boolean = ignoredExceptions.contains("*")
+  var counters: Map[String, Counter] = Map[String, Counter]()
+
+  /**
+   * Takes a code block and handles any exception thrown in the code block.
+   *
+   * @param tryCodeBlock The code block to run and handle exceptions from.
+   */
+  def maybeHandle(tryCodeBlock: => Unit) {
+    try {
+      tryCodeBlock
+    } catch {
+      case e: Exception => handle(e)
+    }
+  }
+
+  /**
+   * Handles an exception. If the exception is in the set of exceptions to
+   * ignore or if the wildcard is used to ignore all exceptions, then the
+   * exception is counted and then ignored. Otherwise, the exception is thrown.
+   *
+   * @param exception The exception to handle.
+   */
+  def handle(exception: Exception) {
+    val className = exception.getClass.getName
+    if (!ignoreAll && !ignoredExceptions.contains(className)) {
+      throw exception
+    }
+
+    debug("Counting exception " + className)
+
+    counters.get(className) match {
+      case Some(counter) => counter.inc()
+      case _ => {
+        val counter = metrics.newCounter("exception-ignored-" + className)
+        counter.inc()
+        counters += className -> counter
+      }
+    }
+  }
+}
+
+object TaskInstanceExceptionHandler {
+  /**
+   * Creates a new TaskInstanceExceptionHandler using the provided
+   * configuration.
+   *
+   * @param metrics The {@link TaskInstanceMetrics} used to track exception
+   *        counts.
+   * @param config The configuration to read the comma-separated list of
+   *        ignored exceptions from; entries are trimmed, blanks are dropped.
+   */
+  def apply(metrics: MetricsHelper, config: Config) =
+    new TaskInstanceExceptionHandler(
+      metrics = metrics,
+      ignoredExceptions = config.getIgnoredExceptions match {
+        case Some(exceptions) => exceptions.split(",").map(_.trim).filter(_.nonEmpty).toSet
+        case _ => Set[String]()
+      })
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index ff425da..ea48853 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -211,4 +211,4 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche
     testMetrics.processMs.getSnapshot.getSize should equal(2)
     testMetrics.commitMs.getSnapshot.getSize should equal(2)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index b7a9569..c200601 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -56,7 +56,7 @@ class TestSamzaContainer extends AssertionsForJUnit {
       }
     }
     intercept[Exception] {
-      // Calling main will trigger an NPE since the container checks for an 
+      // Calling main will trigger an NPE since the container checks for an
       // isCompressed environment variable, which isn't set.
       SamzaContainer.safeMain(jmxServer)
     }
@@ -128,7 +128,7 @@ class TestSamzaContainer extends AssertionsForJUnit {
       container.run
       fail("Expected exception to be thrown in run method.")
     } catch {
-      case e: Exception => // Expected 
+      case e: Exception => // Expected
     }
     assertTrue(task.wasShutdown)
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index c31a74e..11eab16 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -19,11 +19,16 @@
 
 package org.apache.samza.container
 
-import org.junit.Assert._
-import org.junit.Test
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.samza.SamzaException
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.Metric
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemConsumer
@@ -40,6 +45,10 @@ import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.task.WindowableTask
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.Assertions.intercept
 
 import scala.collection.JavaConversions._
 
@@ -81,4 +90,154 @@ class TestTaskInstance {
     assertTrue(lastProcessedOffset.isDefined)
     assertEquals("2", lastProcessedOffset.get)
   }
+
+  /**
+   * Mock exception used to test exception counts metrics.
+   */
+  class TroublesomeException extends RuntimeException {
+  }
+
+  /**
+   * Mock exception used to test exception counts metrics.
+   */
+  class NonFatalException extends RuntimeException {
+  }
+
+  /**
+   * Mock exception used to test exception counts metrics.
+   */
+  class FatalException extends RuntimeException {
+  }
+
+  /**
+   * Task used to test exception counts metrics.
+   */
+  class TroublesomeTask extends StreamTask with WindowableTask {
+    def process(
+      envelope: IncomingMessageEnvelope,
+      collector: MessageCollector,
+      coordinator: TaskCoordinator) {
+
+      envelope.getOffset().toInt match {
+        case offset if offset % 2 == 0 => throw new TroublesomeException
+        case _ => throw new NonFatalException
+      }
+    }
+
+    def window(collector: MessageCollector, coordinator: TaskCoordinator) {
+      throw new FatalException
+    }
+  }
+
+  /*
+   * Helper method used to retrieve the value of a counter from a group.
+   */
+  private def getCount(
+    group: ConcurrentHashMap[String, Metric],
+    name: String): Long = {
+    group.get("exception-ignored-" + name.toLowerCase).asInstanceOf[Counter].getCount
+  }
+
+  /**
+   * Test task instance exception metrics with two ignored exceptions and one
+   * exception not ignored (which must be rethrown and must not be counted).
+   */
+  @Test
+  def testExceptionCounts {
+    val task = new TroublesomeTask
+    val ignoredExceptions = classOf[TroublesomeException].getName + "," +
+      classOf[NonFatalException].getName
+    val config = new MapConfig(Map[String, String](
+      "task.ignored.exceptions" -> ignoredExceptions))
+
+    val partition = new Partition(0)
+    val consumerMultiplexer = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    // Pretend our last checkpointed (next) offset was 2.
+    val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
+    val taskName = new TaskName("taskName")
+    val collector = new TaskInstanceCollector(producerMultiplexer)
+
+    val registry = new MetricsRegistryMap
+    val taskMetrics = new TaskInstanceMetrics(registry = registry)
+    val taskInstance: TaskInstance = new TaskInstance(
+      task,
+      taskName,
+      config,
+      taskMetrics,
+      consumerMultiplexer,
+      collector,
+      offsetManager,
+      exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
+
+    val coordinator = new ReadableCoordinator(taskName)
+    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "1", null, null), coordinator)
+    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), coordinator)
+    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "3", null, null), coordinator)
+
+    val group = registry.getGroup(taskMetrics.group)
+    assertEquals(1L, getCount(group, classOf[TroublesomeException].getName))
+    assertEquals(2L, getCount(group, classOf[NonFatalException].getName))
+
+    intercept[FatalException] {
+      taskInstance.window(coordinator)
+    }
+    assertFalse(group.containsKey("exception-ignored-" + classOf[FatalException].getName.toLowerCase))
+  }
+
+  /**
+   * Test task instance exception metrics with all exceptions ignored using a
+   * wildcard.
+   */
+  @Test
+  def testIgnoreAllExceptions {
+    val task = new TroublesomeTask
+    val config = new MapConfig(Map[String, String](
+      "task.ignored.exceptions" -> "*"))
+
+    val partition = new Partition(0)
+    val consumerMultiplexer = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    // Pretend our last checkpointed (next) offset was 2.
+    val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
+    val taskName = new TaskName("taskName")
+    val collector = new TaskInstanceCollector(producerMultiplexer)
+
+    val registry = new MetricsRegistryMap
+    val taskMetrics = new TaskInstanceMetrics(registry = registry)
+    val taskInstance: TaskInstance = new TaskInstance(
+      task,
+      taskName,
+      config,
+      taskMetrics,
+      consumerMultiplexer,
+      collector,
+      offsetManager,
+      exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
+
+    val coordinator = new ReadableCoordinator(taskName)
+    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "1", null, null), coordinator)
+    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), coordinator)
+    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "3", null, null), coordinator)
+    taskInstance.window(coordinator)
+
+    val group = registry.getGroup(taskMetrics.group)
+    assertEquals(1L, getCount(group, classOf[TroublesomeException].getName))
+    assertEquals(2L, getCount(group, classOf[NonFatalException].getName))
+    assertEquals(1L, getCount(group, classOf[FatalException].getName))
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
index 0a62bd0..bd3e5fe 100644
--- a/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
@@ -25,7 +25,7 @@ import org.apache.samza.container.SamzaContainerMetrics
 
 class TestMetricsHelper {
   @Test
-  def testMetricsHelperGroupShouldBePAckageName {
+  def testMetricsHelperGroupShouldBePackageName {
     assertEquals(classOf[SamzaContainerMetrics].getName, new SamzaContainerMetrics().group)
   }
 }