You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/19 23:29:55 UTC
[kafka] branch 3.4 updated: MINOR: update comment for FK join processor renames (#13610)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 960110dd85f MINOR: update comment for FK join processor renames (#13610)
960110dd85f is described below
commit 960110dd85f3b4865d14579347542b07b522f283
Author: Victoria Xia <vi...@confluent.io>
AuthorDate: Wed Apr 19 19:29:45 2023 -0400
MINOR: update comment for FK join processor renames (#13610)
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../java/org/apache/kafka/streams/kstream/internals/KTableImpl.java | 2 +-
.../internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java | 2 +-
.../internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java | 2 +-
3 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 1dcaf56f078..f6b7f1a0874 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -1076,7 +1076,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
//not be done needlessly.
((KTableImpl<?, ?, ?>) foreignKeyTable).enableSendingOldValues(true);
- //Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
+ //Old values must be sent such that the SubscriptionSendProcessorSupplier can propagate deletions to the correct node.
//This occurs whenever the extracted foreignKey changes values.
enableSendingOldValues(true);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
index 490f70ffc8d..7d31ef44223 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
@@ -33,7 +33,7 @@ import java.util.Objects;
/**
* Receives {@code SubscriptionWrapper<K>} events and processes them according to their Instruction.
* Depending on the results, {@code SubscriptionResponseWrapper}s are created, which will be propagated to
- * the {@code SubscriptionResolverJoinProcessorSupplier} instance.
+ * the {@code ResponseJoinProcessorSupplier} instance.
*
* @param <K> Type of primary keys
* @param <KO> Type of foreign key
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
index 5c386cc735b..cf88aec6f9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
@@ -103,7 +103,7 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(record.value(), record.timestamp());
final ValueAndTimestamp<SubscriptionWrapper<K>> oldValue = store.get(subscriptionKey);
- //This store is used by the prefix scanner in ForeignJoinSubscriptionProcessorSupplier
+ //This store is used by the prefix scanner in ForeignTableJoinProcessorSupplier
if (record.value().getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) ||
record.value().getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) {
store.delete(subscriptionKey);