Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/18 13:13:41 UTC

[GitHub] [kafka] Gerrrr opened a new pull request, #12535: KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier

Gerrrr opened a new pull request, #12535:
URL: https://github.com/apache/kafka/pull/12535

   This patch fixes another incorrect version check in the foreign-key (FK) join code and adds unit tests that would have caught this bug.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vvcephei commented on a diff in pull request #12535: KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier

Posted by GitBox <gi...@apache.org>.
vvcephei commented on code in PR #12535:
URL: https://github.com/apache/kafka/pull/12535#discussion_r949388743


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java:
##########
@@ -91,7 +91,7 @@ public void process(final Record<KO, SubscriptionWrapper<K>> record) {
                     droppedRecordsSensor.record();
                     return;
                 }
-                if (record.value().getVersion() != SubscriptionWrapper.CURRENT_VERSION) {
+                if (record.value().getVersion() > SubscriptionWrapper.CURRENT_VERSION) {

Review Comment:
   I assume that you've now searched the code base for every usage of `getVersion` and `CURRENT_VERSION` to make sure we don't have any more sneaky bugs, right?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java:
##########
@@ -38,8 +38,22 @@ public KP getPrimaryKey() {
         return primaryKey;
     }
 
-    public boolean equals(final KF foreignKey, final KP primaryKey) {
-        return this.foreignKey.equals(foreignKey) && this.primaryKey.equals(primaryKey);
+    @Override
+    public int hashCode() {
+        return Objects.hash(foreignKey, primaryKey);
+    }

Review Comment:
   Do we need to implement `hashCode`? Doing so is not free of consequences. If we don't actually need it but still want a correct `hashCode`/`equals` contract, you can throw an `UnsupportedOperationException` from the `hashCode` method, which effectively prevents anyone from using these objects in hash collections.
   
   Why would you want to do that? Mostly if you can't guarantee that all fields are immutable. OTOH, if all fields are immutable, then it's safe to just implement it.
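
The two options discussed in this comment can be sketched roughly as follows. This is only an illustration, not the code in the PR: `ImmutablePairKey`, `MutableKey`, and `HashCodeSketchDemo` are hypothetical names, and the sketch assumes the key's fields are never reassigned after construction.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a key whose fields are all final:
// equals and hashCode can both be implemented safely.
final class ImmutablePairKey<KF, KP> {
    private final KF foreignKey;
    private final KP primaryKey;

    ImmutablePairKey(final KF foreignKey, final KP primaryKey) {
        this.foreignKey = foreignKey;
        this.primaryKey = primaryKey;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final ImmutablePairKey<?, ?> that = (ImmutablePairKey<?, ?>) o;
        return Objects.equals(foreignKey, that.foreignKey)
            && Objects.equals(primaryKey, that.primaryKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(foreignKey, primaryKey);
    }
}

// Hypothetical key with a mutable field: equals is still usable,
// but hashCode throws so the object cannot be put into hash collections.
final class MutableKey {
    private String id;

    MutableKey(final String id) {
        this.id = id;
    }

    void setId(final String id) {
        this.id = id;
    }

    @Override
    public boolean equals(final Object o) {
        return o instanceof MutableKey && Objects.equals(id, ((MutableKey) o).id);
    }

    @Override
    public int hashCode() {
        throw new UnsupportedOperationException("MutableKey must not be used in hash collections");
    }
}

final class HashCodeSketchDemo {
    public static void main(final String[] args) {
        final Set<ImmutablePairKey<String, Integer>> keys = new HashSet<>();
        keys.add(new ImmutablePairKey<>("fk", 1));
        // An equal key is found because equals and hashCode agree.
        System.out.println(keys.contains(new ImmutablePairKey<>("fk", 1)));

        try {
            new HashSet<MutableKey>().add(new MutableKey("a"));
        } catch (final UnsupportedOperationException e) {
            // HashSet.add calls hashCode, so the misuse fails immediately.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Throwing from `hashCode` turns a silent lookup failure (a key mutated after insertion) into an immediate error at the point of misuse, which is the trade-off the comment describes.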





[GitHub] [kafka] Gerrrr commented on a diff in pull request #12535: KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on code in PR #12535:
URL: https://github.com/apache/kafka/pull/12535#discussion_r949396415


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java:
##########
@@ -91,7 +91,7 @@ public void process(final Record<KO, SubscriptionWrapper<K>> record) {
                     droppedRecordsSensor.record();
                     return;
                 }
-                if (record.value().getVersion() != SubscriptionWrapper.CURRENT_VERSION) {
+                if (record.value().getVersion() > SubscriptionWrapper.CURRENT_VERSION) {

Review Comment:
   There is https://github.com/apache/kafka/blob/2ff4c0a364379d231edfe1836042574359b1023e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java#L85-L91
   
   I did not update it because the version check there is for `SubscriptionResponseWrapper` and its version wasn't incremented.
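
For illustration, the difference between the two comparisons in the diff above can be sketched like this. The class and method names are hypothetical, and the version numbers are assumed values rather than the real constants; the point is only that a strict `!=` check starts rejecting older records once the current version has been incremented, while it behaves the same as `>` for older and current records as long as only one version exists.

```java
final class VersionCheckSketch {
    // Assumed value for illustration: the highest version this instance can read.
    static final byte CURRENT_VERSION = 1;

    // Strict check: rejects every record whose version is not exactly CURRENT_VERSION,
    // including records written by an older instance during a rolling upgrade.
    static boolean rejectedByStrictCheck(final byte recordVersion) {
        return recordVersion != CURRENT_VERSION;
    }

    // Lenient check: rejects only records from a newer, unknown version.
    static boolean rejectedByLenientCheck(final byte recordVersion) {
        return recordVersion > CURRENT_VERSION;
    }

    public static void main(final String[] args) {
        // Versions 0 (older), 1 (current), and 2 (newer than this instance understands).
        for (byte v = 0; v <= 2; v++) {
            System.out.printf(
                "version=%d strictRejects=%b lenientRejects=%b%n",
                v, rejectedByStrictCheck(v), rejectedByLenientCheck(v));
        }
    }
}
```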





[GitHub] [kafka] Gerrrr commented on pull request #12535: KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on PR #12535:
URL: https://github.com/apache/kafka/pull/12535#issuecomment-1219496131

   @vvcephei Can you please review?




[GitHub] [kafka] vvcephei commented on a diff in pull request #12535: KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier

Posted by GitBox <gi...@apache.org>.
vvcephei commented on code in PR #12535:
URL: https://github.com/apache/kafka/pull/12535#discussion_r949454530


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java:
##########
@@ -91,7 +91,7 @@ public void process(final Record<KO, SubscriptionWrapper<K>> record) {
                     droppedRecordsSensor.record();
                     return;
                 }
-                if (record.value().getVersion() != SubscriptionWrapper.CURRENT_VERSION) {
+                if (record.value().getVersion() > SubscriptionWrapper.CURRENT_VERSION) {

Review Comment:
   OK, let's make sure we follow up with that fix as well.





[GitHub] [kafka] Gerrrr commented on a diff in pull request #12535: KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on code in PR #12535:
URL: https://github.com/apache/kafka/pull/12535#discussion_r949398138


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java:
##########
@@ -38,8 +38,22 @@ public KP getPrimaryKey() {
         return primaryKey;
     }
 
-    public boolean equals(final KF foreignKey, final KP primaryKey) {
-        return this.foreignKey.equals(foreignKey) && this.primaryKey.equals(primaryKey);
+    @Override
+    public int hashCode() {
+        return Objects.hash(foreignKey, primaryKey);
+    }

Review Comment:
   Both fields of `CombinedKey` are immutable:
   
   https://github.com/apache/kafka/blob/2ff4c0a364379d231edfe1836042574359b1023e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java#L22-L24
   
   so I'd keep the `hashCode` implementation unless you insist on replacing it with `UnsupportedOperationException`.





[GitHub] [kafka] Gerrrr commented on pull request #12535: KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on PR #12535:
URL: https://github.com/apache/kafka/pull/12535#issuecomment-1219722137

   Test failure:
   
   ```
   [2022-08-18T14:24:17.527Z] FetchRequestTestDowngrade > testTopicIdIsRemovedFromFetcherWhenControllerDowngrades() STARTED
   [2022-08-18T14:24:17.527Z] integration.kafka.server.FetchRequestTestDowngrade.testTopicIdIsRemovedFromFetcherWhenControllerDowngrades() failed, log available in /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-12535/core/build/reports/testOutput/integration.kafka.server.FetchRequestTestDowngrade.testTopicIdIsRemovedFromFetcherWhenControllerDowngrades().test.stdout
   [2022-08-18T14:24:17.527Z] 
   [2022-08-18T14:24:17.527Z] FetchRequestTestDowngrade > testTopicIdIsRemovedFromFetcherWhenControllerDowngrades() FAILED
   [2022-08-18T14:24:17.527Z]     org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' already exists.
   ```
   
   This seems unrelated.




[GitHub] [kafka] vvcephei merged pull request #12535: KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier

Posted by GitBox <gi...@apache.org>.
vvcephei merged PR #12535:
URL: https://github.com/apache/kafka/pull/12535




[GitHub] [kafka] Gerrrr commented on pull request #12535: KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on PR #12535:
URL: https://github.com/apache/kafka/pull/12535#issuecomment-1219734073

   I added an upgrade test back in https://github.com/apache/kafka/pull/12122. Unfortunately, it didn't catch these version check bugs. I am going to figure out why and improve the test. I'd prefer to do that in a separate patch though.




[GitHub] [kafka] vvcephei commented on a diff in pull request #12535: KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier

Posted by GitBox <gi...@apache.org>.
vvcephei commented on code in PR #12535:
URL: https://github.com/apache/kafka/pull/12535#discussion_r949454836


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java:
##########
@@ -38,8 +38,22 @@ public KP getPrimaryKey() {
         return primaryKey;
     }
 
-    public boolean equals(final KF foreignKey, final KP primaryKey) {
-        return this.foreignKey.equals(foreignKey) && this.primaryKey.equals(primaryKey);
+    @Override
+    public int hashCode() {
+        return Objects.hash(foreignKey, primaryKey);
+    }

Review Comment:
   Sounds good to me.





[GitHub] [kafka] vvcephei commented on pull request #12535: KAFKA-13769 Fix version check in SubscriptionStoreReceiveProcessorSupplier

Posted by GitBox <gi...@apache.org>.
vvcephei commented on PR #12535:
URL: https://github.com/apache/kafka/pull/12535#issuecomment-1219795362

   Test failure was unrelated:
   
   [Build / JDK 11 and Scala 2.13 / integration.kafka.server.FetchRequestTestDowngrade.testTopicIdIsRemovedFromFetcherWhenControllerDowngrades()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12535/1/testReport/junit/integration.kafka.server/FetchRequestTestDowngrade/Build___JDK_11_and_Scala_2_13___testTopicIdIsRemovedFromFetcherWhenControllerDowngrades__/)

