You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2021/05/19 22:47:21 UTC
[samza] branch master updated: KafkaChangelogKeySerde should
support deserializing multiple keys (#1502)
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 08931ce KafkaChangelogKeySerde should support deserializing multiple keys (#1502)
08931ce is described below
commit 08931cedea99f35546f4b76923bfe6840bf2144a
Author: Daniel Chen <dc...@linkedin.com>
AuthorDate: Wed May 19 15:47:11 2021 -0700
KafkaChangelogKeySerde should support deserializing multiple keys (#1502)
---
.../checkpoint/kafka/KafkaCheckpointLogKey.java | 3 -
.../kafka/KafkaCheckpointLogKeySerde.java | 4 --
.../checkpoint/kafka/KafkaCheckpointManager.scala | 33 ++++++-----
.../kafka/TestKafkaCheckpointLogKeySerde.java | 12 ++++
.../kafka/TestKafkaCheckpointManager.scala | 68 ++++++++++++++++++++--
5 files changed, 95 insertions(+), 25 deletions(-)
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
index 05114f9..a732aba 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
@@ -48,9 +48,6 @@ public class KafkaCheckpointLogKey {
Preconditions.checkNotNull(type);
Preconditions.checkState(!grouperFactoryClassName.isEmpty(), "Empty grouper factory class provided");
- Preconditions.checkState(type.equals(CHECKPOINT_KEY_TYPE), String.format("Invalid type provided for checkpoint key. " +
- "Expected: (%s) Actual: (%s)", CHECKPOINT_KEY_TYPE, type));
-
this.grouperFactoryClassName = grouperFactoryClassName;
this.taskName = taskName;
this.type = type;
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
index b00f12e..e738190 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
@@ -59,10 +59,6 @@ public class KafkaCheckpointLogKeySerde implements Serde<KafkaCheckpointLogKey>
try {
LinkedHashMap<String, String> deserializedKey = MAPPER.readValue(bytes, LinkedHashMap.class);
- if (!KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE.equals(deserializedKey.get(TYPE_FIELD))) {
- throw new IllegalArgumentException(String.format("Invalid key detected. Type of the key is %s", deserializedKey.get(TYPE_FIELD)));
- }
-
return new KafkaCheckpointLogKey(deserializedKey.get(TYPE_FIELD), new TaskName(deserializedKey.get(TASK_NAME_FIELD)), deserializedKey.get(SSP_GROUPER_FACTORY_FIELD)
);
} catch (Exception e) {
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 757e7ae..d88048e 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -159,19 +159,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
* @inheritdoc
*/
override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
- val key = new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE, taskName, expectedGrouperFactory)
- val keyBytes = try {
- checkpointKeySerde.toBytes(key)
- } catch {
- case e: Exception => throw new SamzaException(s"Exception when writing checkpoint-key for $taskName: $checkpoint", e)
- }
- val msgBytes = try {
- checkpointMsgSerde.toBytes(checkpoint)
- } catch {
- case e: Exception => throw new SamzaException(s"Exception when writing checkpoint for $taskName: $checkpoint", e)
- }
-
- val envelope = new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
+ val envelope = buildOutgoingMessageEnvelope(taskName, checkpoint)
// Used for exponential backoff retries on failure in sending messages through producer.
val startTimeInMillis: Long = System.currentTimeMillis()
@@ -188,7 +176,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
} catch {
case exception: Exception => {
producerException = exception
- warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception)
+ warn(s"Retrying failed checkpoint write for checkpoint: $checkpoint for task: $taskName", exception)
// TODO: Remove this producer recreation logic after SAMZA-1393.
val newProducer: SystemProducer = getSystemProducer()
producerCreationLock.synchronized {
@@ -333,4 +321,21 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
partitionMetaData.getOldestOffset
}
+
+ @VisibleForTesting
+ def buildOutgoingMessageEnvelope(taskName: TaskName, checkpoint: Checkpoint): OutgoingMessageEnvelope = {
+ val key = new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE, taskName, expectedGrouperFactory)
+ val keyBytes = try {
+ checkpointKeySerde.toBytes(key)
+ } catch {
+ case e: Exception => throw new SamzaException(s"Exception when writing checkpoint-key for $taskName: $checkpoint", e)
+ }
+ val msgBytes = try {
+ checkpointMsgSerde.toBytes(checkpoint)
+ } catch {
+ case e: Exception => throw new SamzaException(s"Exception when writing checkpoint for $taskName: $checkpoint", e)
+ }
+
+ new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
+ }
}
diff --git a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
index b648b1c..614aaba 100644
--- a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
+++ b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
@@ -50,4 +50,16 @@ public class TestKafkaCheckpointLogKeySerde {
// test that deserialize(serialize(k)) == k
Assert.assertEquals(key, checkpointSerde.fromBytes(checkpointSerde.toBytes(key)));
}
+
+ @Test
+ public void testForwardsCompatibility() {
+ // Use a key type other than the current one. To allow for multiple checkpoint key types in the future,
+ // the serde layer must not throw on an unknown type; the type must instead be validated in the CheckpointManager
+ KafkaCheckpointLogKey key = new KafkaCheckpointLogKey("checkpoint-v2",
+ new TaskName("Partition 0"), GroupByPartitionFactory.class.getCanonicalName());
+ KafkaCheckpointLogKeySerde checkpointSerde = new KafkaCheckpointLogKeySerde();
+
+ // test that deserialize(serialize(k)) == k
+ Assert.assertEquals(key, checkpointSerde.fromBytes(checkpointSerde.toBytes(key)));
+ }
}
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 7d6db64..2671067 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -127,6 +127,43 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
}
@Test
+ def testReadCheckpointShouldIgnoreUnknownCheckpointKeys(): Unit = {
+ val checkpointTopic = "checkpoint-topic-1"
+ val kcm1 = createKafkaCheckpointManager(checkpointTopic)
+ kcm1.register(taskName)
+ kcm1.createResources
+ kcm1.start
+ kcm1.stop
+
+ // check that start actually creates the topic with log compaction enabled
+ val topicConfig = adminZkClient.getAllTopicConfigs().getOrElse(checkpointTopic, new Properties())
+
+ assertEquals(topicConfig, new KafkaConfig(config).getCheckpointTopicProperties())
+ assertEquals("compact", topicConfig.get("cleanup.policy"))
+ assertEquals("26214400", topicConfig.get("segment.bytes"))
+
+ // read before any checkpoint has been written should result in a null checkpoint
+ val readCp = readCheckpoint(checkpointTopic, taskName)
+ assertNull(readCp)
+
+ // skips unknown checkpoints from checkpoint topic
+ writeCheckpoint(checkpointTopic, taskName, checkpoint1, "checkpoint-v2")
+ assertNull(readCheckpoint(checkpointTopic, taskName))
+
+ // reads latest v1 checkpoints
+ writeCheckpoint(checkpointTopic, taskName, checkpoint1)
+ assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName))
+
+ // writing checkpoint v2 still returns the previous v1 checkpoint
+ writeCheckpoint(checkpointTopic, taskName, checkpoint2, "checkpoint-v2")
+ assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName))
+
+ // writing checkpoint2 with the default (v1) checkpoint key type makes the read return checkpoint2
+ writeCheckpoint(checkpointTopic, taskName, checkpoint2)
+ assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName))
+ }
+
+ @Test
def testWriteCheckpointShouldRetryFiniteTimesOnFailure(): Unit = {
val checkpointTopic = "checkpoint-topic-2"
val mockKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])
@@ -251,7 +288,8 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
.build())
}
- private def createKafkaCheckpointManager(cpTopic: String, serde: CheckpointSerde = new CheckpointSerde, failOnTopicValidation: Boolean = true) = {
+ private def createKafkaCheckpointManager(cpTopic: String, serde: CheckpointSerde = new CheckpointSerde,
+ failOnTopicValidation: Boolean = true, checkpointKey: String = KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE) = {
val kafkaConfig = new org.apache.samza.config.KafkaConfig(config)
val props = kafkaConfig.getCheckpointTopicProperties()
val systemName = kafkaConfig.getCheckpointSystem.getOrElse(
@@ -264,7 +302,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
val systemFactory = ReflectionUtil.getObj(systemFactoryClassName, classOf[SystemFactory])
val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, props)
- new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config, new NoOpMetricsRegistry, serde)
+ new MockKafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, serde, checkpointKey)
}
private def readCheckpoint(checkpointTopic: String, taskName: TaskName) : Checkpoint = {
@@ -276,8 +314,9 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
checkpoint
}
- private def writeCheckpoint(checkpointTopic: String, taskName: TaskName, checkpoint: Checkpoint): Unit = {
- val kcm = createKafkaCheckpointManager(checkpointTopic)
+ private def writeCheckpoint(checkpointTopic: String, taskName: TaskName, checkpoint: Checkpoint,
+ checkpointKey: String = KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE): Unit = {
+ val kcm = createKafkaCheckpointManager(checkpointTopic, checkpointKey = checkpointKey)
kcm.register(taskName)
kcm.start
kcm.writeCheckpoint(taskName, checkpoint)
@@ -300,4 +339,25 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
}
}
+ class MockKafkaCheckpointManager(spec: KafkaStreamSpec, systemFactory: SystemFactory, failOnTopicValidation: Boolean,
+ serde: CheckpointSerde = new CheckpointSerde, checkpointKey: String)
+ extends KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config,
+ new NoOpMetricsRegistry, serde) {
+ override def buildOutgoingMessageEnvelope(taskName: TaskName, checkpoint: Checkpoint): OutgoingMessageEnvelope = {
+ val key = new KafkaCheckpointLogKey(checkpointKey, taskName, expectedGrouperFactory)
+ val keySerde = new KafkaCheckpointLogKeySerde
+ val checkpointMsgSerde = new CheckpointSerde
+ val keyBytes = try {
+ keySerde.toBytes(key)
+ } catch {
+ case e: Exception => throw new SamzaException(s"Exception when writing checkpoint-key for $taskName: $checkpoint", e)
+ }
+ val msgBytes = try {
+ checkpointMsgSerde.toBytes(checkpoint)
+ } catch {
+ case e: Exception => throw new SamzaException(s"Exception when writing checkpoint for $taskName: $checkpoint", e)
+ }
+ new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
+ }
+ }
}