You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/04 09:40:24 UTC

[GitHub] [kafka] mimaison commented on a change in pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs

mimaison commented on a change in pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#discussion_r435119315



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -477,6 +484,27 @@ public void testCreateTopics() throws Exception {
         }
     }
 
+    @Test
+    public void testCreateTopicsPartialResponse() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
+                    prepareCreateTopicsResponse("myTopic", Errors.NONE));
+            CreateTopicsResult topicsResult = env.adminClient().createTopics(
+                    asList(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2))),
+                           new NewTopic("myTopic2", Collections.singletonMap(0, asList(0, 1, 2)))),
+                    new CreateTopicsOptions().timeoutMs(10000));
+            topicsResult.values().get("myTopic").get();
+            try {
+                topicsResult.values().get("myTopic2").get();
+                fail("Expected an exception.");
+            } catch (ExecutionException e) {

Review comment:
       We can use `assertThrows()` here. Same below

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2234,20 +2251,27 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica,
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     AlterReplicaLogDirsResponse response = (AlterReplicaLogDirsResponse) abstractResponse;
-                    for (Map.Entry<TopicPartition, Errors> responseEntry: response.responses().entrySet()) {
-                        TopicPartition tp = responseEntry.getKey();
-                        Errors error = responseEntry.getValue();
-                        TopicPartitionReplica replica = new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId);
-                        KafkaFutureImpl<Void> future = futures.get(replica);
-                        if (future == null) {
-                            handleFailure(new IllegalStateException(
-                                "The partition " + tp + " in the response from broker " + brokerId + " is not in the request"));
-                        } else if (error == Errors.NONE) {
-                            future.complete(null);
-                        } else {
-                            future.completeExceptionally(error.exception());
+                    for (AlterReplicaLogDirTopicResult topicResult: response.data().results()) {
+                        for (AlterReplicaLogDirPartitionResult partitionResult: topicResult.partitions()) {
+                            TopicPartitionReplica replica = new TopicPartitionReplica(
+                                    topicResult.topicName(), partitionResult.partitionIndex(), brokerId);
+                            KafkaFutureImpl<Void> future = futures.get(replica);
+                            if (future == null) {
+                                log.warn("The partition {} in the response from broker {}} is not in the request",

Review comment:
       There's an extra `}` 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org