You are viewing a plain-text version of this content. (The canonical link to the original was not preserved in this copy.)
Posted to commits@samza.apache.org by ja...@apache.org on 2018/01/18 02:26:48 UTC
samza git commit: SAMZA-1560: Handle key-serde errors in
KafkaCheckpointManager
Repository: samza
Updated Branches:
refs/heads/master b38e6622f -> b7805df10
SAMZA-1560: Handle key-serde errors in KafkaCheckpointManager
Author: Jagadish <jv...@linkedin.com>
Reviewers: Xinyu Liu <xi...@gmail.com>
Closes #408 from vjagadish1989/kcm-fix
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b7805df1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b7805df1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b7805df1
Branch: refs/heads/master
Commit: b7805df10f74c4a2b9bb89e49bbaefb72a8526b5
Parents: b38e662
Author: Jagadish <jv...@linkedin.com>
Authored: Wed Jan 17 18:27:40 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Jan 17 18:27:40 2018 -0800
----------------------------------------------------------------------
.../kafka/KafkaCheckpointLogKeySerde.java | 5 ++
.../kafka/KafkaCheckpointManager.scala | 51 +++++++++++---------
.../kafka/TestKafkaCheckpointManagerJava.java | 43 +++++++++++++++--
3 files changed, 72 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b7805df1/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
index cc883b6..8e0c815 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
@@ -58,6 +58,11 @@ public class KafkaCheckpointLogKeySerde implements Serde<KafkaCheckpointLogKey>
public KafkaCheckpointLogKey fromBytes(byte[] bytes) {
try {
LinkedHashMap<String, String> deserializedKey = mapper.readValue(bytes, LinkedHashMap.class);
+
+ if (!KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE.equals(deserializedKey.get(TYPE_FIELD))) {
+ throw new IllegalArgumentException(String.format("Invalid key detected. Type of the key is %s", deserializedKey.get(TYPE_FIELD)));
+ }
+
return new KafkaCheckpointLogKey(deserializedKey.get(TYPE_FIELD), new TaskName(deserializedKey.get(TASK_NAME_FIELD)), deserializedKey.get(SSP_GROUPER_FACTORY_FIELD)
);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/samza/blob/b7805df1/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index e1187c5..ca138c7 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -50,7 +50,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
validateCheckpoint: Boolean,
config: Config,
metricsRegistry: MetricsRegistry,
- checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde) extends CheckpointManager with Logging {
+ checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde,
+ checkpointKeySerde: Serde[KafkaCheckpointLogKey] = new KafkaCheckpointLogKeySerde) extends CheckpointManager with Logging {
info(s"Creating KafkaCheckpointManager for checkpointTopic:$checkpointTopic, systemName:$checkpointSystem " +
s"validateCheckpoints:$validateCheckpoint")
@@ -58,7 +59,6 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
val checkpointSystem: String = checkpointSpec.getSystemName
val checkpointTopic: String = checkpointSpec.getPhysicalName
val checkpointSsp = new SystemStreamPartition(checkpointSystem, checkpointTopic, new Partition(0))
- val checkpointKeySerde = new KafkaCheckpointLogKeySerde
val expectedGrouperFactory = new JobConfig(config).getSystemStreamPartitionGrouperFactory
val systemProducer = systemFactory.getProducer(checkpointSystem, config, metricsRegistry)
@@ -215,31 +215,38 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
val checkpointKey = try {
checkpointKeySerde.fromBytes(keyBytes)
} catch {
- case e: Exception => throw new SamzaException(s"Exception while serializing checkpoint-key. " +
- s"Topic: $checkpointTopic Offset: $offset", e)
- }
-
- // If the grouper in the key is not equal to the configured grouper, error out.
- val actualGrouperFactory = checkpointKey.getGrouperFactoryClassName
- if (!expectedGrouperFactory.equals(actualGrouperFactory)) {
- warn(s"Grouper mismatch. Configured: $expectedGrouperFactory Actual: $actualGrouperFactory ")
- if (validateCheckpoint) {
- throw new SamzaException("SSPGrouperFactory in the checkpoint topic does not match the configured value" +
- s"Configured value: $expectedGrouperFactory; Actual value: $actualGrouperFactory Offset: $offset")
+ case e: Exception => if (validateCheckpoint) {
+ throw new SamzaException(s"Exception while serializing checkpoint-key. " +
+ s"Topic: $checkpointTopic Offset: $offset", e)
+ } else {
+ warn(s"Ignoring exception while serializing checkpoint-key. Topic: $checkpointTopic Offset: $offset", e)
+ null
}
}
- // If the type of the key is not KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE, it can safely be ignored.
- if (KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE.equals(checkpointKey.getType)) {
- val checkpointBytes = checkpointEnvelope.getMessage.asInstanceOf[Array[Byte]]
- val checkpoint = try {
- checkpointMsgSerde.fromBytes(checkpointBytes)
- } catch {
- case e: Exception => throw new SamzaException(s"Exception while serializing checkpoint-message. " +
- s"Topic: $checkpointTopic Offset: $offset", e)
+ if (checkpointKey != null) {
+ // If the grouper in the key is not equal to the configured grouper, error out.
+ val actualGrouperFactory = checkpointKey.getGrouperFactoryClassName
+ if (!expectedGrouperFactory.equals(actualGrouperFactory)) {
+ warn(s"Grouper mismatch. Configured: $expectedGrouperFactory Actual: $actualGrouperFactory ")
+ if (validateCheckpoint) {
+ throw new SamzaException("SSPGrouperFactory in the checkpoint topic does not match the configured value" +
+ s"Configured value: $expectedGrouperFactory; Actual value: $actualGrouperFactory Offset: $offset")
+ }
}
- checkpoints.put(checkpointKey.getTaskName, checkpoint)
+ // If the type of the key is not KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE, it can safely be ignored.
+ if (KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE.equals(checkpointKey.getType)) {
+ val checkpointBytes = checkpointEnvelope.getMessage.asInstanceOf[Array[Byte]]
+ val checkpoint = try {
+ checkpointMsgSerde.fromBytes(checkpointBytes)
+ } catch {
+ case e: Exception => throw new SamzaException(s"Exception while serializing checkpoint-message. " +
+ s"Topic: $checkpointTopic Offset: $offset", e)
+ }
+
+ checkpoints.put(checkpointKey.getTaskName, checkpoint)
+ }
}
}
info(s"Read $numMessagesRead messages from system:$checkpointSystem topic:$checkpointTopic")
http://git-wip-us.apache.org/repos/asf/samza/blob/b7805df1/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
index a2ae94c..8a3a2e1 100644
--- a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
@@ -72,7 +72,7 @@ public class TestKafkaCheckpointManagerJava {
SystemFactory factory = newFactory(mock(SystemProducer.class), mock(SystemConsumer.class), mockAdmin);
KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
- true, mock(Config.class), mock(MetricsRegistry.class), null);
+ true, mock(Config.class), mock(MetricsRegistry.class), null, new KafkaCheckpointLogKeySerde());
// expect an exception during startup
checkpointManager.start();
@@ -90,7 +90,7 @@ public class TestKafkaCheckpointManagerJava {
SystemFactory factory = newFactory(mock(SystemProducer.class), mock(SystemConsumer.class), mockAdmin);
KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
- true, mock(Config.class), mock(MetricsRegistry.class), null);
+ true, mock(Config.class), mock(MetricsRegistry.class), null, new KafkaCheckpointLogKeySerde());
// expect an exception during startup
checkpointManager.start();
@@ -114,7 +114,7 @@ public class TestKafkaCheckpointManagerJava {
// wire up an exception throwing serde with the checkpointmanager
KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
- true, mockConfig, mock(MetricsRegistry.class), new ExceptionThrowingCheckpointSerde());
+ true, mockConfig, mock(MetricsRegistry.class), new ExceptionThrowingCheckpointSerde(), new KafkaCheckpointLogKeySerde());
checkpointManager.register(TASK1);
checkpointManager.start();
@@ -123,6 +123,33 @@ public class TestKafkaCheckpointManagerJava {
}
@Test
+ public void testReadSucceedsOnKeySerdeExceptionsWhenValidationIsDisabled() throws Exception {
+ KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
+ CHECKPOINT_SYSTEM, 1);
+ Config mockConfig = mock(Config.class);
+ when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);
+
+ // mock out a consumer that returns a single checkpoint IME
+ SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
+ List<List<IncomingMessageEnvelope>> checkpointEnvelopes = ImmutableList.of(
+ ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, "0")));
+ SystemConsumer mockConsumer = newConsumer(checkpointEnvelopes);
+
+ SystemAdmin mockAdmin = newAdmin("0", "1");
+ SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
+
+ // wire up an exception throwing serde with the checkpointmanager
+ KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
+ false, mockConfig, mock(MetricsRegistry.class), new ExceptionThrowingCheckpointSerde(),
+ new ExceptionThrowingCheckpointKeySerde());
+ checkpointManager.register(TASK1);
+ checkpointManager.start();
+
+ // expect the read to succeed inspite of the exception from ExceptionThrowingSerde
+ checkpointManager.readLastCheckpoint(TASK1);
+ }
+
+ @Test
public void testCheckpointsAreReadFromOldestOffset() throws Exception {
KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
CHECKPOINT_SYSTEM, 1);
@@ -138,7 +165,7 @@ public class TestKafkaCheckpointManagerJava {
SystemAdmin mockAdmin = newAdmin(oldestOffset, "1");
SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
- true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde());
+ true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde(), new KafkaCheckpointLogKeySerde());
checkpointManager.register(TASK1);
// 1. verify that consumer.register is called only during checkpointManager.start.
@@ -177,7 +204,7 @@ public class TestKafkaCheckpointManagerJava {
SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
- true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde());
+ true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde(), new KafkaCheckpointLogKeySerde());
checkpointManager.register(TASK1);
checkpointManager.start();
@@ -244,4 +271,10 @@ public class TestKafkaCheckpointManagerJava {
throw new KafkaException("exception");
}
}
+
+ private static class ExceptionThrowingCheckpointKeySerde extends KafkaCheckpointLogKeySerde {
+ public KafkaCheckpointLogKey fromBytes(byte[] bytes) {
+ throw new KafkaException("exception");
+ }
+ }
}