Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/01 14:48:36 UTC

[GitHub] [kafka] lkokhreidze opened a new pull request #10802: KAFKA-6718 / Part1: Update SubscriptionInfoData with clientTags

lkokhreidze opened a new pull request #10802:
URL: https://github.com/apache/kafka/pull/10802


   Part 1 of the [KIP-708](https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams) implementation. The full implementation can be found at https://github.com/apache/kafka/pull/10785. The work is split into three smaller PRs to make the review process easier to follow. The overall plan is as follows:
   
   👉  Part-1: Protocol change, add `clientTags` to `SubscriptionInfoData`
   ⏭️  Part-2: Rack aware standby task assignment logic.
   ⏭️  Part-3: Add required configurations to `StreamsConfig` (public API change, at this point we should have full functionality)
   
   This PR adds `clientTags` to `SubscriptionInfoData` and implements Part-1 of the plan above.
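
To make the protocol change concrete, here is a minimal sketch of a version-gated field, assuming (as the review below indicates) that client tags first appear in subscription version 11. The class `TaggedSubscriptionSketch`, its wire layout, and the constant `MIN_CLIENT_TAGS_VERSION` are hypothetical illustrations; the real `SubscriptionInfo` relies on code generated from `SubscriptionInfoData.json`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for SubscriptionInfo; illustration only.
final class TaggedSubscriptionSketch {
    // Assumption: client tags are part of the payload from version 11 on.
    static final int MIN_CLIENT_TAGS_VERSION = 11;

    // Writes the version, followed by the tags only if the version supports them.
    static ByteBuffer encode(final int version, final Map<String, String> clientTags) {
        final boolean withTags = version >= MIN_CLIENT_TAGS_VERSION;
        int size = Integer.BYTES; // version
        if (withTags) {
            size += Integer.BYTES; // number of tags
            for (final Map.Entry<String, String> entry : clientTags.entrySet()) {
                size += 2 * Integer.BYTES
                    + entry.getKey().getBytes(StandardCharsets.UTF_8).length
                    + entry.getValue().getBytes(StandardCharsets.UTF_8).length;
            }
        }
        final ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.putInt(version);
        if (withTags) {
            buffer.putInt(clientTags.size());
            for (final Map.Entry<String, String> entry : clientTags.entrySet()) {
                putString(buffer, entry.getKey());
                putString(buffer, entry.getValue());
            }
        }
        buffer.flip();
        return buffer;
    }

    // Older versions carry no tags, so they decode to an empty map.
    static Map<String, String> decodeClientTags(final ByteBuffer buffer) {
        final ByteBuffer in = buffer.duplicate();
        final int version = in.getInt();
        final Map<String, String> tags = new LinkedHashMap<>();
        if (version >= MIN_CLIENT_TAGS_VERSION) {
            final int count = in.getInt();
            for (int i = 0; i < count; i++) {
                final String key = readString(in);
                final String value = readString(in);
                tags.put(key, value);
            }
        }
        return tags;
    }

    private static void putString(final ByteBuffer buffer, final String s) {
        final byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buffer.putInt(bytes.length); // length prefix
        buffer.put(bytes);
    }

    private static String readString(final ByteBuffer in) {
        final byte[] bytes = new byte[in.getInt()];
        in.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

With this layout, a version 10 payload built from a non-empty tag map still decodes to an empty map, which is the behaviour the test `shouldReturnEmptyMapOfClientTagsOnOlderVersions` in this PR checks for the real class.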
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lkokhreidze commented on pull request #10802: KAFKA-6718 / Part1: Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
lkokhreidze commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-852188025


   Call for review @cadonna @vvcephei @ableegoldman 





[GitHub] [kafka] cadonna commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063745684


   @lkokhreidze Could you please rebase this PR on the latest trunk? Some system tests fail, probably because a fix is missing from the PR branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lkokhreidze commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
lkokhreidze commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1060325078


   Hi @cadonna @showuon 
   Sorry for the ping, but I was wondering whether you could have a look at this PR this week.
   Once this is merged, I'll have all the pieces to glue everything together in the last PR of this KIP: https://github.com/apache/kafka/pull/11837
   
   Thanks.





[GitHub] [kafka] showuon edited a comment on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063569496


   I'll start taking a look this week. Sorry for the late response.





[GitHub] [kafka] lkokhreidze commented on pull request #10802: KAFKA-6718 / Part1: Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
lkokhreidze commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-856813901


   Hi @cadonna 
   Having given the order of the PRs a bit more thought, it logically makes more sense to have this PR first, as TaskAssignor gets its data from the subscription info. 
   Coming back to your point that bumping the version increases the number of rebalances in a rolling upgrade scenario: given that the version for the 3.0 release was already bumped via https://github.com/apache/kafka/pull/10609, do you think it's still a problem?
   I would prefer to finalise this PR first and avoid more context switching, but of course, if it's really needed, I can switch to the task assignor implementation.
   
   Thanks again for the feedback!
   





[GitHub] [kafka] lkokhreidze edited a comment on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
lkokhreidze edited a comment on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-857567693


   Hi @cadonna
   FYI, I've opened a PR for rack-aware standby task assignment: https://github.com/apache/kafka/pull/10851





[GitHub] [kafka] showuon commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063569496


   I'll start taking a look this week. Sorry for the delay.





[GitHub] [kafka] lkokhreidze commented on pull request #10802: KAFKA-6718 / Part1: Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
lkokhreidze commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-856507161


   Hi @cadonna @vvcephei @ableegoldman, I know these are busy times with the 3.0 release on the radar, but I would appreciate a review of this PR when you have time. Thanks!
   





[GitHub] [kafka] lkokhreidze commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
lkokhreidze commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-857567693


   Hi @cadonna
   FYI, I've opened a PR for rack-aware standby task assignment:
   https://github.com/apache/kafka/pull/10851





[GitHub] [kafka] lkokhreidze commented on pull request #10802: KAFKA-6718 / Part1: Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
lkokhreidze commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-856797369


   Hi @cadonna I've addressed all of your comments.





[GitHub] [kafka] lkokhreidze edited a comment on pull request #10802: KAFKA-6718 / Part1: Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
lkokhreidze edited a comment on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-856813901


   Hi @cadonna 
   Having given the order of the PRs a bit more thought, it logically makes more sense to have this PR first, as TaskAssignor gets its data from the subscription info. 
   Coming back to your point that bumping the version increases the number of rebalances in a rolling upgrade scenario: given that the protocol version for 3.0 was already increased via https://github.com/apache/kafka/pull/10609, do you think it's still a problem?
   I would prefer to finalise this PR first and avoid more context switching, but of course, if it's really needed, I can switch to the task assignor implementation.
   
   Thanks again for the feedback!
   





[GitHub] [kafka] cadonna commented on a change in pull request #10802: KAFKA-6718 / Part1: Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#discussion_r647295043



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##########
@@ -407,32 +409,56 @@ public void shouldNotErrorAccessingFutureVars() {
     @Test
     public void shouldEncodeAndDecodeVersion9() {
         final SubscriptionInfo info =
-                new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+                new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap());
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion10() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap());
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion10WithNamedTopologies() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap());
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldThrowIfAttemptingToUseNamedTopologiesWithOlderVersion() {
         assertThrows(
             TaskAssignmentException.class,
-            () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE)
+            () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap())
         );
     }
 
+    @Test
+    public void shouldEncodeAndDecodeVersion11() {
+        final SubscriptionInfo info =
+            new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, mkMap(mkEntry("t1", "v1")));

Review comment:
       Could you use a map with more than just one entry? If you use the same map in multiple tests, you should put it into a class field. The same applies to the tests below.

##########
File path: streams/src/main/resources/common/message/SubscriptionInfoData.json
##########
@@ -135,6 +140,22 @@
           "type": "int64"
         }
       ]
+    },
+    {
+      "name": "ClientTag",
+      "versions": "1+",

Review comment:
       I think this should be 11+. While technically it probably does not make any difference, it better documents when the struct was introduced.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##########
@@ -96,8 +98,8 @@ public void shouldThrowForUnknownVersion1() {
             "localhost:80",
             TASK_OFFSET_SUMS,
             IGNORED_UNIQUE_FIELD,
-            IGNORED_ERROR_CODE
-        ));
+            IGNORED_ERROR_CODE,
+            Collections.emptyMap()));

Review comment:
       Could you use a static final variable named `IGNORED_CLIENT_TAGS` to better document the code as was done for some other fields?
   Here and below.
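
One way to follow this suggestion, together with the earlier request for a shared map with more than one entry, is to keep both maps as named constants. The sketch below is only an illustration: the holder class `ClientTagFixtures`, the constant `CLIENT_TAGS`, and the tag keys and values are assumptions, and in the PR the fields would live in `SubscriptionInfoTest` itself.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder for test fixtures; illustration only.
final class ClientTagFixtures {
    // The name documents that the tags are irrelevant to the test using it.
    static final Map<String, String> IGNORED_CLIENT_TAGS = Collections.emptyMap();

    // Shared fixture with more than one entry, reused across tests.
    static final Map<String, String> CLIENT_TAGS = buildClientTags();

    private static Map<String, String> buildClientTags() {
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put("t1", "v1");
        tags.put("t2", "v2");
        // Unmodifiable, so one test cannot change the fixture for another.
        return Collections.unmodifiableMap(tags);
    }
}
```

A test that does not care about tags would then pass `IGNORED_CLIENT_TAGS` where the diff currently passes `Collections.emptyMap()`.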

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -125,10 +130,33 @@ private SubscriptionInfo(final SubscriptionInfoData subscriptionInfoData) {
         this.data = subscriptionInfoData;
     }
 
+    public Map<String, String> clientTags() {
+        return data.clientTags()
+                   .stream()
+                   .collect(
+                       Collectors.toMap(
+                           clientTag -> new String(clientTag.key(), StandardCharsets.UTF_8),
+                           clientTag -> new String(clientTag.value(), StandardCharsets.UTF_8)
+                       )
+                   );

Review comment:
       nit:
   ```suggestion
           return data.clientTags().stream()
               .collect(
                   Collectors.toMap(
                       clientTag -> new String(clientTag.key(), StandardCharsets.UTF_8),
                       clientTag -> new String(clientTag.value(), StandardCharsets.UTF_8)
                   )
               );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##########
@@ -407,32 +409,56 @@ public void shouldNotErrorAccessingFutureVars() {
     @Test
     public void shouldEncodeAndDecodeVersion9() {
         final SubscriptionInfo info =
-                new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+                new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap());
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion10() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap());
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion10WithNamedTopologies() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap());
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldThrowIfAttemptingToUseNamedTopologiesWithOlderVersion() {
         assertThrows(
             TaskAssignmentException.class,
-            () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE)
+            () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap())
         );
     }
 
+    @Test
+    public void shouldEncodeAndDecodeVersion11() {
+        final SubscriptionInfo info =
+            new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, mkMap(mkEntry("t1", "v1")));
+        assertThat(info, is(SubscriptionInfo.decode(info.encode())));
+    }
+
+    @Test
+    public void shouldReturnEmptyMapOfClientTagsOnOlderVersions() {
+        final SubscriptionInfo info =
+            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, mkMap(mkEntry("t1", "v1")));
+
+        assertThat(info.clientTags(), is(anEmptyMap()));
+    }
+
+    @Test
+    public void shouldReturnMapOfClientTagsOnVersion11() {
+        final Map<String, String> clientTags = mkMap(mkEntry("t1", "v1"));
+        final SubscriptionInfo info =
+            new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, clientTags);
+
+        assertThat(info.clientTags(), is(clientTags));
+    }
+

Review comment:
       Could you add a test to verify what happens when an empty client tags map is passed to a version 11 subscription info?
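
As a rough idea of what such a check exercises, the sketch below round-trips a tag map through a list of key/value byte pairs, mirroring the shape of `buildClientTagsFromMap` and `clientTags()` in this PR. The class `EmptyClientTagsSketch` and its nested `Tag` type are hypothetical stand-ins for the generated `ClientTag` struct, so this shows the expected behaviour of the conversion only, not of the real encoder.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch; not the PR's SubscriptionInfo.
final class EmptyClientTagsSketch {
    // Stand-in for the generated ClientTag struct (key and value as UTF-8 bytes).
    static final class Tag {
        final byte[] key;
        final byte[] value;

        Tag(final byte[] key, final byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Map -> list of byte pairs, as done when building the subscription data.
    static List<Tag> toTags(final Map<String, String> clientTags) {
        return clientTags.entrySet().stream()
            .map(entry -> new Tag(
                entry.getKey().getBytes(StandardCharsets.UTF_8),
                entry.getValue().getBytes(StandardCharsets.UTF_8)))
            .collect(Collectors.toList());
    }

    // List of byte pairs -> map, as done when reading the tags back.
    static Map<String, String> toMap(final List<Tag> tags) {
        return tags.stream()
            .collect(Collectors.toMap(
                tag -> new String(tag.key, StandardCharsets.UTF_8),
                tag -> new String(tag.value, StandardCharsets.UTF_8)));
    }

    public static void main(final String[] args) {
        // An empty map yields an empty list, which reads back as an empty map.
        final Map<String, String> empty = Collections.emptyMap();
        System.out.println(toMap(toTags(empty)).isEmpty());
    }
}
```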

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -125,10 +130,33 @@ private SubscriptionInfo(final SubscriptionInfoData subscriptionInfoData) {
         this.data = subscriptionInfoData;
     }
 
+    public Map<String, String> clientTags() {
+        return data.clientTags()
+                   .stream()
+                   .collect(
+                       Collectors.toMap(
+                           clientTag -> new String(clientTag.key(), StandardCharsets.UTF_8),
+                           clientTag -> new String(clientTag.value(), StandardCharsets.UTF_8)
+                       )
+                   );
+    }
+
     public int errorCode() {
         return data.errorCode();
     }
 
+    private List<ClientTag> buildClientTagsFromMap(final Map<String, String> clientTags) {
+        return clientTags.entrySet()
+                         .stream()
+                         .map(clientTagEntry -> {
+                             final ClientTag clientTag = new ClientTag();
+                             clientTag.setKey(clientTagEntry.getKey().getBytes(StandardCharsets.UTF_8));
+                             clientTag.setValue(clientTagEntry.getValue().getBytes(StandardCharsets.UTF_8));
+                             return clientTag;
+                         })
+                         .collect(Collectors.toList());
+    }

Review comment:
       nit:
   ```suggestion
       private List<ClientTag> buildClientTagsFromMap(final Map<String, String> clientTags) {
           return clientTags.entrySet().stream()
               .map(clientTagEntry -> {
                   final ClientTag clientTag = new ClientTag();
                   clientTag.setKey(clientTagEntry.getKey().getBytes(StandardCharsets.UTF_8));
                   clientTag.setValue(clientTagEntry.getValue().getBytes(StandardCharsets.UTF_8));
                   return clientTag;
               })
               .collect(Collectors.toList());
       }
   ```

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -474,8 +474,8 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                     monitors[first_other_processor] = first_other_monitor
                     monitors[second_other_processor] = second_other_monitor
 
-                    version_probing_message = "Sent a version 10 subscription and got version 10 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 10 and trigger new rebalance.",
-                    end_of_upgrade_message = "Sent a version 10 subscription and group.s latest commonly supported version is 11 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 11 for next rebalance."
+                    version_probing_message = "Sent a version 11 subscription and got version 11 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 10 and trigger new rebalance.",
+                    end_of_upgrade_message = "Sent a version 11 subscription and group.s latest commonly supported version is 12 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 12 for next rebalance."

Review comment:
       Awesome that you thought about this!
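
The log messages in this hunk refer to version probing during a rolling upgrade: each member advertises the newest subscription version it supports, and the group settles on the highest version that every member can read. Below is a minimal model of that last step, assuming it reduces to taking the minimum over the members' latest supported versions. The class and method names are hypothetical, and the real assignor logic is more involved.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of picking a commonly supported version.
final class VersionProbingSketch {
    // The lowest "latest supported version" in the group is the highest
    // version that every member is able to decode.
    static int commonlySupportedVersion(final List<Integer> latestSupportedVersions) {
        if (latestSupportedVersions.isEmpty()) {
            throw new IllegalArgumentException("group has no members");
        }
        int min = Integer.MAX_VALUE;
        for (final int version : latestSupportedVersions) {
            min = Math.min(min, version);
        }
        return min;
    }

    public static void main(final String[] args) {
        // Mid-upgrade: one member has not been bounced yet.
        System.out.println(commonlySupportedVersion(Arrays.asList(11, 10, 11)));
        // End of the rolling upgrade: every member supports the new version.
        System.out.println(commonlySupportedVersion(Arrays.asList(11, 11, 11)));
    }
}
```

In this model the group stays on the old version until the last member is bounced, which is why a version bump adds rebalances to a rolling upgrade.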

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
##########
@@ -161,8 +162,8 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
                     userEndPoint(),
                     taskManager.getTaskOffsetSums(),
                     uniqueField,
-                    0
-                ).encode();
+                    0,
+                    Collections.emptyMap()).encode();

Review comment:
       Could you put the map into a variable with a meaningful name?
   Would it make sense to use a non-empty map here?







[GitHub] [kafka] lkokhreidze commented on a change in pull request #10802: KAFKA-6718 / Part1: Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
lkokhreidze commented on a change in pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#discussion_r647470689



##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
##########
@@ -161,8 +162,8 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
                     userEndPoint(),
                     taskManager.getTaskOffsetSums(),
                     uniqueField,
-                    0
-                ).encode();
+                    0,
+                    Collections.emptyMap()).encode();

Review comment:
       Makes sense. Added a non-empty map as a static final field.







[GitHub] [kafka] cadonna edited a comment on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
cadonna edited a comment on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1060511060


   @lkokhreidze Thank you for the ping! I plan to look at your PR this week. Let's try to get the whole feature merged before the feature freeze for AK 3.2.0, which is March 16th!





[GitHub] [kafka] cadonna commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1060511060


   @lkokhreidze Thank you for the ping! I plan to look at your PR this week.





[GitHub] [kafka] showuon merged pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
showuon merged pull request #10802:
URL: https://github.com/apache/kafka/pull/10802


   





[GitHub] [kafka] lkokhreidze commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
lkokhreidze commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063755741


   @cadonna done!





[GitHub] [kafka] lkokhreidze commented on pull request #10802: KAFKA-6718 / Part1: Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
lkokhreidze commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-856654402


   Hi @cadonna,
   Thanks for the feedback. I think that's a good call. I wasn't aware of that consequence of bumping the version. I will open a PR for the assignor changes shortly and re-brand this PR as "Part 2".





[GitHub] [kafka] showuon commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1064887225


   Failed tests are unrelated:
   ```
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOffsetSyncsTopicsOnTarget()
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorClientPolicyIntegrationTest.testCreateWithNoAllowedOverridesForNonePolicy
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.SinkConnectorsIntegrationTest.testEagerConsumerPartitionAssignment
       Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testClose()
       Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testClose()
       Build / JDK 11 and Scala 2.13 / kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testSwitchingToTopicCreationEnabled
       Build / JDK 17 and Scala 2.13 / kafka.api.ConsumerBounceTest.testClose()
       Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testClose()
       Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testClose()
       Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
       Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
       Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
       Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   ```





[GitHub] [kafka] showuon commented on a change in pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#discussion_r824488286



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -477,7 +477,7 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                     monitors[first_other_processor] = first_other_monitor
                     monitors[second_other_processor] = second_other_monitor
 
-                    highest_version = 10
+                    highest_version = 11

Review comment:
       Thanks for remembering the system tests!







[GitHub] [kafka] cadonna commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063898824


   @lkokhreidze The system tests passed:
   
   http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-03-10--001.system-test-kafka-branch-builder--1646904256--lkokhreidze--KAFKA-6718-part1-subscription-info-changes--3e73e5b962/report.html
   
   @showuon Could you merge this PR after your review? (Of course, only if the PR looks good to you)

