You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2022/07/18 21:11:58 UTC

[kafka] branch 3.3 updated: KAFKA-13769 Fix version check in SubscriptionJoinForeignProcessorSupplier (#12420)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 9dd25ecd9c KAFKA-13769 Fix version check in SubscriptionJoinForeignProcessorSupplier (#12420)
9dd25ecd9c is described below

commit 9dd25ecd9ce17e608c6aba98e0422b26ed133c12
Author: Alex Sorokoumov <91...@users.noreply.github.com>
AuthorDate: Mon Jul 18 23:10:02 2022 +0200

    KAFKA-13769 Fix version check in SubscriptionJoinForeignProcessorSupplier (#12420)
    
    This commit changes the version check from != to > as the process method
    works correctly on both version 1 and 2. != incorrectly throws on v1
    records.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java        | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
index fea8e73867..56d6a13321 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
@@ -70,7 +70,7 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
                 Objects.requireNonNull(valueAndTimestamp, "This processor should never see a null newValue.");
                 final SubscriptionWrapper<K> value = valueAndTimestamp.value();
 
-                if (value.getVersion() != SubscriptionWrapper.CURRENT_VERSION) {
+                if (value.getVersion() > SubscriptionWrapper.CURRENT_VERSION) {
                     //Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility
                     //with previous versions to enable rolling upgrades. Must develop a strategy for upgrading
                     //from older SubscriptionWrapper versions to newer versions.