You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/22 21:35:04 UTC

git commit: SAMZA-64; Fail KafkaCheckpointManager on unrecoverable errors

Repository: incubator-samza
Updated Branches:
  refs/heads/master 156d4a4f7 -> 47edbc3a2


SAMZA-64; Fail KafkaCheckpointManager on unrecoverable errors


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/47edbc3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/47edbc3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/47edbc3a

Branch: refs/heads/master
Commit: 47edbc3a28a269269f2ef4494b2f6c89a6c46450
Parents: 156d4a4
Author: Yan Fang <ya...@gmail.com>
Authored: Tue Apr 22 12:31:53 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Apr 22 12:31:53 2014 -0700

----------------------------------------------------------------------
 .../kafka/KafkaCheckpointManager.scala          |  6 +++
 .../kafka/TestKafkaCheckpointManager.scala      | 48 ++++++++++++++++++++
 2 files changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/47edbc3a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index fed6eee..62c91e8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -28,6 +28,8 @@ import kafka.api.PartitionOffsetRequestInfo
 import kafka.common.ErrorMapping
 import kafka.common.TopicAndPartition
 import kafka.common.TopicExistsException
+import kafka.common.InvalidMessageSizeException
+import kafka.common.UnknownTopicOrPartitionException
 import kafka.consumer.SimpleConsumer
 import kafka.producer.KeyedMessage
 import kafka.producer.Partitioner
@@ -36,6 +38,7 @@ import kafka.serializer.Decoder
 import kafka.serializer.Encoder
 import kafka.utils.Utils
 import kafka.utils.VerifiableProperties
+import kafka.message.InvalidMessageException
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.Checkpoint
@@ -181,6 +184,9 @@ class KafkaCheckpointManager(
 
       (exception, loop) => {
         exception match {
+          case e: InvalidMessageException => throw new KafkaCheckpointException ("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
+          case e: InvalidMessageSizeException => throw new KafkaCheckpointException ("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
+          case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException ("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
           case e: KafkaCheckpointException => throw e
           case e: Exception =>
             warn("While trying to read last checkpoint for topic %s and partition %s: %s. Retrying." format (checkpointTopic, partition, e))

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/47edbc3a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index f1a8f8a..92ac61e 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -39,8 +39,12 @@ import scala.collection.JavaConversions._
 import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
 import org.apache.samza.config.MapConfig
 import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.system.SystemStream
 import kafka.utils.ZKStringSerializer
+import kafka.message.InvalidMessageException
+import kafka.common.InvalidMessageSizeException
+import kafka.common.UnknownTopicOrPartitionException
 
 object TestKafkaCheckpointManager {
   val zkConnect: String = TestZKUtils.zookeeperConnect
@@ -129,6 +133,25 @@ class TestKafkaCheckpointManager {
     kcm.stop
   }
 
+  @Test // Checks that each Kafka exception thrown by the injected serde surfaces from readLastCheckpoint as a KafkaCheckpointException.
+  def testUnrecovableKafkaErrorShouldThrowKafkaCheckpointManagerException {
+    val exceptions = List("InvalidMessageException", "InvalidMessageSizeException", "UnknownTopicOrPartitionException")
+    exceptions.foreach { exceptionName => // each name selects which exception InvalideSerde.fromBytes throws
+      val kcm = getKafkaCheckpointManagerWithInvalidSerde(exceptionName) // a fresh manager per exception type
+      kcm.register(partition)
+      kcm.start
+      kcm.writeCheckpoint(partition, cp1) // write a checkpoint before attempting to read it back
+      // Because the serde throws unrecoverable errors, reading should result in a KafkaCheckpointException.
+      try {
+        val readCpInvalide = kcm.readLastCheckpoint(partition) // the value is never used; the call itself is expected to throw
+        fail("Expected a KafkaCheckpointException.") // only reached when readLastCheckpoint returned normally
+      } catch {
+        case e: KafkaCheckpointException => None // expected outcome; nothing further is asserted on the exception
+      }
+      kcm.stop // NOTE(review): not in a finally block, so it is skipped when the try above throws anything other than KafkaCheckpointException
+    }
+  }
+
   private def getKafkaCheckpointManager = new KafkaCheckpointManager(
     clientId = "some-client-id",
     checkpointTopic = "checkpoint-topic",
@@ -141,4 +164,29 @@ class TestKafkaCheckpointManager {
     metadataStore = metadataStore,
     connectProducer = () => new Producer[Partition, Array[Byte]](producerConfig),
     connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
+
+  // Builds a KafkaCheckpointManager with an injected serde. Kafka exceptions will be thrown when serde.fromBytes is called.
+  private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager(
+    clientId = "some-client-id-invalid-serde", // differs from the client id used by getKafkaCheckpointManager
+    checkpointTopic = "checkpoint-topic-invalid-serde", // differs from the topic used by getKafkaCheckpointManager
+    systemName = "kafka",
+    totalPartitions = 1,
+    replicationFactor = 3,
+    socketTimeout = 30000,
+    bufferSize = 64 * 1024,
+    fetchSize = 300 * 1024,
+    metadataStore = metadataStore,
+    connectProducer = () => new Producer[Partition, Array[Byte]](producerConfig),
+    connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
+    serde = new InvalideSerde(exception)) // serde whose fromBytes throws the Kafka exception named by `exception`
+
+  class InvalideSerde(exception: String) extends CheckpointSerde { // test-only serde; `exception` names the Kafka exception that fromBytes throws
+    override def fromBytes(bytes: Array[Byte]): Checkpoint = { // `bytes` is ignored; every path through this method throws
+      exception match { // no default case: any name other than the three below results in a scala.MatchError
+        case "InvalidMessageException" => throw new InvalidMessageException
+        case "InvalidMessageSizeException" => throw new InvalidMessageSizeException
+        case "UnknownTopicOrPartitionException" => throw new UnknownTopicOrPartitionException
+      }
+    }
+  }
 }