You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2014/07/14 18:44:21 UTC

git commit: SAMZA-59: support dropping messages on Serde failures

Repository: incubator-samza
Updated Branches:
  refs/heads/master 7d1a01c7b -> e0ce7ec00


SAMZA-59: support dropping messages on Serde failures


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/e0ce7ec0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/e0ce7ec0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/e0ce7ec0

Branch: refs/heads/master
Commit: e0ce7ec00aa3aa5caae7ac8408bc7491b68cb438
Parents: 7d1a01c
Author: Yan Fang <ya...@gmail.com>
Authored: Mon Jul 14 09:41:55 2014 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Mon Jul 14 09:41:55 2014 -0700

----------------------------------------------------------------------
 .../0.7.0/jobs/configuration-table.html         | 20 ++++++
 .../org/apache/samza/config/TaskConfig.scala    |  6 ++
 .../apache/samza/container/SamzaContainer.scala | 21 ++++--
 .../apache/samza/system/SystemConsumers.scala   | 24 ++++++-
 .../samza/system/SystemConsumersMetrics.scala   |  1 +
 .../apache/samza/system/SystemProducers.scala   | 25 ++++++-
 .../samza/system/SystemProducersMetrics.scala   |  1 +
 .../samza/system/TestSystemConsumers.scala      | 74 ++++++++++++++++++--
 .../samza/system/TestSystemProducers.scala      | 69 ++++++++++++++++++
 9 files changed, 227 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/docs/learn/documentation/0.7.0/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/jobs/configuration-table.html b/docs/learn/documentation/0.7.0/jobs/configuration-table.html
index 07cb83f..edcb74f 100644
--- a/docs/learn/documentation/0.7.0/jobs/configuration-table.html
+++ b/docs/learn/documentation/0.7.0/jobs/configuration-table.html
@@ -357,6 +357,26 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="task-drop-deserialization-errors">task.drop.deserialization.errors</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        This property defines how the system handles deserialization failures. If set to true, the system will
+                        skip the failing messages and keep running. If set to false, the system will throw an exception and fail the container. Default
+                        is false.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="task-drop-serialization-errors">task.drop.serialization.errors</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        This property defines how the system handles serialization failures. If set to true, the system will
+                        drop the failing messages and keep running. If set to false, the system will throw an exception and fail the container. Default
+                        is false.
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>
                 </tr>
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 18a9510..8b881f2 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -33,6 +33,8 @@ object TaskConfig {
   val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class
   val CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory" // class name to use when sending offset checkpoints
   val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
+  val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // whether to drop a message, instead of failing, when its deserialization fails
+  val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // whether to drop a message, instead of failing, when its serialization fails
 
   implicit def Config2Task(config: Config) = new TaskConfig(config)
 }
@@ -70,4 +72,8 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
   def getCheckpointManagerFactory() = getOption(TaskConfig.CHECKPOINT_MANAGER_FACTORY)
 
   def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME)
