Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/19 22:22:51 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Directly use Arrays.equals for version comparison

vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r443062971



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -339,7 +341,7 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
                             recordContext
                         )
                     );
-                } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
+                } else if (Arrays.equals(record.headers().lastHeader("v").value(), V_1_CHANGELOG_HEADER_VALUE)) {

Review comment:
       This is the fix (although it was probably fine before). The implementation of Header.equals is not specified by any contract, so it's safer to perform a direct comparison on the header values. Just as before, I'm comparing byte arrays to avoid deserializing the value.
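The distinction can be sketched with plain byte arrays (standing in for the raw header values; `V_1_HEADER_VALUE` is a hypothetical stand-in for `V_1_CHANGELOG_HEADER_VALUE`). Calling `equals` on an array uses reference equality, whereas `Arrays.equals` compares the contents element by element:

```java
import java.util.Arrays;

public class HeaderValueComparison {
    // Hypothetical stand-in for the raw bytes of the version-1 changelog header value.
    private static final byte[] V_1_HEADER_VALUE = {(byte) 1};

    public static void main(final String[] args) {
        // A distinct array with identical contents, as would come from a record header.
        final byte[] recordHeaderValue = {(byte) 1};

        // Object equality on arrays is reference equality, so this is false
        // even though the contents match.
        System.out.println(recordHeaderValue.equals(V_1_HEADER_VALUE)); // false

        // Arrays.equals compares the bytes themselves, with no dependence on
        // any class's equals contract.
        System.out.println(Arrays.equals(recordHeaderValue, V_1_HEADER_VALUE)); // true
    }
}
```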

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -286,6 +289,15 @@ private void logTombstone(final Bytes key) {
 
     private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch) {
         for (final ConsumerRecord<byte[], byte[]> record : batch) {
+            if (record.partition() != partition) {
+                throw new IllegalStateException(
+                    String.format(
+                        "record partition [%d] is being restored by the wrong suppress partition [%d]",
+                        record.partition(),
+                        partition
+                    )
+                );
+            }

Review comment:
       As a side note, I realized we can consolidate this check and perform it first, rather than after we've already written bad data into the buffer.
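
The fail-fast pattern used above can be illustrated in isolation (a minimal sketch; `PartitionGuard` and `checkPartition` are hypothetical names, not part of the Kafka codebase). The point is to validate the record's partition before any buffer state is mutated:

```java
public class PartitionGuard {
    private final int partition;

    PartitionGuard(final int partition) {
        this.partition = partition;
    }

    // Fail fast: reject a record from a foreign partition before any
    // state is mutated, rather than discovering the mismatch later.
    void checkPartition(final int recordPartition) {
        if (recordPartition != partition) {
            throw new IllegalStateException(
                String.format(
                    "record partition [%d] is being restored by the wrong suppress partition [%d]",
                    recordPartition,
                    partition
                )
            );
        }
    }

    public static void main(final String[] args) {
        final PartitionGuard guard = new PartitionGuard(0);
        guard.checkPartition(0); // matching partition: no exception
        try {
            guard.checkPartition(3); // mismatched partition: throws
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```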




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org