Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/06 20:23:18 UTC

[GitHub] [kafka] mjsax opened a new pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

mjsax opened a new pull request #9384:
URL: https://github.com/apache/kafka/pull/9384


   Currently, we pass both `AdminClient` and `TaskManager` into `StreamsPartitionAssignor`, and use `TaskManager#mainConsumer()` to access the main consumer. However, the `TaskManager` (TM) also holds a reference to the `AdminClient`, so we can simplify the setup by passing only the `TaskManager` reference.
   
   Call for review @vvcephei 
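The simplification proposed in the description can be sketched in plain Java. The assignor receives its settings as a `Map<String, ?>` in `configure()`, and internal objects are passed in under reserved keys. This is a minimal sketch under stated assumptions: `AdminStandIn`, `TaskManagerStandIn`, and `AssignorStandIn` are hypothetical stand-ins, not the real Kafka classes; only the key string is taken from `StreamsConfig`. It shows the admin client being derived from the task manager instead of arriving under a second key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's Admin client (not the real interface).
interface AdminStandIn {
    String name();
}

// Hypothetical stand-in for TaskManager: it already holds the admin client.
class TaskManagerStandIn {
    private final AdminStandIn adminClient;

    TaskManagerStandIn(final AdminStandIn adminClient) {
        this.adminClient = adminClient;
    }

    AdminStandIn adminClient() {
        return adminClient;
    }
}

// Hypothetical assignor that receives internal objects through its config map.
class AssignorStandIn {
    // Key string matches TASK_MANAGER_FOR_PARTITION_ASSIGNOR in StreamsConfig.
    static final String TASK_MANAGER_KEY = "__task.manager.instance__";

    TaskManagerStandIn taskManager;
    AdminStandIn adminClient;

    void configure(final Map<String, ?> configs) {
        final Object value = configs.get(TASK_MANAGER_KEY);
        if (!(value instanceof TaskManagerStandIn)) {
            throw new IllegalArgumentException("TaskManager was not specified");
        }
        taskManager = (TaskManagerStandIn) value;
        // Only one reference is passed in; the admin client is reached through it.
        adminClient = taskManager.adminClient();
    }
}
```

With this shape, the caller puts a single entry into the config map, and the assignor cannot end up holding an admin client that differs from the one the task manager uses.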


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r504136735



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -900,9 +900,7 @@
 
         // These are not settable in the main Streams config; they are set by the StreamThread to pass internal
         // state into the assignor.
-        public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__";
-        public static final String STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = "__streams.metadata.state.instance__";
-        public static final String STREAMS_ADMIN_CLIENT = "__streams.admin.client.instance__";
+        public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = "__reference.container.instance__";
         public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__";
         public static final String NEXT_SCHEDULED_REBALANCE_MS = "__next.probing.rebalance.ms__";
         public static final String TIME = "__time__";

Review comment:
       Should these be in the reference container as well?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -192,12 +191,11 @@ public String toString() {
      */
     @Override
     public void configure(final Map<String, ?> configs) {
-        final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs);
+        assignorConfiguration = new AssignorConfiguration(configs);

Review comment:
       Thanks. Should we instead make `referenceContainer` a field and access its members to get the references? The purpose of `AssignorConfiguration` is to parse the configs, not to be a general container for the configured values. Otherwise, we wouldn't need to assign any of the other fields here.
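The suggestion above separates two roles: a configuration object that only parses plain settings, and a `referenceContainer` field that carries live objects and is read where they are needed. A sketch with hypothetical stand-in types; `num.standby.replicas` is used only as an illustrative "real" setting, and none of these classes are the actual Kafka implementations.

```java
import java.util.Map;

// Hypothetical stand-ins for Kafka Streams internals.
class ContainerAdmin { }
class ContainerTaskManager { }

// Carries live object references; it is passed through, never parsed.
class ReferenceContainerSketch {
    ContainerTaskManager taskManager;
    ContainerAdmin adminClient;
}

// Only parses configuration values; it holds no object references.
class AssignorConfigurationSketch {
    final int numStandbyReplicas;

    AssignorConfigurationSketch(final Map<String, ?> configs) {
        final Object v = configs.get("num.standby.replicas");
        numStandbyReplicas = v == null ? 0 : Integer.parseInt(v.toString());
    }
}

class FieldContainerAssignor {
    static final String REFERENCE_CONTAINER_KEY = "__reference.container.instance__";

    private ReferenceContainerSketch referenceContainer;
    private int numStandbyReplicas;

    void configure(final Map<String, ?> configs) {
        // The parsed configuration stays a local: nothing needs it after configure().
        final AssignorConfigurationSketch assignorConfiguration = new AssignorConfigurationSketch(configs);
        numStandbyReplicas = assignorConfiguration.numStandbyReplicas;
        // Keep the container itself as a field and read its members on demand.
        referenceContainer = (ReferenceContainerSketch) configs.get(REFERENCE_CONTAINER_KEY);
    }

    ContainerAdmin adminForTopicCreation() {
        return referenceContainer.adminClient;
    }

    int numStandbyReplicas() {
        return numStandbyReplicas;
    }
}
```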

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -1091,8 +1096,4 @@ ConsumerRebalanceListener rebalanceListener() {
     Admin adminClient() {
         return adminClient;
     }
-
-    InternalTopologyBuilder internalTopologyBuilder() {

Review comment:
       Thanks!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -120,10 +120,6 @@ void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
         this.mainConsumer = mainConsumer;
     }
 
-    Consumer<byte[], byte[]> mainConsumer() {

Review comment:
       Thanks!







[GitHub] [kafka] vvcephei commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r504763968



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -900,9 +900,7 @@
 
         // These are not settable in the main Streams config; they are set by the StreamThread to pass internal
         // state into the assignor.
-        public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__";
-        public static final String STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = "__streams.metadata.state.instance__";
-        public static final String STREAMS_ADMIN_CLIENT = "__streams.admin.client.instance__";
+        public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = "__reference.container.instance__";
         public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__";
         public static final String NEXT_SCHEDULED_REBALANCE_MS = "__next.probing.rebalance.ms__";
         public static final String TIME = "__time__";

Review comment:
       Sorry, @mjsax, I was referring to all three of "assignment error code", "next scheduled rebalance ms", and "time".
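The three values listed above are state that flows back from the assignor to the stream thread, so a container could hold mutable holders that both sides share through one reference. A sketch, assuming `AtomicInteger`/`AtomicLong` holders and a `LongSupplier` clock in place of Kafka's `Time`; `SharedStateContainer` and `ErrorReportingAssignor` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical container: the thread creates the holders, the assignor writes to them.
class SharedStateContainer {
    final AtomicInteger assignmentErrorCode = new AtomicInteger(0);
    // Long.MAX_VALUE is used here to mean "no rebalance scheduled".
    final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
    // Stand-in for Kafka's Time abstraction.
    LongSupplier time = System::currentTimeMillis;
}

class ErrorReportingAssignor {
    private final SharedStateContainer container;

    ErrorReportingAssignor(final SharedStateContainer container) {
        this.container = container;
    }

    // Called during assignment: report an error code and schedule a follow-up rebalance.
    void onAssignment(final int errorCode, final long probingIntervalMs) {
        container.assignmentErrorCode.set(errorCode);
        container.nextScheduledRebalanceMs.set(container.time.getAsLong() + probingIntervalMs);
    }
}
```

Because the thread and the assignor share the same container instance, the thread observes the assignor's writes without needing a separate config key per value.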







[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r504190638



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -900,9 +900,7 @@
 
         // These are not settable in the main Streams config; they are set by the StreamThread to pass internal
         // state into the assignor.
-        public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__";
-        public static final String STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = "__streams.metadata.state.instance__";
-        public static final String STREAMS_ADMIN_CLIENT = "__streams.admin.client.instance__";
+        public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = "__reference.container.instance__";
         public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__";
         public static final String NEXT_SCHEDULED_REBALANCE_MS = "__next.probing.rebalance.ms__";
         public static final String TIME = "__time__";

Review comment:
       Ack









[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r502683615



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -192,12 +191,11 @@ public String toString() {
      */
     @Override
     public void configure(final Map<String, ?> configs) {
-        final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs);
+        assignorConfiguration = new AssignorConfiguration(configs);

Review comment:
       We need to keep a reference because we cannot get the `mainConsumer` at this point; it is only set after this method has finished.
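Because the main consumer is only set after `configure()` returns, the assignor cannot copy it into a field at configure time. One way to handle this, sketched with hypothetical stand-in types, is to keep a reference to the object that will later hold the consumer and resolve it lazily, here through a `java.util.function.Supplier`. The class names are assumptions for illustration only.

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical stand-in for the main consumer.
class StubConsumer { }

// Hypothetical holder that the stream thread fills in after configure() has run.
class LateBoundReferences {
    volatile StubConsumer mainConsumer;
}

class LazyConsumerAssignor {
    static final String REFS_KEY = "__reference.container.instance__";

    private Supplier<StubConsumer> mainConsumerSupplier;

    void configure(final Map<String, ?> configs) {
        final LateBoundReferences refs = (LateBoundReferences) configs.get(REFS_KEY);
        // Reading refs.mainConsumer here would yield null, so defer the lookup.
        mainConsumerSupplier = () -> Objects.requireNonNull(refs.mainConsumer, "Main consumer was not specified");
    }

    // Called later, during a rebalance, when the consumer has been set.
    StubConsumer mainConsumer() {
        return mainConsumerSupplier.get();
    }
}
```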








[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r504183734



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -192,12 +191,11 @@ public String toString() {
      */
     @Override
     public void configure(final Map<String, ?> configs) {
-        final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs);
+        assignorConfiguration = new AssignorConfiguration(configs);

Review comment:
       Sure.







[GitHub] [kafka] mjsax commented on pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#issuecomment-708678450


   Addressed comments. @vvcephei





[GitHub] [kafka] mjsax commented on pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#issuecomment-707963598


   @vvcephei Updated this PR (including PR description).





[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r502683418



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -1091,8 +1096,4 @@ ConsumerRebalanceListener rebalanceListener() {
     Admin adminClient() {
         return adminClient;
     }
-
-    InternalTopologyBuilder internalTopologyBuilder() {

Review comment:
       Remove unused method







[GitHub] [kafka] mjsax merged pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #9384:
URL: https://github.com/apache/kafka/pull/9384


   





[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r502683749



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -206,7 +204,6 @@ public void configure(final Map<String, ?> configs) {
         assignmentConfigs = assignorConfiguration.assignmentConfigs();
         partitionGrouper = assignorConfiguration.partitionGrouper();
         userEndPoint = assignorConfiguration.userEndPoint();
-        adminClient = assignorConfiguration.adminClient();

Review comment:
       Only used at a single place, just removing the variable.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -120,10 +120,6 @@ void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
         this.mainConsumer = mainConsumer;
     }
 
-    Consumer<byte[], byte[]> mainConsumer() {

Review comment:
       This decouples the TM from `StreamsPartitionAssignor`: we no longer "misuse" it to get the consumer.







[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r500574645



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
##########
@@ -131,12 +130,13 @@
 
     // Make sure to complete setting up any mocks (such as TaskManager or AdminClient) before configuring the assignor
     private void configurePartitionAssignorWith(final Map<String, Object> props) {
+        EasyMock.replay(taskManager, adminClient);
+
         final Map<String, Object> configMap = configProps();
         configMap.putAll(props);
 
         streamsConfig = new StreamsConfig(configMap);
         partitionAssignor.configure(configMap);
-        EasyMock.replay(taskManager, adminClient);

Review comment:
       We now need to set up the mocks (call `EasyMock.replay()`) before calling `partitionAssignor.configure()`, because this method now calls `taskManager#adminClient()` (same change elsewhere).
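The ordering matters because EasyMock mocks start in record state, where calls do not return the stubbed values; only after `EasyMock.replay(...)` do they. If `configure()` calls `taskManager.adminClient()` too early, it does not get the admin mock. The tiny hand-written fake below models just that behaviour; it is not EasyMock, and all names are hypothetical.

```java
import java.util.Objects;

// Hypothetical admin stand-in.
class MockAdmin { }

// Simplified model of a record/replay mock; NOT EasyMock's implementation.
class RecordReplayTaskManager {
    private final MockAdmin stubbedAdmin;
    private boolean replayed = false;

    RecordReplayTaskManager(final MockAdmin stubbedAdmin) {
        this.stubbedAdmin = stubbedAdmin;
    }

    void replay() {
        replayed = true;
    }

    // Before replay() the stubbed value is not returned yet.
    MockAdmin adminClient() {
        return replayed ? stubbedAdmin : null;
    }
}

class ConfigureCallsAdmin {
    MockAdmin adminClient;

    // Mirrors the changed configure(): it now reads the admin client from the task manager.
    void configure(final RecordReplayTaskManager taskManager) {
        adminClient = Objects.requireNonNull(taskManager.adminClient(), "Admin client was not specified");
    }
}
```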







[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r500574127



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##########
@@ -291,11 +271,7 @@ public String userEndPoint() {
         }
     }
 
-    public Admin adminClient() {
-        return adminClient;
-    }
-
-    public InternalTopicManager internalTopicManager() {
+    public InternalTopicManager internalTopicManager(final Admin adminClient) {

Review comment:
       Instead of passing the `Admin` as a parameter, we could call `taskManager#adminClient()` on the next line; however, this would require making that method `public` (it is package-private at the moment), so I opted for parameter passing instead.
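The trade-off can be sketched with hypothetical stand-ins: the assignor, which sits in the same package as `TaskManager` and can use its package-private accessor, fetches the `Admin` and hands it to the configuration helper, so the helper never needs access to the task manager. In Kafka the helper lives in `processor.internals.assignment` while the assignor and task manager live in `processor.internals`; a single-file sketch cannot reproduce that package split.

```java
// Hypothetical stand-ins; in Kafka these classes live in two different packages.
class AdminStub { }

class TopicManagerStub {
    final AdminStub adminClient;

    TopicManagerStub(final AdminStub adminClient) {
        this.adminClient = adminClient;
    }
}

class TaskManagerStub {
    private final AdminStub adminClient = new AdminStub();

    // Package-private accessor: visible to the assignor, not to other packages.
    AdminStub adminClient() {
        return adminClient;
    }
}

class AssignorConfigurationStub {
    // The dependency is passed in, so this class needs no access to the task manager.
    TopicManagerStub internalTopicManager(final AdminStub adminClient) {
        return new TopicManagerStub(adminClient);
    }
}

class AssignorStub {
    TopicManagerStub createTopicManager(final TaskManagerStub taskManager,
                                        final AssignorConfigurationStub configuration) {
        return configuration.internalTopicManager(taskManager.adminClient());
    }
}
```

Passing the dependency keeps the accessor's visibility narrow at the cost of one extra parameter.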







[GitHub] [kafka] vvcephei commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r504775858



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -192,12 +191,11 @@ public String toString() {
      */
     @Override
     public void configure(final Map<String, ?> configs) {
-        final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs);
+        assignorConfiguration = new AssignorConfiguration(configs);

Review comment:
       Thanks for the update! Are you still planning to drop the `assignorConfiguration` field in favor of a `referenceContainer` field?







[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r504909676



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -192,12 +191,11 @@ public String toString() {
      */
     @Override
     public void configure(final Map<String, ?> configs) {
-        final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs);
+        assignorConfiguration = new AssignorConfiguration(configs);

Review comment:
       I never intended to drop it. Maybe I misunderstood your comment?
   
   We could replace the field `StreamsPartitionAssignor#taskManager` (etc.) with `StreamsPartitionAssignor#referenceContainer`, but that just makes the code lines longer each time we need to access the TM (etc.). Thus, wouldn't it be more readable to "extract" those fields from the reference container once?
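The "extract once" alternative, sketched with hypothetical stand-ins: read the container once in `configure()` and copy each reference into its own field, so later code calls `taskManager.activeTaskCount()` rather than `referenceContainer.taskManager.activeTaskCount()`. The null checks and `activeTaskCount()` are assumptions for illustration. This only works for references that already exist at configure time; the main consumer, which is set later, would still need a deferred lookup.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-ins for Kafka Streams internals.
class ExtractTaskManager {
    int activeTaskCount() {
        return 3;
    }
}

class ExtractMetadataState { }

class ExtractContainer {
    ExtractTaskManager taskManager;
    ExtractMetadataState streamsMetadataState;
}

class ExtractOnceAssignor {
    static final String KEY = "__reference.container.instance__";

    // Copied out of the container once, then used directly.
    private ExtractTaskManager taskManager;
    private ExtractMetadataState streamsMetadataState;

    void configure(final Map<String, ?> configs) {
        final ExtractContainer container = (ExtractContainer) configs.get(KEY);
        taskManager = Objects.requireNonNull(container.taskManager, "TaskManager was not specified");
        streamsMetadataState = Objects.requireNonNull(container.streamsMetadataState, "StreamsMetadataState was not specified");
    }

    int activeTasks() {
        // Shorter than going through the container on every access.
        return taskManager.activeTaskCount();
    }

    ExtractMetadataState metadataState() {
        return streamsMetadataState;
    }
}
```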







[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r504910261



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -900,9 +900,7 @@
 
         // These are not settable in the main Streams config; they are set by the StreamThread to pass internal
         // state into the assignor.
-        public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__";
-        public static final String STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = "__streams.metadata.state.instance__";
-        public static final String STREAMS_ADMIN_CLIENT = "__streams.admin.client.instance__";
+        public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = "__reference.container.instance__";
         public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__";
         public static final String NEXT_SCHEDULED_REBALANCE_MS = "__next.probing.rebalance.ms__";
         public static final String TIME = "__time__";

Review comment:
       Ah. Ack.
   
   Actually, we can also pull `INTERNAL_TASK_ASSIGNOR_CLASS` and `ASSIGNMENT_LISTENER` into the reference container.







[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r500574894



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
##########
@@ -203,8 +204,8 @@ public void shouldReturnAllActiveTasksToPreviousOwnerRegardlessOfBalanceAndTrigg
         builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1");
         final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
 
-        createMockTaskManager(allTasks);
         adminClient = EasyMock.createMock(AdminClient.class);
+        createMockTaskManager(allTasks);

Review comment:
       We now need to set up the admin mock before the TM mock (same change elsewhere).
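Why the order flips, assuming `createMockTaskManager()` stubs `taskManager.adminClient()` to return the current `adminClient` field: a stubbed return value is evaluated when the stub is set up, so the TM mock captures whatever `adminClient` holds at that moment. The plain Java fixture below is a hypothetical simplification of the test, not the real helper.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the admin mock.
class AdminMock { }

// Hypothetical simplification of the test fixture.
class FixtureSketch {
    AdminMock adminClient;                      // assigned in each test
    Supplier<AdminMock> taskManagerAdminClient; // models the stubbed taskManager.adminClient()

    // Like a stubbed return value, this captures the admin client now, not later.
    void createMockTaskManager() {
        final AdminMock captured = adminClient;
        taskManagerAdminClient = () -> captured;
    }
}
```

Creating the TM mock first leaves it returning `null`; creating the admin mock first makes the TM mock return it.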



