You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2021/05/19 22:47:21 UTC

[samza] branch master updated: KafkaChangelogKeySerde should support deserializing multiple keys (#1502)

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 08931ce  KafkaChangelogKeySerde should support deserializing multiple keys (#1502)
08931ce is described below

commit 08931cedea99f35546f4b76923bfe6840bf2144a
Author: Daniel Chen <dc...@linkedin.com>
AuthorDate: Wed May 19 15:47:11 2021 -0700

    KafkaChangelogKeySerde should support deserializing multiple keys (#1502)
---
 .../checkpoint/kafka/KafkaCheckpointLogKey.java    |  3 -
 .../kafka/KafkaCheckpointLogKeySerde.java          |  4 --
 .../checkpoint/kafka/KafkaCheckpointManager.scala  | 33 ++++++-----
 .../kafka/TestKafkaCheckpointLogKeySerde.java      | 12 ++++
 .../kafka/TestKafkaCheckpointManager.scala         | 68 ++++++++++++++++++++--
 5 files changed, 95 insertions(+), 25 deletions(-)

diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
index 05114f9..a732aba 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
@@ -48,9 +48,6 @@ public class KafkaCheckpointLogKey {
     Preconditions.checkNotNull(type);
     Preconditions.checkState(!grouperFactoryClassName.isEmpty(), "Empty grouper factory class provided");
 
-    Preconditions.checkState(type.equals(CHECKPOINT_KEY_TYPE), String.format("Invalid type provided for checkpoint key. " +
-        "Expected: (%s) Actual: (%s)", CHECKPOINT_KEY_TYPE, type));
-
     this.grouperFactoryClassName = grouperFactoryClassName;
     this.taskName = taskName;
     this.type = type;
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
index b00f12e..e738190 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
@@ -59,10 +59,6 @@ public class KafkaCheckpointLogKeySerde implements Serde<KafkaCheckpointLogKey>
     try {
       LinkedHashMap<String, String> deserializedKey = MAPPER.readValue(bytes, LinkedHashMap.class);
 
-      if (!KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE.equals(deserializedKey.get(TYPE_FIELD))) {
-        throw new IllegalArgumentException(String.format("Invalid key detected. Type of the key is %s", deserializedKey.get(TYPE_FIELD)));
-      }
-
       return new KafkaCheckpointLogKey(deserializedKey.get(TYPE_FIELD), new TaskName(deserializedKey.get(TASK_NAME_FIELD)), deserializedKey.get(SSP_GROUPER_FACTORY_FIELD)
       );
     } catch (Exception e) {
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 757e7ae..d88048e 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -159,19 +159,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
     * @inheritdoc
     */
   override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
-    val key = new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE, taskName, expectedGrouperFactory)
-    val keyBytes = try {
-      checkpointKeySerde.toBytes(key)
-    } catch {
-      case e: Exception => throw new SamzaException(s"Exception when writing checkpoint-key for $taskName: $checkpoint", e)
-    }
-    val msgBytes = try {
-      checkpointMsgSerde.toBytes(checkpoint)
-    } catch {
-      case e: Exception => throw new SamzaException(s"Exception when writing checkpoint for $taskName: $checkpoint", e)
-    }
-
-    val envelope = new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
+    val envelope = buildOutgoingMessageEnvelope(taskName, checkpoint)
 
     // Used for exponential backoff retries on failure in sending messages through producer.
     val startTimeInMillis: Long = System.currentTimeMillis()
@@ -188,7 +176,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
       } catch {
         case exception: Exception => {
           producerException = exception
-          warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception)
+          warn(s"Retrying failed checkpoint write for checkpoint: $checkpoint for task: $taskName", exception)
           // TODO: Remove this producer recreation logic after SAMZA-1393.
           val newProducer: SystemProducer = getSystemProducer()
           producerCreationLock.synchronized {
@@ -333,4 +321,21 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
 
     partitionMetaData.getOldestOffset
   }
+
+  @VisibleForTesting
+  def buildOutgoingMessageEnvelope(taskName: TaskName, checkpoint: Checkpoint): OutgoingMessageEnvelope = {
+    val key = new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE, taskName, expectedGrouperFactory)
+    val keyBytes = try {
+      checkpointKeySerde.toBytes(key)
+    } catch {
+      case e: Exception => throw new SamzaException(s"Exception when writing checkpoint-key for $taskName: $checkpoint", e)
+    }
+    val msgBytes = try {
+      checkpointMsgSerde.toBytes(checkpoint)
+    } catch {
+      case e: Exception => throw new SamzaException(s"Exception when writing checkpoint for $taskName: $checkpoint", e)
+    }
+
+    new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
+  }
 }
diff --git a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
index b648b1c..614aaba 100644
--- a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
+++ b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
@@ -50,4 +50,16 @@ public class TestKafkaCheckpointLogKeySerde {
     // test that deserialize(serialize(k)) == k
     Assert.assertEquals(key, checkpointSerde.fromBytes(checkpointSerde.toBytes(key)));
   }
+
+  @Test
+  public void testForwardsCompatibility() {
+    // Use a key type other than the expected "checkpoint" value. This prepares for a future where
+    // multiple checkpoint key types exist: the Serde layer must not throw on an unknown type;
+    // validation is the CheckpointManager's responsibility instead.
+    KafkaCheckpointLogKey key = new KafkaCheckpointLogKey("checkpoint-v2",
+        new TaskName("Partition 0"), GroupByPartitionFactory.class.getCanonicalName());
+    KafkaCheckpointLogKeySerde checkpointSerde = new KafkaCheckpointLogKeySerde();
+
+    // test that deserialize(serialize(k)) == k
+    Assert.assertEquals(key, checkpointSerde.fromBytes(checkpointSerde.toBytes(key)));
+  }
 }
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 7d6db64..2671067 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -127,6 +127,43 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
   }
 
   @Test
