Posted to commits@kafka.apache.org by mj...@apache.org on 2022/01/06 05:39:38 UTC

[kafka] branch trunk updated: KAFKA-13476: Increase resilience timestamp decoding Kafka Streams (#11535)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7567cbc  KAFKA-13476: Increase resilience timestamp decoding Kafka Streams (#11535)
7567cbc is described below

commit 7567cbc857eef4f535410a8f6256308197c3b9c8
Author: Richard <30...@users.noreply.github.com>
AuthorDate: Thu Jan 6 06:38:10 2022 +0100

    KAFKA-13476: Increase resilience timestamp decoding Kafka Streams (#11535)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../streams/processor/internals/StreamTask.java    | 23 +++++++++++++---------
 .../processor/internals/StreamTaskTest.java        |  7 +++++++
 2 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 32369c9..6823d2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1097,15 +1097,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         if (encryptedString.isEmpty()) {
             return RecordQueue.UNKNOWN;
         }
-        final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
-        final byte version = buffer.get();
-        switch (version) {
-            case LATEST_MAGIC_BYTE:
-                return buffer.getLong();
-            default:
-                log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.",
-                         LATEST_MAGIC_BYTE, version);
-                return RecordQueue.UNKNOWN;
+        try {
+            final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
+            final byte version = buffer.get();
+            switch (version) {
+                case LATEST_MAGIC_BYTE:
+                    return buffer.getLong();
+                default:
+                    log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.",
+                            LATEST_MAGIC_BYTE, version);
+                    return RecordQueue.UNKNOWN;
+            }
+        } catch (final Exception exception) {
+            log.warn("Unsupported offset metadata found");
+            return RecordQueue.UNKNOWN;
         }
     }
 
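The net effect of this hunk: any exception thrown while Base64-decoding the offset metadata or reading the version byte and timestamp now degrades to RecordQueue.UNKNOWN instead of escaping decodeTimestamp. A minimal, self-contained sketch of the same pattern (the magic-byte value, the UNKNOWN sentinel, and the System.err logging are illustrative stand-ins, not the actual StreamTask/RecordQueue internals beyond what the diff shows):

    import java.nio.ByteBuffer;
    import java.util.Base64;

    // Sketch of the decoding pattern introduced above.
    public class TimestampDecodingSketch {
        private static final byte LATEST_MAGIC_BYTE = 1; // assumed value, for illustration only
        private static final long UNKNOWN = -1L;         // stand-in for RecordQueue.UNKNOWN

        static long decodeTimestamp(final String encoded) {
            if (encoded.isEmpty()) {
                return UNKNOWN;
            }
            try {
                final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encoded));
                final byte version = buffer.get();
                switch (version) {
                    case LATEST_MAGIC_BYTE:
                        return buffer.getLong();
                    default:
                        System.err.println("Unsupported offset metadata version found: " + version);
                        return UNKNOWN;
                }
            } catch (final Exception exception) {
                // Catches invalid Base64 (IllegalArgumentException) as well as
                // truncated payloads (BufferUnderflowException).
                System.err.println("Unsupported offset metadata found");
                return UNKNOWN;
            }
        }

        public static void main(final String[] args) {
            // Well-formed metadata: one magic byte followed by an 8-byte timestamp.
            final ByteBuffer encoded = ByteBuffer.allocate(Byte.BYTES + Long.BYTES);
            encoded.put(LATEST_MAGIC_BYTE).putLong(1_641_447_490_000L);
            final String valid = Base64.getEncoder().encodeToString(encoded.array());

            System.out.println(decodeTimestamp(valid)); // prints the encoded timestamp
            System.out.println(decodeTimestamp("{}"));  // not Base64: returns UNKNOWN instead of throwing
        }
    }
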
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1b3c500..3c1814e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1184,6 +1184,13 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldReturnUnknownTimestampIfInvalidMetadata() {
+        task = createStatelessTask(createConfig("100"));
+        final String invalidBase64String = "{}";
+        assertEquals(RecordQueue.UNKNOWN, task.decodeTimestamp(invalidBase64String));
+    }
+
+    @Test
     public void shouldBeProcessableIfAllPartitionsBuffered() {
         task = createStatelessTask(createConfig("100"));
         task.initializeIfNeeded();
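
For reference, the new test's input "{}" fails at the Base64 step itself: '{' and '}' are outside the Base64 alphabet, so the decoder throws IllegalArgumentException before any version byte is read, and the new catch block maps that to RecordQueue.UNKNOWN. A small standalone illustration of that failure mode (class name is hypothetical):

    import java.util.Base64;

    // Shows why "{}" exercises the new catch block: it is not valid Base64,
    // so decoding throws before a version byte or timestamp can be read.
    public class InvalidMetadataExample {
        public static void main(final String[] args) {
            try {
                Base64.getDecoder().decode("{}");
            } catch (final IllegalArgumentException e) {
                // decodeTimestamp now converts this to RecordQueue.UNKNOWN instead of propagating it.
                System.out.println("caught: " + e.getMessage());
            }
        }
    }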