+
+  def getDropDeserialization = getOption(TaskConfig.DROP_DESERIALIZATION_ERROR) // optional task.drop.deserialization.errors setting; SamzaContainer treats a missing value as false
+
+  def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR) // optional task.drop.serialization.errors setting; SamzaContainer treats a missing value as false
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 356adbb..b303615 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -87,7 +87,7 @@ object SamzaContainer extends Logging {
 
   def apply(containerName: String, inputStreams: Set[SystemStreamPartition], config: Config) = {
     val containerPID = Util.getContainerPID
-    
+
     info("Setting up Samza container: %s" format containerName)
     info("Samza container PID: %s" format containerPID)
     info("Using streams and partitions: %s" format inputStreams)
@@ -282,17 +282,29 @@ object SamzaContainer extends Logging {
 
     info("Got offset manager: %s" format offsetManager)
 
+    val dropDeserializationError: Boolean = config.getDropDeserialization match {
+      case Some(dropError) => dropError.toBoolean
+      case _ => false
+    }
+
+    val dropSerializationError: Boolean = config.getDropSerialization match {
+      case Some(dropError) => dropError.toBoolean
+      case _ => false
+    }
+
     val consumerMultiplexer = new SystemConsumers(
       // TODO add config values for no new message timeout and max msgs per stream partition
       chooser = chooser,
       consumers = consumers,
       serdeManager = serdeManager,
-      metrics = systemConsumersMetrics)
+      metrics = systemConsumersMetrics,
+      dropDeserializationError = dropDeserializationError)
 
     val producerMultiplexer = new SystemProducers(
       producers = producers,
       serdeManager = serdeManager,
-      metrics = systemProducersMetrics)
+      metrics = systemProducersMetrics,
+      dropSerializationError = dropSerializationError)
 
     val listeners = config.getLifecycleListeners match {
       case Some(listeners) => {
@@ -438,8 +450,7 @@ object SamzaContainer extends Logging {
       consumerMultiplexer = consumerMultiplexer,
       metrics = samzaContainerMetrics,
       windowMs = taskWindowMs,
-      commitMs = taskCommitMs
-    )
+      commitMs = taskCommitMs)
 
     info("Samza container setup complete.")
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index b537046..9eb70f2 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -70,7 +70,14 @@ class SystemConsumers(
    * thread will sit in a tight loop polling every SystemConsumer over and
    * over again if no new messages are available.
    */
-  noNewMessagesTimeout: Long = 10) extends Logging {
+  noNewMessagesTimeout: Long = 10,
+
+  /**
+   * Controls how deserialization failures are handled. If set to true, a message that
+   * cannot be deserialized is skipped. If set to false, a SystemConsumersException is
+   * thrown, which fails the container.
+   */
+  dropDeserializationError: Boolean = false) extends Logging {
 
   /**
    * The buffer where SystemConsumers stores all incoming message envelopes.
@@ -242,7 +249,20 @@ class SystemConsumers(
     incomingEnvelopes.foreach(envelope => {
       val systemStreamPartition = envelope.getSystemStreamPartition
 
-      buffer.update(serdeManager.fromBytes(envelope))
+      // Only ordinary Exceptions are handled here, and they are dropped only when
+      // dropDeserializationError is set; Errors are not caught and always propagate.
+      val messageEnvelope = try {
+        Some(serdeManager.fromBytes(envelope))
+      } catch {
+        case e: Exception if !dropDeserializationError =>
+          throw new SystemConsumersException("can not deserialize the message", e)
+        case e: Exception =>
+          debug("Deserialization fails: %s . Drop the error message" format e)
+          metrics.deserializationError.inc
+          None
+      }
+
+      messageEnvelope.foreach(deserialized => buffer.update(deserialized))
 
       debug("Got message for: %s, %s" format (systemStreamPartition, envelope))
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index d632314..b065ae6 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -27,6 +27,7 @@ import org.apache.samza.metrics.MetricsHelper
 class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
   val choseNull = newCounter("chose-null")
   val choseObject = newCounter("chose-object")
+  val deserializationError = newCounter("deserialization-error") // incremented for each message dropped because it could not be deserialized
   val systemPolls = scala.collection.mutable.Map[String, Counter]()
   val systemStreamPartitionFetchesPerPoll = scala.collection.mutable.Map[String, Counter]()
   val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]()

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
index 8fb36b3..928b47e 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
@@ -21,11 +21,19 @@ package org.apache.samza.system
 
 import org.apache.samza.serializers.SerdeManager
 import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
 
 class SystemProducers(
   producers: Map[String, SystemProducer],
   serdeManager: SerdeManager,
-  metrics: SystemProducersMetrics = new SystemProducersMetrics) extends Logging {
+  metrics: SystemProducersMetrics = new SystemProducersMetrics,
+
+  /**
+   * If set to true, Samza will drop the messages that have serialization errors
+   * and keep running. If set to false, Samza will throw the SamzaException
+   * to fail the container. Default is false.
+   */
+  dropSerializationError: Boolean = false) extends Logging {
 
   def start {
     debug("Starting producers.")
@@ -62,6 +70,19 @@ class SystemProducers(
     metrics.sends.inc
     metrics.sourceSends(source).inc
 
-    producers(envelope.getSystemStream.getSystem).send(source, serdeManager.toBytes(envelope))
+    // Only ordinary Exceptions are handled here, and they are dropped only when
+    // dropSerializationError is set; Errors are not caught and always propagate.
+    val bytesEnvelope = try {
+      Some(serdeManager.toBytes(envelope))
+    } catch {
+      case e: Exception if !dropSerializationError =>
+        throw new SamzaException("can not serialize the message", e)
+      case e: Exception =>
+        debug("Serialization fails: %s . Drop the error message" format e)
+        metrics.serializationError.inc
+        None
+    }
+
+    bytesEnvelope.foreach(bytes => producers(envelope.getSystemStream.getSystem).send(source, bytes))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
index 594dd51..49d9aef 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
@@ -29,6 +29,7 @@ class SystemProducersMetrics(val registry: MetricsRegistry = new MetricsRegistry
   val sends = newCounter("sends")
   val sourceFlushes = scala.collection.mutable.Map[String, Counter]()
   val sourceSends = scala.collection.mutable.Map[String, Counter]()
+  val serializationError = newCounter("serialization-error") // incremented for each message dropped because it could not be serialized
 
   def registerSource(source: String) {
     sourceFlushes += source -> newCounter("%s-flushes" format source)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index e1a4c15..97e65eb 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -24,6 +24,9 @@ import org.apache.samza.Partition
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.system.chooser.MessageChooser
+import org.apache.samza.system.chooser.DefaultChooser
+import org.apache.samza.util.BlockingEnvelopeMap
+import org.apache.samza.serializers._
 
 class TestSystemConsumers {
   @Test
@@ -68,7 +71,7 @@ class TestSystemConsumers {
     var started = 0
     var stopped = 0
     var registered = Map[SystemStreamPartition, String]()
-    
+
     val consumer = Map(system -> new SystemConsumer {
       def start {}
       def stop {}
@@ -82,15 +85,76 @@ class TestSystemConsumers {
       def stop = stopped += 1
       def register(systemStreamPartition: SystemStreamPartition, offset: String) = registered += systemStreamPartition -> offset
     }, consumer, null)
-     
+
     // it should throw a SystemConsumersException because system2 does not have a consumer
     var caughtRightException = false
     try {
-    	consumers.register(systemStreamPartition2, "0")
+      consumers.register(systemStreamPartition2, "0")
     } catch {
-    	case e: SystemConsumersException => caughtRightException = true
-    	case _: Throwable => caughtRightException = false
+      case e: SystemConsumersException => caughtRightException = true
+      case _: Throwable => caughtRightException = false
     }
     assertTrue("suppose to throw SystemConsumersException, but apparently it did not", caughtRightException)
   }
+
+  @Test
+  def testDroppingMsgOrThrowExceptionWhenSerdeFails() { // calls choose once with dropDeserializationError = false, then once with true
+    val system = "test-system"
+    val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
+    val msgChooser = new DefaultChooser
+    val consumer = Map(system -> new SimpleConsumer)
+    val systemMessageSerdes = Map(system -> (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]]);
+    val serdeManager = new SerdeManager(systemMessageSerdes = systemMessageSerdes)
+
+    // with dropDeserializationError = false, choose is expected to throw a SystemConsumersException
+    val consumers = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = false)
+    consumers.register(systemStreamPartition, "0")
+    consumers.start
+    consumer(system).putBytesMessage // enqueues an envelope whose message is "test".getBytes()
+    consumer(system).putStringMessage // enqueues an envelope whose message is the String "test"; presumably the one StringSerde cannot deserialize -- confirm
+
+    var caughtRightException = false
+    try {
+      consumers.choose
+    } catch {
+      case e: SystemConsumersException => caughtRightException = true
+      case _: Throwable => caughtRightException = false
+    }
+    assertTrue("suppose to throw SystemConsumersException", caughtRightException);
+    consumers.stop
+
+    // with dropDeserializationError = true, choose must not throw even when deserialization fails
+    val consumers2 = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = true) // NOTE(review): reuses the chooser and SimpleConsumer from the first half
+    consumers2.register(systemStreamPartition, "0")
+    consumers2.start
+    consumer(system).putBytesMessage
+    consumer(system).putStringMessage
+
+    var notThrowException = true;
+    try {
+      consumers2.choose
+    } catch {
+      case e: Throwable => notThrowException = false
+    }
+
+    assertTrue("it should not throw any exception", notThrowException)
+    consumers2.stop
+  }
+
+  /**
+   * Test consumer built on BlockingEnvelopeMap. putBytesMessage enqueues an envelope whose
+   * message is a byte array; putStringMessage enqueues one whose message is a String.
+   */
+  private class SimpleConsumer extends BlockingEnvelopeMap {
+    val systemStreamPartition = new SystemStreamPartition("test-system", "some-stream", new Partition(1)) // same system, stream and partition number the test registers
+    def putBytesMessage {
+      put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", "0", "test".getBytes()))
+    }
+    def putStringMessage {
+      put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", "1", "test"))
+    }
+    def start {}
+    def stop {}
+    def register { super.register(systemStreamPartition, "0") } // no-arg convenience overload; not called by the test in this file
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e0ce7ec0/samza-core/src/test/scala/org/apache/samza/system/TestSystemProducers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemProducers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemProducers.scala
new file mode 100644
index 0000000..45da7b6
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemProducers.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.serializers._
+import org.apache.samza.SamzaException
+
+class TestSystemProducers {
+  // Sends one String message and one Int message through SystemProducers, with dropSerializationError off and then on.
+  @Test
+  def testDroppingMsgOrThrowExceptionWhenSerdeFails() {
+    val system = "test-system"
+    val systemStream = new SystemStream(system, "stream1")
+    val systemMessageSerdes = Map(system -> (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]]);
+    val serdeManager = new SerdeManager(systemMessageSerdes = systemMessageSerdes)
+    val systemProducer = new SystemProducer { // stub producer: every method is a no-op
+      def start {}
+      def stop {}
+      def register(source: String) {}
+      def send(source: String, envelope: OutgoingMessageEnvelope) {}
+      def flush(source: String) {}
+    }
+    val systemProducers = new SystemProducers(Map(system -> systemProducer), serdeManager, new SystemProducersMetrics, false) // last argument is dropSerializationError
+    systemProducers.register(system)
+    val outgoingEnvelopeCorrectMsg = new OutgoingMessageEnvelope (systemStream, "test")
+    val outgoingEnvelopeErrorMsg = new OutgoingMessageEnvelope (systemStream, 123) // an Int payload; presumably what StringSerde fails to serialize -- confirm
+    systemProducers.send(system, outgoingEnvelopeCorrectMsg)
+    // dropSerializationError = false: sending the Int payload is expected to throw a SamzaException
+    var getCorrectException = false
+    try {
+      systemProducers.send(system, outgoingEnvelopeErrorMsg)
+    } catch {
+      case e: SamzaException => getCorrectException = true
+      case _: Throwable => getCorrectException = false
+    }
+    assertTrue(getCorrectException)
+    // dropSerializationError = true: the same send must not throw
+    val systemProducers2 = new SystemProducers(Map(system -> systemProducer), serdeManager, new SystemProducersMetrics, true)
+    systemProducers2.register(system)
+    systemProducers2.send(system, outgoingEnvelopeCorrectMsg)
+
+    var notThrowException = true
+    try {
+      systemProducers2.send(system, outgoingEnvelopeErrorMsg)
+    } catch {
+      case _: Throwable => notThrowException = false
+    }
+    assertTrue(notThrowException)
+  }
+}
\ No newline at end of file