Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/14 04:56:45 UTC

[kafka] branch trunk updated: KAFKA-14834: [10/N] Reserve repartition topic formats to include isLatest (#13566)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f1eb260fea7 KAFKA-14834: [10/N] Reserve repartition topic formats to include isLatest (#13566)
f1eb260fea7 is described below

commit f1eb260fea75de9b861a35678e8c8233fb948e1a
Author: Victoria Xia <vi...@confluent.io>
AuthorDate: Fri Apr 14 00:56:36 2023 -0400

    KAFKA-14834: [10/N] Reserve repartition topic formats to include isLatest (#13566)
    
    KIP-914 introduced a new boolean isLatest into Change to indicate whether a change update represents the latest for the key. Even though Change is serialized into the table repartition topic, the new boolean does not need to be serialized in, because the table repartition map processor performs an optimization to drop records for which isLatest = false. If not for this optimization, the downstream table aggregate would have to drop such records instead, and isLatest would need to be s [...]
    
    In light of the possibility that isLatest may need to be serialized into the repartition topic in the future, e.g., if other downstream processors are added which need to distinguish between records for which isLatest = true vs isLatest = false, this PR reserves repartition topic formats which include isLatest. Reserving these formats now comes at no additional cost to users since a rolling bounce is already required for the upcoming release due to KIP-904. If we don't reserve them no [...]
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 docs/streams/developer-guide/dsl-api.html          |  4 +-
 .../kstream/internals/ChangedDeserializer.java     | 46 +++++++++++++-
 .../kstream/internals/ChangedSerdeTest.java        | 70 +++++++++++++++++++++-
 3 files changed, 114 insertions(+), 6 deletions(-)

diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index adf0f33bb4b..5fe7af249d3 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -1026,7 +1026,7 @@ KTable&lt;byte[], Long&gt; aggregatedTable = groupedTable.aggregate(
                                     <li>When the first non-<code class="docutils literal"><span class="pre">null</span></code> value is received for a key (e.g.,  INSERT), then only the adder is called.</li>
                                     <li>When subsequent non-<code class="docutils literal"><span class="pre">null</span></code> values are received for a key (e.g.,  UPDATE), then (1) the subtractor is
                                         called with the old value as stored in the table and (2) the adder is called with the new value of the
-                                        input record that was just received. The subtractor will be called before the adder if and only if the extracted grouping key of the old and new value is the same.
+                                        input record that was just received. The subtractor is guaranteed to be called before the adder if the extracted grouping key of the old and new value is the same.
                                         The detection of this case depends on the correct implementation of the equals() method of the extracted key type. Otherwise, the order of execution for the subtractor
                                         and adder is not defined.</li>
                                     <li>When a tombstone record &#8211; i.e. a record with a <code class="docutils literal"><span class="pre">null</span></code> value &#8211; is received for a key (e.g.,  DELETE),
@@ -1276,7 +1276,7 @@ KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
                                     <li>When the first non-<code class="docutils literal"><span class="pre">null</span></code> value is received for a key (e.g.,  INSERT), then only the adder is called.</li>
                                     <li>When subsequent non-<code class="docutils literal"><span class="pre">null</span></code> values are received for a key (e.g.,  UPDATE), then (1) the subtractor is
                                         called with the old value as stored in the table and (2) the adder is called with the new value of the
-                                        input record that was just received. The subtractor will be called before the adder if and only if the extracted grouping key of the old and new value is the same.
+                                        input record that was just received. The subtractor is guaranteed to be called before the adder if the extracted grouping key of the old and new value is the same.
                                         The detection of this case depends on the correct implementation of the equals() method of the extracted key type. Otherwise, the order of execution for the subtractor
                                         and adder is not defined.</li>
                                     <li>When a tombstone record &#8211; i.e. a record with a <code class="docutils literal"><span class="pre">null</span></code> value &#8211; is received for a key (e.g.,  DELETE),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index de5a1e33b89..b5b2c6aa3c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, Void, T> {
 
     private static final int NEW_OLD_FLAG_SIZE = 1;
+    private static final int IS_LATEST_FLAG_SIZE = 1;
 
     private Deserializer<T> inner;
 
@@ -52,21 +53,27 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>>, Wrapping
         // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
         // {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
         // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
+        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
+        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
+        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
         final ByteBuffer buffer = ByteBuffer.wrap(data);
         final byte newOldFlag = buffer.get(data.length - NEW_OLD_FLAG_SIZE);
 
         final byte[] newData;
         final byte[] oldData;
+        final boolean isLatest;
         if (newOldFlag == (byte) 0) {
             newData = null;
             final int oldDataLength = data.length - NEW_OLD_FLAG_SIZE;
             oldData = new byte[oldDataLength];
             buffer.get(oldData);
+            isLatest = true;
         } else if (newOldFlag == (byte) 1) {
             oldData = null;
             final int newDataLength = data.length - NEW_OLD_FLAG_SIZE;
             newData = new byte[newDataLength];
             buffer.get(newData);
+            isLatest = true;
         } else if (newOldFlag == (byte) 2) {
             final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer));
             newData = new byte[newDataLength];
@@ -76,13 +83,48 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>>, Wrapping
 
             buffer.get(newData);
             buffer.get(oldData);
+            isLatest = true;
+        } else if (newOldFlag == (byte) 3) {
+            newData = null;
+            final int oldDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
+            oldData = new byte[oldDataLength];
+            buffer.get(oldData);
+            isLatest = readIsLatestFlag(buffer);
+        } else if (newOldFlag == (byte) 4) {
+            oldData = null;
+            final int newDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
+            newData = new byte[newDataLength];
+            buffer.get(newData);
+            isLatest = readIsLatestFlag(buffer);
+        } else if (newOldFlag == (byte) 5) {
+            final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer));
+            newData = new byte[newDataLength];
+
+            final int oldDataLength = data.length - Integer.BYTES - newDataLength - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
+            oldData = new byte[oldDataLength];
+
+            buffer.get(newData);
+            buffer.get(oldData);
+            isLatest = readIsLatestFlag(buffer);
         } else {
             throw new StreamsException("Encountered unknown byte value `" + newOldFlag + "` for oldNewFlag in ChangedDeserializer.");
         }
 
         return new Change<>(
-                inner.deserialize(topic, headers, newData),
-                inner.deserialize(topic, headers, oldData));
+            inner.deserialize(topic, headers, newData),
+            inner.deserialize(topic, headers, oldData),
+            isLatest);
+    }
+
+    private boolean readIsLatestFlag(final ByteBuffer buffer) {
+        final byte isLatestFlag = buffer.get(buffer.capacity() - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE);
+        if (isLatestFlag == (byte) 1) {
+            return true;
+        } else if (isLatestFlag == (byte) 0) {
+            return false;
+        } else {
+            throw new StreamsException("Encountered unexpected byte value `" + isLatestFlag + "` for isLatestFlag in ChangedDeserializer.");
+        }
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
index 21dbdb950df..475aa53ef0b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.junit.Assert;
@@ -36,12 +37,16 @@ import static org.junit.Assert.assertThrows;
 public class ChangedSerdeTest {
     private static final String TOPIC = "some-topic";
 
+    private static final Serializer<String> STRING_SERIALIZER = Serdes.String().serializer();
     private static final ChangedSerializer<String> CHANGED_STRING_SERIALIZER =
-            new ChangedSerializer<>(Serdes.String().serializer());
-
+            new ChangedSerializer<>(STRING_SERIALIZER);
     private static final ChangedDeserializer<String> CHANGED_STRING_DESERIALIZER =
             new ChangedDeserializer<>(Serdes.String().deserializer());
 
+    private static final int NEW_OLD_FLAG_SIZE = 1;
+    private static final int IS_LATEST_FLAG_SIZE = 1;
+    private static final int UINT32_SIZE = 4;
+
     final String nonNullNewValue = "hello";
     final String nonNullOldValue = "world";
 
@@ -106,4 +111,65 @@ public class ChangedSerdeTest {
             StreamsException.class,
             () -> CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, serialized));
     }
+
+    @Test
+    public void shouldDeserializeReservedVersions3Through5() {
+        // `isLatest = true`
+        checkRoundTripForReservedVersion(new Change<>(nonNullNewValue, null, true));
+        checkRoundTripForReservedVersion(new Change<>(null, nonNullOldValue, true));
+        checkRoundTripForReservedVersion(new Change<>(nonNullNewValue, nonNullOldValue, true));
+
+        // `isLatest = false`
+        checkRoundTripForReservedVersion(new Change<>(nonNullNewValue, null, false));
+        checkRoundTripForReservedVersion(new Change<>(null, nonNullOldValue, false));
+        checkRoundTripForReservedVersion(new Change<>(nonNullNewValue, nonNullOldValue, false));
+    }
+
+    // versions 3 through 5 are reserved in the deserializer in case we want to use them in the
+    // future (in which case we save users from needing to perform another rolling upgrade by
+    // introducing these reserved versions in the same AK release as version 2).
+    // so, this serialization code is not actually in the serializer itself, but only here for
+    // now for purposes of testing the deserializer.
+    private static byte[] serializeVersions3Through5(final String topic, final Change<String> data) {
+        final boolean oldValueIsNotNull = data.oldValue != null;
+        final boolean newValueIsNotNull = data.newValue != null;
+
+        final byte[] newData = STRING_SERIALIZER.serialize(topic, null, data.newValue);
+        final byte[] oldData = STRING_SERIALIZER.serialize(topic, null, data.oldValue);
+
+        final int newDataLength = newValueIsNotNull ? newData.length : 0;
+        final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
+
+        // The serialization format is:
+        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
+        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
+        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
+        final ByteBuffer buf;
+        final byte isLatest = data.isLatest ? (byte) 1 : (byte) 0;
+        if (newValueIsNotNull && oldValueIsNotNull) {
+            final int capacity = UINT32_SIZE + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
+            buf = ByteBuffer.allocate(capacity);
+            ByteUtils.writeUnsignedInt(buf, newDataLength);
+            buf.put(newData).put(oldData).put(isLatest).put((byte) 5);
+        } else if (newValueIsNotNull) {
+            final int capacity = newDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
+            buf = ByteBuffer.allocate(capacity);
+            buf.put(newData).put(isLatest).put((byte) 4);
+        } else if (oldValueIsNotNull) {
+            final int capacity = oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
+            buf = ByteBuffer.allocate(capacity);
+            buf.put(oldData).put(isLatest).put((byte) 3);
+        } else {
+            throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
+        }
+
+        return buf.array();
+    }
+
+    private static void checkRoundTripForReservedVersion(final Change<String> data) {
+        final byte[] serialized = serializeVersions3Through5(TOPIC, data);
+        assertThat(serialized, is(notNullValue()));
+        final Change<String> deserialized = CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, serialized);
+        assertThat(deserialized, is(data));
+    }
 }
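
The reserved formats 3 through 5 listed in the deserializer comments above can be illustrated with a standalone sketch that depends only on the JDK. It is not the Kafka Streams implementation: the class ReservedChangeFormatSketch, its Decoded holder, and the encode/decode helpers are hypothetical names introduced here for illustration, and raw byte arrays stand in for values handled by the inner serializer. It assumes the UINT32 length prefix is written big-endian, which is ByteBuffer's default byte order.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of the reserved layouts; not part of Kafka Streams.
final class ReservedChangeFormatSketch {

    private static final int FLAGS_SIZE = 2; // 1 byte isLatest + 1 byte newOldFlag

    /** Hypothetical holder for the decoded parts of a record. */
    static final class Decoded {
        final byte[] newValue;
        final byte[] oldValue;
        final boolean isLatest;

        Decoded(final byte[] newValue, final byte[] oldValue, final boolean isLatest) {
            this.newValue = newValue;
            this.oldValue = oldValue;
            this.isLatest = isLatest;
        }
    }

    // Layouts (trailing bytes are always {isLatest}{newOldFlag}):
    //   {oldValue}{isLatest}{3}
    //   {newValue}{isLatest}{4}
    //   {UINT32 newDataLength}{newValue}{oldValue}{isLatest}{5}
    static byte[] encode(final byte[] newValue, final byte[] oldValue, final boolean isLatest) {
        final byte latest = (byte) (isLatest ? 1 : 0);
        if (newValue != null && oldValue != null) {
            return ByteBuffer.allocate(Integer.BYTES + newValue.length + oldValue.length + FLAGS_SIZE)
                .putInt(newValue.length).put(newValue).put(oldValue).put(latest).put((byte) 5).array();
        } else if (newValue != null) {
            return ByteBuffer.allocate(newValue.length + FLAGS_SIZE)
                .put(newValue).put(latest).put((byte) 4).array();
        } else if (oldValue != null) {
            return ByteBuffer.allocate(oldValue.length + FLAGS_SIZE)
                .put(oldValue).put(latest).put((byte) 3).array();
        }
        throw new IllegalArgumentException("Both old and new values are null.");
    }

    static Decoded decode(final byte[] data) {
        if (data.length < FLAGS_SIZE) {
            throw new IllegalArgumentException("Record too short: " + data.length);
        }
        final byte newOldFlag = data[data.length - 1];
        final byte latestByte = data[data.length - 2];
        if (latestByte != 0 && latestByte != 1) {
            throw new IllegalArgumentException("Unexpected isLatest byte: " + latestByte);
        }
        final boolean isLatest = latestByte == 1;
        final int payloadEnd = data.length - FLAGS_SIZE;

        switch (newOldFlag) {
            case 3: // old value only
                return new Decoded(null, Arrays.copyOfRange(data, 0, payloadEnd), isLatest);
            case 4: // new value only
                return new Decoded(Arrays.copyOfRange(data, 0, payloadEnd), null, isLatest);
            case 5: { // length-prefixed new value followed by old value
                final int newLength = Math.toIntExact(ByteBuffer.wrap(data).getInt() & 0xFFFFFFFFL);
                final int newEnd = Integer.BYTES + newLength;
                return new Decoded(
                    Arrays.copyOfRange(data, Integer.BYTES, newEnd),
                    Arrays.copyOfRange(data, newEnd, payloadEnd),
                    isLatest);
            }
            default:
                throw new IllegalArgumentException("Not a reserved format: " + newOldFlag);
        }
    }

    public static void main(final String[] args) {
        final byte[] bytes = encode(
            "hello".getBytes(StandardCharsets.UTF_8),
            "world".getBytes(StandardCharsets.UTF_8),
            false);
        final Decoded decoded = decode(bytes);
        System.out.println(new String(decoded.newValue, StandardCharsets.UTF_8)
            + " / " + new String(decoded.oldValue, StandardCharsets.UTF_8)
            + " / isLatest=" + decoded.isLatest);
    }
}
```

Because both flags sit at the end of the record, a reader can branch on the final byte first and only then decide whether an isLatest byte precedes it. This is what lets formats 0 through 2, which carry no isLatest byte, coexist with the reserved ones.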