You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2022/01/06 05:39:38 UTC
[kafka] branch trunk updated: KAFKA-13476: Increase resilience timestamp decoding Kafka Streams (#11535)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7567cbc KAFKA-13476: Increase resilience timestamp decoding Kafka Streams (#11535)
7567cbc is described below
commit 7567cbc857eef4f535410a8f6256308197c3b9c8
Author: Richard <30...@users.noreply.github.com>
AuthorDate: Thu Jan 6 06:38:10 2022 +0100
KAFKA-13476: Increase resilience timestamp decoding Kafka Streams (#11535)
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../streams/processor/internals/StreamTask.java | 23 +++++++++++++---------
.../processor/internals/StreamTaskTest.java | 7 +++++++
2 files changed, 21 insertions(+), 9 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 32369c9..6823d2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1097,15 +1097,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
if (encryptedString.isEmpty()) {
return RecordQueue.UNKNOWN;
}
- final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
- final byte version = buffer.get();
- switch (version) {
- case LATEST_MAGIC_BYTE:
- return buffer.getLong();
- default:
- log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.",
- LATEST_MAGIC_BYTE, version);
- return RecordQueue.UNKNOWN;
+ try {
+ final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
+ final byte version = buffer.get();
+ switch (version) {
+ case LATEST_MAGIC_BYTE:
+ return buffer.getLong();
+ default:
+ log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.",
+ LATEST_MAGIC_BYTE, version);
+ return RecordQueue.UNKNOWN;
+ }
+ } catch (final Exception exception) {
+ log.warn("Unsupported offset metadata found");
+ return RecordQueue.UNKNOWN;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1b3c500..3c1814e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1184,6 +1184,13 @@ public class StreamTaskTest {
}
@Test
+ public void shouldReturnUnknownTimestampIfInvalidMetadata() {
+ task = createStatelessTask(createConfig("100"));
+ final String invalidBase64String = "{}";
+ assertEquals(RecordQueue.UNKNOWN, task.decodeTimestamp(invalidBase64String));
+ }
+
+ @Test
public void shouldBeProcessableIfAllPartitionsBuffered() {
task = createStatelessTask(createConfig("100"));
task.initializeIfNeeded();