You are viewing a plain text version of this content. The canonical link for it is here (the link itself was not preserved in this plain-text copy).
Posted to commits@samza.apache.org by ja...@apache.org on 2018/01/18 02:26:48 UTC

samza git commit: SAMZA-1560: Handle key-serde errors in KafkaCheckpointManager

Repository: samza
Updated Branches:
  refs/heads/master b38e6622f -> b7805df10


SAMZA-1560: Handle key-serde errors in KafkaCheckpointManager

Author: Jagadish <jv...@linkedin.com>

Reviewers: Xinyu Liu<xi...@gmail.com>

Closes #408 from vjagadish1989/kcm-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b7805df1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b7805df1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b7805df1

Branch: refs/heads/master
Commit: b7805df10f74c4a2b9bb89e49bbaefb72a8526b5
Parents: b38e662
Author: Jagadish <jv...@linkedin.com>
Authored: Wed Jan 17 18:27:40 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Jan 17 18:27:40 2018 -0800

----------------------------------------------------------------------
 .../kafka/KafkaCheckpointLogKeySerde.java       |  5 ++
 .../kafka/KafkaCheckpointManager.scala          | 51 +++++++++++---------
 .../kafka/TestKafkaCheckpointManagerJava.java   | 43 +++++++++++++++--
 3 files changed, 72 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b7805df1/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
