You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2014/07/14 18:44:21 UTC
git commit: SAMZA-59: support dropping messages on Serde failures
Repository: incubator-samza
Updated Branches:
refs/heads/master 7d1a01c7b -> e0ce7ec00
SAMZA-59: support dropping messages on Serde failures
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/e0ce7ec0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/e0ce7ec0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/e0ce7ec0
Branch: refs/heads/master
Commit: e0ce7ec00aa3aa5caae7ac8408bc7491b68cb438
Parents: 7d1a01c
Author: Yan Fang <ya...@gmail.com>
Authored: Mon Jul 14 09:41:55 2014 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Mon Jul 14 09:41:55 2014 -0700
----------------------------------------------------------------------
.../0.7.0/jobs/configuration-table.html | 20 ++++++
.../org/apache/samza/config/TaskConfig.scala | 6 ++
.../apache/samza/container/SamzaContainer.scala | 21 ++++--
.../apache/samza/system/SystemConsumers.scala | 24 ++++++-
.../samza/system/SystemConsumersMetrics.scala | 1 +
.../apache/samza/system/SystemProducers.scala | 25 ++++++-
.../samza/system/SystemProducersMetrics.scala | 1 +
.../samza/system/TestSystemConsumers.scala | 74 ++++++++++++++++++--
.../samza/system/TestSystemProducers.scala | 69 ++++++++++++++++++
9 files changed, 227 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/docs/learn/documentation/0.7.0/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/jobs/configuration-table.html b/docs/learn/documentation/0.7.0/jobs/configuration-table.html
index 07cb83f..edcb74f 100644
--- a/docs/learn/documentation/0.7.0/jobs/configuration-table.html
+++ b/docs/learn/documentation/0.7.0/jobs/configuration-table.html
@@ -357,6 +357,26 @@
</tr>
<tr>
+ <td class="property" id="task-drop-deserialization-errors">task.drop.deserialization.errors</td>
+ <td class="default"></td>
+ <td class="description">
+ This property defines how the system handles deserialization failures. If set to true, the system will
+ skip the messages that fail to deserialize and keep running. If set to false, the system will throw an exception and fail the container. Default
+ is false.
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="task-drop-serialization-errors">task.drop.serialization.errors</td>
+ <td class="default"></td>
+ <td class="description">
+ This property defines how the system handles serialization failures. If set to true, the system will
+ drop the messages that fail to serialize and keep running. If set to false, the system will throw an exception and fail the container. Default
+ is false.
+ </td>
+ </tr>
+
+ <tr>
<th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>
</tr>
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 18a9510..8b881f2 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -33,6 +33,8 @@ object TaskConfig {
val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class
val CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory" // class name to use when sending offset checkpoints
val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
+ val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // whether to drop a message, rather than fail, when its deserialization fails
+ val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // whether to drop a message, rather than fail, when its serialization fails
implicit def Config2Task(config: Config) = new TaskConfig(config)
}
@@ -70,4 +72,8 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
def getCheckpointManagerFactory() = getOption(TaskConfig.CHECKPOINT_MANAGER_FACTORY)
def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME)
+
+ def getDropDeserialization = getOption(TaskConfig.DROP_DESERIALIZATION_ERROR) // unparsed task.drop.deserialization.errors setting, if configured
+
+ def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR) // unparsed task.drop.serialization.errors setting, if configured
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 356adbb..b303615 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -87,7 +87,7 @@ object SamzaContainer extends Logging {
def apply(containerName: String, inputStreams: Set[SystemStreamPartition], config: Config) = {
val containerPID = Util.getContainerPID
-
+
info("Setting up Samza container: %s" format containerName)
info("Samza container PID: %s" format containerPID)
info("Using streams and partitions: %s" format inputStreams)
@@ -282,17 +282,29 @@ object SamzaContainer extends Logging {
info("Got offset manager: %s" format offsetManager)
+ // Deserialization failures stay fatal unless task.drop.deserialization.errors
+ // is present in the config and parses to true.
+ val dropDeserializationError: Boolean =
+   config.getDropDeserialization.map(_.toBoolean).getOrElse(false)
+
+ // Serialization failures follow the same rule, driven by
+ // task.drop.serialization.errors.
+ val dropSerializationError: Boolean =
+   config.getDropSerialization.map(_.toBoolean).getOrElse(false)
+
val consumerMultiplexer = new SystemConsumers(
// TODO add config values for no new message timeout and max msgs per stream partition
chooser = chooser,
consumers = consumers,
serdeManager = serdeManager,
- metrics = systemConsumersMetrics)
+ metrics = systemConsumersMetrics,
+ dropDeserializationError = dropDeserializationError)
val producerMultiplexer = new SystemProducers(
producers = producers,
serdeManager = serdeManager,
- metrics = systemProducersMetrics)
+ metrics = systemProducersMetrics,
+ dropSerializationError = dropSerializationError)
val listeners = config.getLifecycleListeners match {
case Some(listeners) => {
@@ -438,8 +450,7 @@ object SamzaContainer extends Logging {
consumerMultiplexer = consumerMultiplexer,
metrics = samzaContainerMetrics,
windowMs = taskWindowMs,
- commitMs = taskCommitMs
- )
+ commitMs = taskCommitMs)
info("Samza container setup complete.")
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index b537046..9eb70f2 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -70,7 +70,14 @@ class SystemConsumers(
* thread will sit in a tight loop polling every SystemConsumer over and
* over again if no new messages are available.
*/
- noNewMessagesTimeout: Long = 10) extends Logging {
+ noNewMessagesTimeout: Long = 10,
+
+ /**
+ * This parameter is to define how to deal with deserialization failure. If set to true,
+ * the task will skip the messages when deserialization fails. If set to false, the task
+ * will throw SamzaException and fail the container.
+ */
+ dropDeserializationError: Boolean = false) extends Logging {
/**
* The buffer where SystemConsumers stores all incoming message envelopes.
@@ -242,7 +249,20 @@ class SystemConsumers(
incomingEnvelopes.foreach(envelope => {
val systemStreamPartition = envelope.getSystemStreamPartition
- buffer.update(serdeManager.fromBytes(envelope))
+ // Deserialize the raw envelope. Only Exceptions count as deserialization
+ // failures; JVM Errors (e.g. OutOfMemoryError) always propagate, so they are
+ // never silently recorded as a "dropped message".
+ val messageEnvelope = try {
+   Some(serdeManager.fromBytes(envelope))
+ } catch {
+   // Dropping disabled: surface the failure to the caller.
+   case e: Exception if !dropDeserializationError =>
+     throw new SystemConsumersException("can not deserialize the message", e)
+   // Dropping enabled: count the failure and skip this envelope.
+   case e: Exception =>
+     debug("Deserialization fails: %s . Drop the error message" format e)
+     metrics.deserializationError.inc
+     None
+ }
+
+ // Buffer the envelope only when deserialization produced one.
+ messageEnvelope.foreach(deserialized => buffer.update(deserialized))
debug("Got message for: %s, %s" format (systemStreamPartition, envelope))
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index d632314..b065ae6 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -27,6 +27,7 @@ import org.apache.samza.metrics.MetricsHelper
class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
val choseNull = newCounter("chose-null")
val choseObject = newCounter("chose-object")
+ val deserializationError = newCounter("deserialization-error") // messages dropped because deserialization failed; hyphenated like the other counter names
val systemPolls = scala.collection.mutable.Map[String, Counter]()
val systemStreamPartitionFetchesPerPoll = scala.collection.mutable.Map[String, Counter]()
val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]()
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
index 8fb36b3..928b47e 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
@@ -21,11 +21,19 @@ package org.apache.samza.system
import org.apache.samza.serializers.SerdeManager
import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
class SystemProducers(
producers: Map[String, SystemProducer],
serdeManager: SerdeManager,
- metrics: SystemProducersMetrics = new SystemProducersMetrics) extends Logging {
+ metrics: SystemProducersMetrics = new SystemProducersMetrics,
+
+ /**
+ * If set to true, Samza will drop the messages that have serialization errors
+ * and keep running. If set to false, Samza will throw the SamzaException
+ * to fail the container. Default is false.
+ */
+ dropSerializationError: Boolean = false) extends Logging {
def start {
debug("Starting producers.")
@@ -62,6 +70,19 @@ class SystemProducers(
metrics.sends.inc
metrics.sourceSends(source).inc
- producers(envelope.getSystemStream.getSystem).send(source, serdeManager.toBytes(envelope))
+ // Serialize the outgoing envelope. Only Exceptions count as serialization
+ // failures; JVM Errors (e.g. OutOfMemoryError) always propagate, so they are
+ // never silently recorded as a "dropped message".
+ val bytesEnvelope = try {
+   Some(serdeManager.toBytes(envelope))
+ } catch {
+   // Dropping disabled: surface the failure to the caller.
+   case e: Exception if !dropSerializationError =>
+     throw new SamzaException("can not serialize the message", e)
+   // Dropping enabled: count the failure and skip the send.
+   case e: Exception =>
+     debug("Serialization fails: %s . Drop the error message" format e)
+     metrics.serializationError.inc
+     None
+ }
+
+ // Hand the envelope to the underlying producer only when serialization succeeded.
+ bytesEnvelope.foreach(serialized => producers(envelope.getSystemStream.getSystem).send(source, serialized))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
index 594dd51..49d9aef 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
@@ -29,6 +29,7 @@ class SystemProducersMetrics(val registry: MetricsRegistry = new MetricsRegistry
val sends = newCounter("sends")
val sourceFlushes = scala.collection.mutable.Map[String, Counter]()
val sourceSends = scala.collection.mutable.Map[String, Counter]()
+ val serializationError = newCounter("serialization-error") // messages dropped because serialization failed; hyphenated like the other counter names
def registerSource(source: String) {
sourceFlushes += source -> newCounter("%s-flushes" format source)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index e1a4c15..97e65eb 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -24,6 +24,9 @@ import org.apache.samza.Partition
import org.junit.Assert._
import org.junit.Test
import org.apache.samza.system.chooser.MessageChooser
+import org.apache.samza.system.chooser.DefaultChooser
+import org.apache.samza.util.BlockingEnvelopeMap
+import org.apache.samza.serializers._
class TestSystemConsumers {
@Test
@@ -68,7 +71,7 @@ class TestSystemConsumers {
var started = 0
var stopped = 0
var registered = Map[SystemStreamPartition, String]()
-
+
val consumer = Map(system -> new SystemConsumer {
def start {}
def stop {}
@@ -82,15 +85,76 @@ class TestSystemConsumers {
def stop = stopped += 1
def register(systemStreamPartition: SystemStreamPartition, offset: String) = registered += systemStreamPartition -> offset
}, consumer, null)
-
+
// it should throw a SystemConsumersException because system2 does not have a consumer
var caughtRightException = false
try {
- consumers.register(systemStreamPartition2, "0")
+ consumers.register(systemStreamPartition2, "0")
} catch {
- case e: SystemConsumersException => caughtRightException = true
- case _: Throwable => caughtRightException = false
+ case e: SystemConsumersException => caughtRightException = true
+ case _: Throwable => caughtRightException = false
}
assertTrue("suppose to throw SystemConsumersException, but apparently it did not", caughtRightException)
}
+
+ @Test
+ def testDroppingMsgOrThrowExceptionWhenSerdeFails() {
+   val system = "test-system"
+   val ssp = new SystemStreamPartition(system, "some-stream", new Partition(1))
+   val chooser = new DefaultChooser
+   val consumerBySystem = Map(system -> new SimpleConsumer)
+   val stringSerde = (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]]
+   val serdeManager = new SerdeManager(systemMessageSerdes = Map(system -> stringSerde))
+
+   // With dropping disabled, a message that cannot be deserialized must
+   // surface as a SystemConsumersException.
+   val strictConsumers = new SystemConsumers(chooser, consumerBySystem, serdeManager, dropDeserializationError = false)
+   strictConsumers.register(ssp, "0")
+   strictConsumers.start
+   consumerBySystem(system).putBytesMessage
+   consumerBySystem(system).putStringMessage
+
+   val threwConsumersException = try {
+     strictConsumers.choose
+     false
+   } catch {
+     case _: SystemConsumersException => true
+     case _: Throwable => false
+   }
+   assertTrue("suppose to throw SystemConsumersException", threwConsumersException)
+   strictConsumers.stop
+
+   // With dropping enabled, the same pair of messages must not raise anything.
+   val lenientConsumers = new SystemConsumers(chooser, consumerBySystem, serdeManager, dropDeserializationError = true)
+   lenientConsumers.register(ssp, "0")
+   lenientConsumers.start
+   consumerBySystem(system).putBytesMessage
+   consumerBySystem(system).putStringMessage
+
+   val completedWithoutException = try {
+     lenientConsumers.choose
+     true
+   } catch {
+     case _: Throwable => false
+   }
+   assertTrue("it should not throw any exception", completedWithoutException)
+   lenientConsumers.stop
+ }
+
+ /**
+  * Test consumer bound to a single fixed partition. Besides the no-op
+  * lifecycle methods it offers two ways to enqueue a message: one whose
+  * payload is a byte array and one whose payload is a plain String.
+  */
+ private class SimpleConsumer extends BlockingEnvelopeMap {
+   val systemStreamPartition = new SystemStreamPartition("test-system", "some-stream", new Partition(1))
+
+   def putBytesMessage { enqueue("0", "test".getBytes()) }
+   def putStringMessage { enqueue("1", "test") }
+   def start {}
+   def stop {}
+   def register { super.register(systemStreamPartition, "0") }
+
+   // Queues one envelope at offset "0" for the fixed partition.
+   private def enqueue(key: Object, message: Object) {
+     put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", key, message))
+   }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/test/scala/org/apache/samza/system/TestSystemProducers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemProducers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemProducers.scala
new file mode 100644
index 0000000..45da7b6
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemProducers.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.serializers._
+import org.apache.samza.SamzaException
+
+class TestSystemProducers {
+
+  /**
+   * Sends one serializable and one unserializable message through
+   * SystemProducers, first with dropSerializationError disabled (a
+   * SamzaException is expected) and then enabled (no exception is expected).
+   */
+  @Test
+  def testDroppingMsgOrThrowExceptionWhenSerdeFails() {
+    val system = "test-system"
+    val stream = new SystemStream(system, "stream1")
+    val stringSerde = (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]]
+    val serdeManager = new SerdeManager(systemMessageSerdes = Map(system -> stringSerde))
+
+    // Producer stub whose methods all do nothing.
+    val noOpProducer = new SystemProducer {
+      def start {}
+      def stop {}
+      def register(source: String) {}
+      def send(source: String, envelope: OutgoingMessageEnvelope) {}
+      def flush(source: String) {}
+    }
+
+    // A String payload for the String serde, and an Int payload it is expected to reject.
+    val serializable = new OutgoingMessageEnvelope (stream, "test")
+    val unserializable = new OutgoingMessageEnvelope (stream, 123)
+
+    // Dropping disabled: the bad message must raise a SamzaException.
+    val strictProducers = new SystemProducers(Map(system -> noOpProducer), serdeManager, new SystemProducersMetrics, false)
+    strictProducers.register(system)
+    strictProducers.send(system, serializable)
+
+    val threwSamzaException = try {
+      strictProducers.send(system, unserializable)
+      false
+    } catch {
+      case _: SamzaException => true
+      case _: Throwable => false
+    }
+    assertTrue(threwSamzaException)
+
+    // Dropping enabled: the bad message must not raise anything.
+    val lenientProducers = new SystemProducers(Map(system -> noOpProducer), serdeManager, new SystemProducersMetrics, true)
+    lenientProducers.register(system)
+    lenientProducers.send(system, serializable)
+
+    val completedWithoutException = try {
+      lenientProducers.send(system, unserializable)
+      true
+    } catch {
+      case _: Throwable => false
+    }
+    assertTrue(completedWithoutException)
+  }
+}
\ No newline at end of file