+  def testReadCheckpointShouldIgnoreUnknownCheckpointKeys(): Unit = {
+    val checkpointTopic = "checkpoint-topic-1"
+    val kcm1 = createKafkaCheckpointManager(checkpointTopic)
+    kcm1.register(taskName)
+    kcm1.createResources
+    kcm1.start
+    kcm1.stop
+
+    // check that start actually creates the topic with log compaction enabled
+    val topicConfig = adminZkClient.getAllTopicConfigs().getOrElse(checkpointTopic, new Properties())
+
+    assertEquals(topicConfig, new KafkaConfig(config).getCheckpointTopicProperties())
+    assertEquals("compact", topicConfig.get("cleanup.policy"))
+    assertEquals("26214400", topicConfig.get("segment.bytes"))
+
+    // reading the checkpoint before the topic exists should return null
+    val readCp = readCheckpoint(checkpointTopic, taskName)
+    assertNull(readCp)
+
+    // entries with an unknown checkpoint key type in the checkpoint topic are skipped
+    writeCheckpoint(checkpointTopic, taskName, checkpoint1, "checkpoint-v2")
+    assertNull(readCheckpoint(checkpointTopic, taskName))
+
+    // reads latest v1 checkpoints
+    writeCheckpoint(checkpointTopic, taskName, checkpoint1)
+    assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName))
+
+    // writing checkpoint v2 still returns the previous v1 checkpoint
+    writeCheckpoint(checkpointTopic, taskName, checkpoint2, "checkpoint-v2")
+    assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName))
+
+    // writing checkpoint2 with the correct key type makes readCheckpoint return checkpoint2
+    writeCheckpoint(checkpointTopic, taskName, checkpoint2)
+    assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName))
+  }
+
+  @Test
   def testWriteCheckpointShouldRetryFiniteTimesOnFailure(): Unit = {
     val checkpointTopic = "checkpoint-topic-2"
     val mockKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])
@@ -251,7 +288,8 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
       .build())
   }
 
-  private def createKafkaCheckpointManager(cpTopic: String, serde: CheckpointSerde = new CheckpointSerde, failOnTopicValidation: Boolean = true) = {
+  private def createKafkaCheckpointManager(cpTopic: String, serde: CheckpointSerde = new CheckpointSerde,
+    failOnTopicValidation: Boolean = true, checkpointKey: String = KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE) = {
     val kafkaConfig = new org.apache.samza.config.KafkaConfig(config)
     val props = kafkaConfig.getCheckpointTopicProperties()
     val systemName = kafkaConfig.getCheckpointSystem.getOrElse(
@@ -264,7 +302,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     val systemFactory = ReflectionUtil.getObj(systemFactoryClassName, classOf[SystemFactory])
 
     val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, props)
-    new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config, new NoOpMetricsRegistry, serde)
+    new MockKafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, serde, checkpointKey)
   }
 
   private def readCheckpoint(checkpointTopic: String, taskName: TaskName) : Checkpoint = {
@@ -276,8 +314,9 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     checkpoint
   }
 
-  private def writeCheckpoint(checkpointTopic: String, taskName: TaskName, checkpoint: Checkpoint): Unit = {
-    val kcm = createKafkaCheckpointManager(checkpointTopic)
+  private def writeCheckpoint(checkpointTopic: String, taskName: TaskName, checkpoint: Checkpoint,
+    checkpointKey: String = KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE): Unit = {
+    val kcm = createKafkaCheckpointManager(checkpointTopic, checkpointKey = checkpointKey)
     kcm.register(taskName)
     kcm.start
     kcm.writeCheckpoint(taskName, checkpoint)
@@ -300,4 +339,25 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     }
   }
 
+  class MockKafkaCheckpointManager(spec: KafkaStreamSpec, systemFactory: SystemFactory, failOnTopicValidation: Boolean,
+    serde: CheckpointSerde = new CheckpointSerde, checkpointKey: String)
+    extends KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config,
+      new NoOpMetricsRegistry, serde) {
+    override def buildOutgoingMessageEnvelope(taskName: TaskName, checkpoint: Checkpoint): OutgoingMessageEnvelope = {
+      val key = new KafkaCheckpointLogKey(checkpointKey, taskName, expectedGrouperFactory)
+      val keySerde = new KafkaCheckpointLogKeySerde
+      val checkpointMsgSerde = new CheckpointSerde
+      val keyBytes = try {
+        keySerde.toBytes(key)
+      } catch {
+        case e: Exception => throw new SamzaException(s"Exception when writing checkpoint-key for $taskName: $checkpoint", e)
+      }
+      val msgBytes = try {
+        checkpointMsgSerde.toBytes(checkpoint)
+      } catch {
+        case e: Exception => throw new SamzaException(s"Exception when writing checkpoint for $taskName: $checkpoint", e)
+      }
+      new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
+    }
+  }
 }