index cc883b6..8e0c815 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
@@ -58,6 +58,11 @@ public class KafkaCheckpointLogKeySerde implements Serde<KafkaCheckpointLogKey>
   public KafkaCheckpointLogKey fromBytes(byte[] bytes) {
     try {
       LinkedHashMap<String, String> deserializedKey = mapper.readValue(bytes, LinkedHashMap.class);
+
+      if (!KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE.equals(deserializedKey.get(TYPE_FIELD))) {
+        throw new IllegalArgumentException(String.format("Invalid key detected. Type of the key is %s", deserializedKey.get(TYPE_FIELD)));
+      }
+
       return new KafkaCheckpointLogKey(deserializedKey.get(TYPE_FIELD), new TaskName(deserializedKey.get(TASK_NAME_FIELD)), deserializedKey.get(SSP_GROUPER_FACTORY_FIELD)
       );
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/samza/blob/b7805df1/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index e1187c5..ca138c7 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -50,7 +50,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
                              validateCheckpoint: Boolean,
                              config: Config,
                              metricsRegistry: MetricsRegistry,
-                             checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde) extends CheckpointManager with Logging {
+                             checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde,
+                             checkpointKeySerde: Serde[KafkaCheckpointLogKey] = new KafkaCheckpointLogKeySerde) extends CheckpointManager with Logging {
 
   info(s"Creating KafkaCheckpointManager for checkpointTopic:$checkpointTopic, systemName:$checkpointSystem " +
     s"validateCheckpoints:$validateCheckpoint")
@@ -58,7 +59,6 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
   val checkpointSystem: String = checkpointSpec.getSystemName
   val checkpointTopic: String = checkpointSpec.getPhysicalName
   val checkpointSsp = new SystemStreamPartition(checkpointSystem, checkpointTopic, new Partition(0))
-  val checkpointKeySerde = new KafkaCheckpointLogKeySerde
   val expectedGrouperFactory = new JobConfig(config).getSystemStreamPartitionGrouperFactory
 
   val systemProducer = systemFactory.getProducer(checkpointSystem, config, metricsRegistry)
@@ -215,31 +215,38 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
       val checkpointKey = try {
         checkpointKeySerde.fromBytes(keyBytes)
       } catch {
-        case e: Exception => throw new SamzaException(s"Exception while serializing checkpoint-key. " +
-          s"Topic: $checkpointTopic Offset: $offset", e)
-      }
-
-      // If the grouper in the key is not equal to the configured grouper, error out.
-      val actualGrouperFactory = checkpointKey.getGrouperFactoryClassName
-      if (!expectedGrouperFactory.equals(actualGrouperFactory)) {
-        warn(s"Grouper mismatch. Configured: $expectedGrouperFactory Actual: $actualGrouperFactory ")
-        if (validateCheckpoint) {
-          throw new SamzaException("SSPGrouperFactory in the checkpoint topic does not match the configured value" +
-            s"Configured value: $expectedGrouperFactory; Actual value: $actualGrouperFactory Offset: $offset")
+        case e: Exception => if (validateCheckpoint) {
+          throw new SamzaException(s"Exception while serializing checkpoint-key. " +
+            s"Topic: $checkpointTopic Offset: $offset", e)
+        } else {
+          warn(s"Ignoring exception while serializing checkpoint-key. Topic: $checkpointTopic Offset: $offset", e)
+          null
         }
       }
 
-      // If the type of the key is not KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE, it can safely be ignored.
-      if (KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE.equals(checkpointKey.getType)) {
-        val checkpointBytes = checkpointEnvelope.getMessage.asInstanceOf[Array[Byte]]
-        val checkpoint = try {
-          checkpointMsgSerde.fromBytes(checkpointBytes)
-        } catch {
-          case e: Exception => throw new SamzaException(s"Exception while serializing checkpoint-message. " +
-            s"Topic: $checkpointTopic Offset: $offset", e)
+      if (checkpointKey != null) {
+        // If the grouper in the key is not equal to the configured grouper, error out.
+        val actualGrouperFactory = checkpointKey.getGrouperFactoryClassName
+        if (!expectedGrouperFactory.equals(actualGrouperFactory)) {
+          warn(s"Grouper mismatch. Configured: $expectedGrouperFactory Actual: $actualGrouperFactory ")
+          if (validateCheckpoint) {
+            throw new SamzaException("SSPGrouperFactory in the checkpoint topic does not match the configured value" +
+              s"Configured value: $expectedGrouperFactory; Actual value: $actualGrouperFactory Offset: $offset")
+          }
         }
 
-        checkpoints.put(checkpointKey.getTaskName, checkpoint)
+        // If the type of the key is not KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE, it can safely be ignored.
+        if (KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE.equals(checkpointKey.getType)) {
+          val checkpointBytes = checkpointEnvelope.getMessage.asInstanceOf[Array[Byte]]
+          val checkpoint = try {
+            checkpointMsgSerde.fromBytes(checkpointBytes)
+          } catch {
+            case e: Exception => throw new SamzaException(s"Exception while serializing checkpoint-message. " +
+              s"Topic: $checkpointTopic Offset: $offset", e)
+          }
+
+          checkpoints.put(checkpointKey.getTaskName, checkpoint)
+        }
       }
     }
     info(s"Read $numMessagesRead messages from system:$checkpointSystem topic:$checkpointTopic")

http://git-wip-us.apache.org/repos/asf/samza/blob/b7805df1/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
index a2ae94c..8a3a2e1 100644
--- a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
@@ -72,7 +72,7 @@ public class TestKafkaCheckpointManagerJava {
 
     SystemFactory factory = newFactory(mock(SystemProducer.class), mock(SystemConsumer.class), mockAdmin);
     KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
-        true, mock(Config.class), mock(MetricsRegistry.class), null);
+        true, mock(Config.class), mock(MetricsRegistry.class), null, new KafkaCheckpointLogKeySerde());
 
     // expect an exception during startup
     checkpointManager.start();
@@ -90,7 +90,7 @@ public class TestKafkaCheckpointManagerJava {
 
     SystemFactory factory = newFactory(mock(SystemProducer.class), mock(SystemConsumer.class), mockAdmin);
     KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
-        true, mock(Config.class), mock(MetricsRegistry.class), null);
+        true, mock(Config.class), mock(MetricsRegistry.class), null, new KafkaCheckpointLogKeySerde());
 
     // expect an exception during startup
     checkpointManager.start();
@@ -114,7 +114,7 @@ public class TestKafkaCheckpointManagerJava {
 
     // wire up an exception throwing serde with the checkpointmanager
     KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
-        true, mockConfig, mock(MetricsRegistry.class), new ExceptionThrowingCheckpointSerde());
+        true, mockConfig, mock(MetricsRegistry.class), new ExceptionThrowingCheckpointSerde(), new KafkaCheckpointLogKeySerde());
     checkpointManager.register(TASK1);
     checkpointManager.start();
 
@@ -123,6 +123,33 @@ public class TestKafkaCheckpointManagerJava {
   }
 
   @Test
+  public void testReadSucceedsOnKeySerdeExceptionsWhenValidationIsDisabled() throws Exception {
+    KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
+        CHECKPOINT_SYSTEM, 1);
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);
+
+    // mock out a consumer that returns a single checkpoint IME
+    SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
+    List<List<IncomingMessageEnvelope>> checkpointEnvelopes = ImmutableList.of(
+        ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, "0")));
+    SystemConsumer mockConsumer = newConsumer(checkpointEnvelopes);
+
+    SystemAdmin mockAdmin = newAdmin("0", "1");
+    SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
+
+    // wire up an exception throwing serde with the checkpointmanager
+    KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
+        false, mockConfig, mock(MetricsRegistry.class), new ExceptionThrowingCheckpointSerde(),
+        new ExceptionThrowingCheckpointKeySerde());
+    checkpointManager.register(TASK1);
+    checkpointManager.start();
+
+    // expect the read to succeed inspite of the exception from ExceptionThrowingSerde
+    checkpointManager.readLastCheckpoint(TASK1);
+  }
+
+  @Test
   public void testCheckpointsAreReadFromOldestOffset() throws Exception {
     KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
         CHECKPOINT_SYSTEM, 1);
@@ -138,7 +165,7 @@ public class TestKafkaCheckpointManagerJava {
     SystemAdmin mockAdmin = newAdmin(oldestOffset, "1");
     SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
     KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
-        true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde());
+        true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde(), new KafkaCheckpointLogKeySerde());
     checkpointManager.register(TASK1);
 
     // 1. verify that consumer.register is called only during checkpointManager.start.
@@ -177,7 +204,7 @@ public class TestKafkaCheckpointManagerJava {
     SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
 
     KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
-        true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde());
+        true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde(), new KafkaCheckpointLogKeySerde());
     checkpointManager.register(TASK1);
     checkpointManager.start();
 
@@ -244,4 +271,10 @@ public class TestKafkaCheckpointManagerJava {
       throw new KafkaException("exception");
     }
   }
+
+  private static class ExceptionThrowingCheckpointKeySerde extends KafkaCheckpointLogKeySerde {
+    public KafkaCheckpointLogKey fromBytes(byte[] bytes) {
+      throw new KafkaException("exception");
+    }
+  }
 }