You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/09/18 00:24:21 UTC
git commit: SAMZA-407;
add ability to drop exceptions rather than fail container and add
metrics to track
Repository: incubator-samza
Updated Branches:
refs/heads/master 811f2897c -> 3e0b3a2b6
SAMZA-407; add ability to drop exceptions rather than fail container and add metrics to track
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/3e0b3a2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/3e0b3a2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/3e0b3a2b
Branch: refs/heads/master
Commit: 3e0b3a2b6c146b8fab13aaf4b6798a696b2e52f5
Parents: 811f289
Author: David Chen <dc...@linkedin.com>
Authored: Wed Sep 17 15:24:12 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Sep 17 15:24:12 2014 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 10 ++
.../java/org/apache/samza/metrics/Gauge.java | 2 +-
.../org/apache/samza/config/TaskConfig.scala | 3 +
.../apache/samza/container/SamzaContainer.scala | 15 +-
.../apache/samza/container/TaskInstance.scala | 27 +--
.../TaskInstanceExceptionHandler.scala | 103 ++++++++++++
.../apache/samza/container/TestRunLoop.scala | 2 +-
.../samza/container/TestSamzaContainer.scala | 4 +-
.../samza/container/TestTaskInstance.scala | 163 ++++++++++++++++++-
.../samza/metrics/TestMetricsHelper.scala | 2 +-
10 files changed, 308 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 526ca9f..069babe 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -403,6 +403,16 @@
</tr>
<tr>
+ <td class="property" id="task-ignored-exceptions">task.ignored.exceptions</td>
+ <td class="default"></td>
+ <td class="description">
+ This property specifies which exceptions should be ignored (counted in a metric rather than failing the container) if thrown in a
+ task's <code>process</code> or <code>window</code> methods. The value should be a comma-separated list of fully-qualified exception class
+ names, each matched exactly against the class of the thrown exception (subclasses are not matched), or <code>*</code> to ignore all exceptions.
+ </td>
+ </tr>
+
+ <tr>
<th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>
</tr>
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
index c37bfbb..be1f01d 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* A Gauge is a {@link org.apache.samza.metrics.Metric} that wraps some instance of T in a thread-safe
- * reference and allows it to be set or retrieved. Gauages record specific values over time.
+ * reference and allows it to be set or retrieved. Gauges record specific values over time.
* For example, the current length of a queue or the size of a buffer.
*
* @param <T> Instance to be wrapped in the gauge for metering.
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 21d8903..d066ed8 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -35,6 +35,7 @@ object TaskConfig {
val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails
val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
+ val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore in process and window
/**
* Samza's container polls for more messages under two conditions. The first
@@ -95,4 +96,6 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR)
def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS)
+
+ def getIgnoredExceptions = getOption(TaskConfig.IGNORED_EXCEPTIONS)
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 3288bf7..d91d6d7 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -19,8 +19,8 @@
package org.apache.samza.container
-import org.apache.samza.util.Logging
import java.io.File
+
import org.apache.samza.Partition
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager}
@@ -56,11 +56,13 @@ import org.apache.samza.system.chooser.DefaultChooser
import org.apache.samza.system.chooser.MessageChooserFactory
import org.apache.samza.system.chooser.RoundRobinChooserFactory
import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskInstanceCollector
import org.apache.samza.task.TaskLifecycleListener
import org.apache.samza.task.TaskLifecycleListenerFactory
+import org.apache.samza.util.Logging
import org.apache.samza.util.Util
+
import scala.collection.JavaConversions._
-import org.apache.samza.task.TaskInstanceCollector
object SamzaContainer extends Logging {
@@ -86,8 +88,8 @@ object SamzaContainer extends Logging {
}
def safeMain(jmxServer: JmxServer = new JmxServer) {
- // Break out the main method to make the JmxServer injectable so we can
- // validate that we don't leak JMX non-daemon threads if we have an
+ // Break out the main method to make the JmxServer injectable so we can
+ // validate that we don't leak JMX non-daemon threads if we have an
// exception in the main method.
try {
val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME)
@@ -374,7 +376,7 @@ object SamzaContainer extends Logging {
}
}
- // TODO not sure how we should make this config based, or not. Kind of
+ // TODO not sure how we should make this config based, or not. Kind of
// strange, since it has some dynamic directories when used with YARN.
val storeBaseDir = new File(System.getProperty("user.dir"), "state")
@@ -492,7 +494,8 @@ object SamzaContainer extends Logging {
storageManager = storageManager,
reporters = reporters,
listeners = listeners,
- systemStreamPartitions = systemStreamPartitions)
+ systemStreamPartitions = systemStreamPartitions,
+ exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config))
(taskName, taskInstance)
}).toMap
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index b86fb0d..66f7dbe 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -19,24 +19,26 @@
package org.apache.samza.container
-import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.OffsetManager
import org.apache.samza.config.Config
-import org.apache.samza.util.Logging
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.metrics.MetricsReporter
import org.apache.samza.storage.TaskStorageManager
+import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemConsumers
import org.apache.samza.task.TaskContext
import org.apache.samza.task.ClosableTask
import org.apache.samza.task.InitableTask
-import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.task.WindowableTask
import org.apache.samza.task.TaskLifecycleListener
import org.apache.samza.task.StreamTask
-import org.apache.samza.system.SystemConsumers
import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.checkpoint.OffsetManager
-import org.apache.samza.SamzaException
-import scala.collection.JavaConversions._
import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.util.Logging
+
+import scala.collection.JavaConversions._
class TaskInstance(
task: StreamTask,
@@ -49,7 +51,8 @@ class TaskInstance(
storageManager: TaskStorageManager = null,
reporters: Map[String, MetricsReporter] = Map(),
listeners: Seq[TaskLifecycleListener] = Seq(),
- val systemStreamPartitions: Set[SystemStreamPartition] = Set()) extends Logging {
+ val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
+ val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler) extends Logging {
val isInitableTask = task.isInstanceOf[InitableTask]
val isWindowableTask = task.isInstanceOf[WindowableTask]
val isClosableTask = task.isInstanceOf[ClosableTask]
@@ -130,7 +133,9 @@ class TaskInstance(
trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition))
- task.process(envelope, collector, coordinator)
+ exceptionHandler.maybeHandle {
+ task.process(envelope, collector, coordinator)
+ }
listeners.foreach(_.afterProcess(envelope, config, context))
@@ -145,7 +150,9 @@ class TaskInstance(
metrics.windows.inc
- task.asInstanceOf[WindowableTask].window(collector, coordinator)
+ exceptionHandler.maybeHandle {
+ task.asInstanceOf[WindowableTask].window(collector, coordinator)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
new file mode 100644
index 0000000..99b729f
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import org.apache.samza.config.Config
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.util.Logging
+
+/**
+ * Decides what happens to exceptions thrown while a [[TaskInstance]] runs a
+ * task's process or window method. An exception whose class name is listed in
+ * `ignoredExceptions` (or any exception, when the "*" wildcard is listed) is
+ * counted in a per-class counter and then dropped; every other exception is
+ * rethrown to the caller. The list is normally taken from the
+ * `task.ignored.exceptions` configuration property.
+ *
+ * Matching compares the exact runtime class name of the thrown exception, so
+ * listing a superclass does not cover its subclasses.
+ *
+ * @param metrics The [[org.apache.samza.metrics.MetricsHelper]] on which the
+ *     ignored-exception counters are created.
+ * @param ignoredExceptions Set of string names of exception classes to ignore
+ *     and count. If the set contains the wildcard "*", then all exceptions
+ *     are ignored and counted.
+ */
+class TaskInstanceExceptionHandler(
+  val metrics: MetricsHelper = new TaskInstanceMetrics,
+  val ignoredExceptions: Set[String] = Set[String]()) extends Logging {
+
+  // True when the "*" wildcard was configured, i.e. every exception is ignored.
+  val ignoreAll: Boolean = ignoredExceptions.contains("*")
+
+  // Counters created so far, keyed by the class name of the ignored exception.
+  var counters: Map[String, Counter] = Map[String, Counter]()
+
+  /**
+   * Evaluates the given code block and passes any exception it throws to
+   * `handle`. Only subclasses of `Exception` are intercepted; other throwables
+   * propagate untouched.
+   *
+   * @param tryCodeBlock The code block to run and handle exceptions from.
+   */
+  def maybeHandle(tryCodeBlock: => Unit) {
+    try tryCodeBlock
+    catch {
+      case thrown: Exception => handle(thrown)
+    }
+  }
+
+  /**
+   * Counts and swallows the exception when it is ignorable (its class name is
+   * in the ignore set, or the wildcard is in effect); otherwise rethrows it.
+   * The counter for a class is created on first use under the name
+   * "exception-ignored-" followed by the class name.
+   *
+   * @param exception The exception to handle.
+   */
+  def handle(exception: Exception) {
+    val className = exception.getClass.getName
+    val ignorable = ignoreAll || ignoredExceptions.contains(className)
+    if (!ignorable) {
+      throw exception
+    }
+
+    debug("Counting exception " + className)
+
+    // Reuse the counter for this class if there is one, otherwise register it.
+    val counter = counters.getOrElse(className, {
+      val created = metrics.newCounter("exception-ignored-" + className)
+      counters += className -> created
+      created
+    })
+    counter.inc()
+  }
+}
+
+object TaskInstanceExceptionHandler {
+  /**
+   * Creates a new TaskInstanceExceptionHandler using the provided
+   * configuration.
+   *
+   * The `task.ignored.exceptions` value is split on commas. Each entry is
+   * trimmed and blank entries are dropped, so a list written as
+   * "a.FooException, b.BarException" or with a trailing comma still matches
+   * the intended class names. When the property is not set, no exception is
+   * ignored.
+   *
+   * @param metrics The [[org.apache.samza.metrics.MetricsHelper]] on which the
+   *     ignored-exception counters are created.
+   * @param config The configuration to read the list of ignored exceptions
+   *     from.
+   * @return A handler that ignores exactly the configured exception classes.
+   */
+  def apply(metrics: MetricsHelper, config: Config): TaskInstanceExceptionHandler =
+    new TaskInstanceExceptionHandler(
+      metrics = metrics,
+      ignoredExceptions = config.getIgnoredExceptions
+        // Whitespace around an entry can never be part of a class name.
+        .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
+        .getOrElse(Set[String]()))
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index ff425da..ea48853 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -211,4 +211,4 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche
testMetrics.processMs.getSnapshot.getSize should equal(2)
testMetrics.commitMs.getSnapshot.getSize should equal(2)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index b7a9569..c200601 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -56,7 +56,7 @@ class TestSamzaContainer extends AssertionsForJUnit {
}
}
intercept[Exception] {
- // Calling main will trigger an NPE since the container checks for an
+ // Calling main will trigger an NPE since the container checks for an
// isCompressed environment variable, which isn't set.
SamzaContainer.safeMain(jmxServer)
}
@@ -128,7 +128,7 @@ class TestSamzaContainer extends AssertionsForJUnit {
container.run
fail("Expected exception to be thrown in run method.")
} catch {
- case e: Exception => // Expected
+ case e: Exception => // Expected
}
assertTrue(task.wasShutdown)
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index c31a74e..11eab16 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -19,11 +19,16 @@
package org.apache.samza.container
-import org.junit.Assert._
-import org.junit.Test
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.samza.SamzaException
import org.apache.samza.Partition
import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.config.Config
import org.apache.samza.config.MapConfig
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.Metric
+import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.serializers.SerdeManager
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemConsumer
@@ -40,6 +45,10 @@ import org.apache.samza.task.ReadableCoordinator
import org.apache.samza.task.StreamTask
import org.apache.samza.task.TaskCoordinator
import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.task.WindowableTask
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.Assertions.intercept
import scala.collection.JavaConversions._
@@ -81,4 +90,154 @@ class TestTaskInstance {
assertTrue(lastProcessedOffset.isDefined)
assertEquals("2", lastProcessedOffset.get)
}
+
+  /**
+   * Mock exception thrown by TroublesomeTask.process for even offsets.
+   */
+  class TroublesomeException extends RuntimeException
+
+  /**
+   * Mock exception thrown by TroublesomeTask.process for odd offsets.
+   */
+  class NonFatalException extends RuntimeException
+
+  /**
+   * Mock exception thrown by TroublesomeTask.window.
+   */
+  class FatalException extends RuntimeException
+
+  /**
+   * Task that always fails, used to test exception count metrics: process
+   * throws a TroublesomeException for even offsets and a NonFatalException
+   * for odd ones, and window always throws a FatalException.
+   */
+  class TroublesomeTask extends StreamTask with WindowableTask {
+    def process(
+      envelope: IncomingMessageEnvelope,
+      collector: MessageCollector,
+      coordinator: TaskCoordinator) {
+
+      // The tests use numeric strings as offsets.
+      val offset = envelope.getOffset().toInt
+      if (offset % 2 == 0) {
+        throw new TroublesomeException
+      } else {
+        throw new NonFatalException
+      }
+    }
+
+    def window(collector: MessageCollector, coordinator: TaskCoordinator) {
+      throw new FatalException
+    }
+  }
+
+  /*
+   * Helper method that reads the current value of the ignored-exception
+   * counter for the given exception class name. The counter is looked up in
+   * the group under "exception-ignored-" plus the lower-cased name.
+   */
+  private def getCount(
+    group: ConcurrentHashMap[String, Metric],
+    name: String): Long = {
+    val metricName = "exception-ignored-" + name.toLowerCase
+    val counter = group.get(metricName).asInstanceOf[Counter]
+    counter.getCount
+  }
+
+  /**
+   * Test task instance exception metrics with two ignored exceptions and one
+   * exception not ignored. The ignored exceptions must each be counted under
+   * their own counter, while the exception that is not ignored must propagate
+   * out of the task instance and must not get a counter.
+   */
+  @Test
+  def testExceptionCounts {
+    val task = new TroublesomeTask
+    val ignoredExceptions = classOf[TroublesomeException].getName + "," +
+      classOf[NonFatalException].getName
+    val config = new MapConfig(Map[String, String](
+      "task.ignored.exceptions" -> ignoredExceptions))
+
+    val partition = new Partition(0)
+    val consumerMultiplexer = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    // Pretend our last checkpointed (next) offset was 2.
+    val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
+    val taskName = new TaskName("taskName")
+    val collector = new TaskInstanceCollector(producerMultiplexer)
+
+    val registry = new MetricsRegistryMap
+    val taskMetrics = new TaskInstanceMetrics(registry = registry)
+    val taskInstance: TaskInstance = new TaskInstance(
+      task,
+      taskName,
+      config,
+      taskMetrics,
+      consumerMultiplexer,
+      collector,
+      offsetManager,
+      exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
+
+    // Offsets 1 and 3 raise NonFatalException, offset 2 raises TroublesomeException.
+    val coordinator = new ReadableCoordinator(taskName)
+    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "1", null, null), coordinator)
+    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), coordinator)
+    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "3", null, null), coordinator)
+
+    val group = registry.getGroup(taskMetrics.group)
+    assertEquals(1L, getCount(group, classOf[TroublesomeException].getName))
+    assertEquals(2L, getCount(group, classOf[NonFatalException].getName))
+
+    // FatalException is not in the ignore list, so window must rethrow it.
+    intercept[FatalException] {
+      taskInstance.window(coordinator)
+    }
+    // ConcurrentHashMap.contains(Object) is a legacy method that tests values,
+    // not keys, so it could never find a metric name. Check the key instead,
+    // using the same "exception-ignored-<lower-cased class name>" form that
+    // getCount looks up.
+    assertFalse(group.containsKey("exception-ignored-" + classOf[FatalException].getName.toLowerCase))
+  }
+
+  /**
+   * Verifies that configuring the "*" wildcard makes the task instance ignore
+   * and count every exception, whether thrown from process or from window.
+   */
+  @Test
+  def testIgnoreAllExceptions {
+    val troublesomeTask = new TroublesomeTask
+    val wildcardConfig = new MapConfig(Map[String, String](
+      "task.ignored.exceptions" -> "*"))
+
+    val partition0 = new Partition(0)
+    val consumers = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producers = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val stream = new SystemStream("test-system", "test-stream")
+    val ssp = new SystemStreamPartition(stream, partition0)
+    // Pretend our last checkpointed (next) offset was 2.
+    val partitionMetadata = new SystemStreamPartitionMetadata("0", "1", "2")
+    val streamMetadata = new SystemStreamMetadata(stream.getStream, Map(partition0 -> partitionMetadata))
+    val offsets = OffsetManager(Map(stream -> streamMetadata), wildcardConfig)
+    val name = new TaskName("taskName")
+    val instanceCollector = new TaskInstanceCollector(producers)
+
+    val metricsRegistry = new MetricsRegistryMap
+    val instanceMetrics = new TaskInstanceMetrics(registry = metricsRegistry)
+    val instance: TaskInstance = new TaskInstance(
+      troublesomeTask,
+      name,
+      wildcardConfig,
+      instanceMetrics,
+      consumers,
+      instanceCollector,
+      offsets,
+      exceptionHandler = TaskInstanceExceptionHandler(instanceMetrics, wildcardConfig))
+
+    // Every call below throws inside the task; none may escape the instance.
+    val coordinator = new ReadableCoordinator(name)
+    Seq("1", "2", "3").foreach { offset =>
+      instance.process(new IncomingMessageEnvelope(ssp, offset, null, null), coordinator)
+    }
+    instance.window(coordinator)
+
+    val group = metricsRegistry.getGroup(instanceMetrics.group)
+    val expectedCounts = Seq(
+      classOf[TroublesomeException].getName -> 1L,
+      classOf[NonFatalException].getName -> 2L,
+      classOf[FatalException].getName -> 1L)
+    expectedCounts.foreach {
+      case (exceptionName, expected) =>
+        assertEquals(expected, getCount(group, exceptionName))
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
index 0a62bd0..bd3e5fe 100644
--- a/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
@@ -25,7 +25,7 @@ import org.apache.samza.container.SamzaContainerMetrics
class TestMetricsHelper {
@Test
- def testMetricsHelperGroupShouldBePAckageName {
+ def testMetricsHelperGroupShouldBePackageName {
assertEquals(classOf[SamzaContainerMetrics].getName, new SamzaContainerMetrics().group)
}
}