You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2022/07/18 21:10:17 UTC
[kafka] branch trunk updated: KAFKA-13769 Fix version check in SubscriptionJoinForeignProcessorSupplier (#12420)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4eef28018a KAFKA-13769 Fix version check in SubscriptionJoinForeignProcessorSupplier (#12420)
4eef28018a is described below
commit 4eef28018a301fb0b858dce25d73b0f7c5f7dde7
Author: Alex Sorokoumov <91...@users.noreply.github.com>
AuthorDate: Mon Jul 18 23:10:02 2022 +0200
KAFKA-13769 Fix version check in SubscriptionJoinForeignProcessorSupplier (#12420)
This commit changes the version check from `!=` to `>` because the process
method works correctly on both version 1 and version 2 records. The previous
`!=` check incorrectly throws on v1 records.
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
index fea8e73867..56d6a13321 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
@@ -70,7 +70,7 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
Objects.requireNonNull(valueAndTimestamp, "This processor should never see a null newValue.");
final SubscriptionWrapper<K> value = valueAndTimestamp.value();
- if (value.getVersion() != SubscriptionWrapper.CURRENT_VERSION) {
+ if (value.getVersion() > SubscriptionWrapper.CURRENT_VERSION) {
//Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility
//with previous versions to enable rolling upgrades. Must develop a strategy for upgrading
//from older SubscriptionWrapper versions to newer versions.