You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/09 13:45:47 UTC

[GitHub] [kafka] dengziming opened a new pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

dengziming opened a new pull request #10512:
URL: https://github.com/apache/kafka/pull/10512


   *More detailed description of your change*
   1. Remove PartitionAssignor and related classes, update docs
   2. Move unit test
   3. fix some related typo
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610954191



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateAssignor() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldInstantiateListOfAssignors() {
+        classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Collections.singletonList(String.class.getName());
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Collections.singletonList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromClassType() {

Review comment:
       Ah, I was suggesting to just replicate the `shouldInstantiateAssignor` and `shouldInstantiateListOfAssignors` tests exactly, but with the `classTypes` being eg `StickyAssignor.class` instead of `StickyAssignor.class.getName()`. For example
   
   ```
   classNames = Collections.singletonList(StickyAssignor.class);
   List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
   assertTrue(assignors.get(0) instanceof StickyAssignor);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610942845



##########
File path: docs/upgrade.html
##########
@@ -49,8 +49,9 @@ <h5><a id="upgrade_300_notable" href="#upgrade_300_notable">Notable changes in 3
             were removed. These methods were not intended to be public API and there is no replacement.</li>
         <li>The <code>NoOffsetForPartitionException.partition()</code> method was removed. Please use <code>partitions()</code>
             instead.</li>
-        <li>The Scala <code>kafka.common.MessageFormatter</code> was removed. Plese use the Java <code>org.apache.kafka.common.MessageFormatter</code>.</li>
+        <li>The Scala <code>kafka.common.MessageFormatter</code> was removed. Please use the Java <code>org.apache.kafka.common.MessageFormatter</code>.</li>
         <li>The <code>MessageFormatter.init(Properties)</code> method was removed. Please use <code>configure(Map)</code> instead.</li>
+        <li>The <code>PartitionAssignor</code> classes have been removed. Please use <code>ConsumerPartitionAssignor</code> instead.</li>

Review comment:
       I changed it to the singular `class` intentionally since the `PartitionAssignorAdaptor` was never a public API, the only thing that was exposed to users -- and therefore the only thing they care or need to know about -- was the `PartitionAssignor`. Does that make sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610955990



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateAssignor() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldInstantiateListOfAssignors() {
+        classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Collections.singletonList(String.class.getName());
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Collections.singletonList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromClassType() {

Review comment:
       It seems that `getAssignorInstances()` can only accept `List<String>` as the first parameter, so this could not work.😳




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610965340



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateAssignor() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldInstantiateListOfAssignors() {
+        classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Collections.singletonList(String.class.getName());
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Collections.singletonList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromClassType() {

Review comment:
       Yes, this awkward, I fixed this with some code refactoring and fixed the comment in upgrade.html, PTAL, Thank you for your patience.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#issuecomment-818367236


   Merged to trunk -- thanks @dengziming !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610949055



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateAssignor() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldInstantiateListOfAssignors() {
+        classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Collections.singletonList(String.class.getName());
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Collections.singletonList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromClassType() {

Review comment:
       `assignors ` is not visible in `KafkaConsumer`, we should add a `getAssignors()` in `KafkaConsumer` to test this. Maybe the other tests are enough to verify that the `getAssignorInstances` operate rightly, How about remove these 2 test cases?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#issuecomment-818366846


   Well there were a large number of failures, but all of them are unrelated. Left a comment/reopened the ticket for all of the following:
   
   kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication() 
   kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions() 
   kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
   kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   .kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
   kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#issuecomment-816693711


   @ableegoldman , Hi, PTAL, thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610828209



##########
File path: docs/upgrade.html
##########
@@ -49,8 +49,9 @@ <h5><a id="upgrade_300_notable" href="#upgrade_300_notable">Notable changes in 3
             were removed. These methods were not intended to be public API and there is no replacement.</li>
         <li>The <code>NoOffsetForPartitionException.partition()</code> method was removed. Please use <code>partitions()</code>
             instead.</li>
-        <li>The Scala <code>kafka.common.MessageFormatter</code> was removed. Plese use the Java <code>org.apache.kafka.common.MessageFormatter</code>.</li>
+        <li>The Scala <code>kafka.common.MessageFormatter</code> was removed. Please use the Java <code>org.apache.kafka.common.MessageFormatter</code>.</li>
         <li>The <code>MessageFormatter.init(Properties)</code> method was removed. Please use <code>configure(Map)</code> instead.</li>
+        <li>The <code>PartitionAssignor</code> classes have been removed. Please use <code>ConsumerPartitionAssignor</code> instead.</li>

Review comment:
       ```suggestion
           <li>The deprecated <code>o.a.k.clients.consumer.internals.PartitionAssignor</code> class has been removed. Please use <code>ConsumerPartitionAssignor</code> instead.</li>
   ```

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateNewAssignors() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Collections.singletonList(String.class.getName());
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Collections.singletonList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromListOfOldAndNewClassTypes() {

Review comment:
       This particular test doesn't make sense any more, since there is no "old" assignor type now that PartitionAssignor is removed

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateNewAssignors() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);

Review comment:
       Can you add one more test like this one, but with more than one assignor in the list to instantiate?

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateNewAssignors() {

Review comment:
       ```suggestion
       public void shouldInstantiateAssignor() {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman merged pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #10512:
URL: https://github.com/apache/kafka/pull/10512


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610966015



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateAssignor() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldInstantiateListOfAssignors() {
+        classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Collections.singletonList(String.class.getName());
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Collections.singletonList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromClassType() {

Review comment:
       Nice, thanks for the update. Looks good




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610944206



##########
File path: docs/upgrade.html
##########
@@ -49,8 +49,10 @@ <h5><a id="upgrade_300_notable" href="#upgrade_300_notable">Notable changes in 3
             were removed. These methods were not intended to be public API and there is no replacement.</li>
         <li>The <code>NoOffsetForPartitionException.partition()</code> method was removed. Please use <code>partitions()</code>
             instead.</li>
-        <li>The Scala <code>kafka.common.MessageFormatter</code> was removed. Plese use the Java <code>org.apache.kafka.common.MessageFormatter</code>.</li>
+        <li>The Scala <code>kafka.common.MessageFormatter</code> was removed. Please use the Java <code>org.apache.kafka.common.MessageFormatter</code>.</li>
         <li>The <code>MessageFormatter.init(Properties)</code> method was removed. Please use <code>configure(Map)</code> instead.</li>
+        <li>The deprecated <code>org.apache.kafka.clients.consumer.internals.PartitionAssignor</code> and related classes have been removed. Please use

Review comment:
       ```suggestion
           <li>The deprecated <code>org.apache.kafka.clients.consumer.internals.PartitionAssignor</code> class has been removed. Please use
   ```
   
   See my reply on the original comment 🙂 

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateAssignor() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldInstantiateListOfAssignors() {
+        classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Collections.singletonList(String.class.getName());
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Collections.singletonList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromClassType() {

Review comment:
       I think it would make sense to style this test (and `shouldInstantiateFromListOfClassTypes` below) more like `shouldInstantiateAssignors` now, ie where we actually validate the assignors that are returned (eg `assertTrue(assignors.get(0) instanceof StickyAssignor)`). Previously this test was just making sure that we adaptor would work and we wouldn't throw an exception when constructing the consumer, that's why it's like this 

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateAssignor() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldInstantiateListOfAssignors() {
+        classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);

Review comment:
       we should do this `assertTrue` thing for the CooperativeStickyAssignor as well




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610932197



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateNewAssignors() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Collections.singletonList(String.class.getName());
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Collections.singletonList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromListOfOldAndNewClassTypes() {

Review comment:
       I changed its name to `shouldInstantiateFromListOfClassTypes` and add a similar `shouldInstantiateFromClassType` to test "new" assignor, WDYT.

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateNewAssignors() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);

Review comment:
       Add `shouldInstantiateListOfAssignors`.

##########
File path: docs/upgrade.html
##########
@@ -49,8 +49,9 @@ <h5><a id="upgrade_300_notable" href="#upgrade_300_notable">Notable changes in 3
             were removed. These methods were not intended to be public API and there is no replacement.</li>
         <li>The <code>NoOffsetForPartitionException.partition()</code> method was removed. Please use <code>partitions()</code>
             instead.</li>
-        <li>The Scala <code>kafka.common.MessageFormatter</code> was removed. Plese use the Java <code>org.apache.kafka.common.MessageFormatter</code>.</li>
+        <li>The Scala <code>kafka.common.MessageFormatter</code> was removed. Please use the Java <code>org.apache.kafka.common.MessageFormatter</code>.</li>
         <li>The <code>MessageFormatter.init(Properties)</code> method was removed. Please use <code>configure(Map)</code> instead.</li>
+        <li>The <code>PartitionAssignor</code> classes have been removed. Please use <code>ConsumerPartitionAssignor</code> instead.</li>

Review comment:
       I reworded this and added a "and related classes" since we also removed `PartitionAssignorAdaptor`, WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610959238



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateAssignor() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldInstantiateListOfAssignors() {
+        classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Collections.singletonList(String.class.getName());
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Collections.singletonList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromClassType() {

Review comment:
       It's super awkward, obviously, but since this is what happens when we process the configs in the real code we should try to replicate that in the test




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610954191



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateAssignor() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldInstantiateListOfAssignors() {
+        classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Collections.singletonList(String.class.getName());
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Collections.singletonList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromClassType() {

Review comment:
       Ah, I was suggesting to just replicate the `shouldInstantiateAssignor` and `shouldInstantiateListOfAssignors` tests exactly, but with the `classTypes` being eg `StickyAssignor.class` instead of `StickyAssignor.class.getName()`. For example
   
   ```
   classNames = Collections.singletonList(StickyAssignor.class);
           List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
           assertTrue(assignors.get(0) instanceof StickyAssignor);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610943138



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
##########
@@ -254,4 +258,41 @@ public static RebalanceProtocol forId(byte id) {
         }
     }
 
+    /**
+     * Get a list of configured instances of {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}
+     * based on the class names/types specified by {@link org.apache.kafka.clients.consumer.ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}
+     */
+    public static List<ConsumerPartitionAssignor> getAssignorInstances(List<String> assignorClasses, Map<String, Object> configs) {

Review comment:
       nit: this should be package private I think




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#issuecomment-818074480


   Hm, the build was aborted -- looks like the gradle daemon was killed or crashed for some reason? I'll kick off a new run, let's hope this one passes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#issuecomment-817232575


   ping @ableegoldman .😉


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#discussion_r610959058



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateAssignor() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldInstantiateListOfAssignors() {
+        classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Collections.singletonList(String.class.getName());
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Collections.singletonList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromClassType() {

Review comment:
       Ah, yeah, you'd need to do something more like what actually happens in the KafkaConsumer/`getAssignorInstances` code. eg
   ```
   @Test
   @SuppressWarnings("unchecked")
   public void shouldInstantiateAssignorClass() {
       Object classTypes = Collections.singletonList(StickyAssignor.class);
       List<ConsumerPartitionAssignor> assignors = getAssignorInstances((List<String>) classTypes, Collections.emptyMap());
        assertTrue(assignors.get(0) instanceof StickyAssignor);
   }
   ```

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.getAssignorInstances;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPartitionAssignorTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateAssignor() {
+        classNames = Collections.singletonList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldInstantiateListOfAssignors() {
+        classNames = Arrays.asList(StickyAssignor.class.getName(), CooperativeStickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames, Collections.emptyMap());
+        assertTrue(assignors.get(0) instanceof StickyAssignor);
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Collections.singletonList(String.class.getName());
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Collections.singletonList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromClassType() {

Review comment:
       Ah, yeah, you'd need to do something more like what actually happens in the actual KafkaConsumer/`getAssignorInstances` code. eg
   ```
   @Test
   @SuppressWarnings("unchecked")
   public void shouldInstantiateAssignorClass() {
       Object classTypes = Collections.singletonList(StickyAssignor.class);
       List<ConsumerPartitionAssignor> assignors = getAssignorInstances((List<String>) classTypes, Collections.emptyMap());
        assertTrue(assignors.get(0) instanceof StickyAssignor);
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org