You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/10 13:54:48 UTC

[GitHub] [kafka] tombentley opened a new pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

tombentley opened a new pull request #9007:
URL: https://github.com/apache/kafka/pull/9007


   As per KIP-621. Also added some tests in KafkaAdminClientTest
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
mimaison merged pull request #9007:
URL: https://github.com/apache/kafka/pull/9007


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r454941348



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet());
+            assertNull(descriptionsMap.get("/var/data/kafka").error());
+            Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get("/var/data/kafka").replicaInfos();
+            assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet());
+            assertEquals(1234567890, descriptionsReplicaInfos.get(tp).size());
+            assertEquals(0, descriptionsReplicaInfos.get(tp).offsetLag());
+            assertFalse(descriptionsReplicaInfos.get(tp).isFuture());

Review comment:
       This might have less mileage than you expected because the different types mean we need two methods each with two call sites, rather than 4 call sites for a single method, but I've done it anyway.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet());
+            assertNull(descriptionsMap.get("/var/data/kafka").error());
+            Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get("/var/data/kafka").replicaInfos();
+            assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet());
+            assertEquals(1234567890, descriptionsReplicaInfos.get(tp).size());
+            assertEquals(0, descriptionsReplicaInfos.get(tp).offsetLag());
+            assertFalse(descriptionsReplicaInfos.get(tp).isFuture());
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(Collections.singleton(0), allDescriptions.keySet());
+            assertNotNull(allDescriptions.get(0));
+            assertEquals(Collections.singleton("/var/data/kafka"), allDescriptions.get(0).keySet());
+            assertNull(allDescriptions.get(0).get("/var/data/kafka").error());
+            Map<TopicPartition, ReplicaInfo> allDescriptionsReplicInfos = allDescriptions.get(0).get("/var/data/kafka").replicaInfos();
+            assertEquals(Collections.singleton(tp), allDescriptionsReplicInfos.keySet());
+            assertEquals(1234567890, allDescriptionsReplicInfos.get(tp).size());
+            assertEquals(0, allDescriptionsReplicInfos.get(tp).offsetLag());
+            assertFalse(allDescriptionsReplicInfos.get(tp).isFuture());
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirsDeprecated() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
+            assertEquals(Collections.singleton(0), deprecatedValues.keySet());
+            assertNotNull(deprecatedValues.get(0));
+            Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap = deprecatedValues.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), valuesMap.keySet());
+            assertEquals(Errors.NONE, valuesMap.get("/var/data/kafka").error);
+            Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> valuesReplicaInfos =
+                    valuesMap.get("/var/data/kafka").replicaInfos;
+            assertEquals(Collections.singleton(tp), valuesReplicaInfos.keySet());
+            assertEquals(1234567890, valuesReplicaInfos.get(tp).size);
+            assertEquals(0, valuesReplicaInfos.get(tp).offsetLag);
+            assertFalse(valuesReplicaInfos.get(tp).isFuture);
+
+            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
+            assertEquals(Collections.singleton(0), deprecatedAll.keySet());
+            assertNotNull(deprecatedAll.get(0));
+            assertEquals(Collections.singleton("/var/data/kafka"), deprecatedAll.get(0).keySet());
+            assertEquals(Errors.NONE, deprecatedAll.get(0).get("/var/data/kafka").error);
+            Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> allReplicaInfos =
+                    deprecatedAll.get(0).get("/var/data/kafka").replicaInfos;
+            assertEquals(Collections.singleton(tp), allReplicaInfos.keySet());
+            assertEquals(1234567890, allReplicaInfos.get(tp).size);
+            assertEquals(0, allReplicaInfos.get(tp).offsetLag);
+            assertFalse(allReplicaInfos.get(tp).isFuture);
+        }
+    }
+
+    @Test
+    public void testDescribeLogDirsOfflineDir() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(asList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(emptyList())
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet());
+            assertEquals(KafkaStorageException.class, descriptionsMap.get("/var/data/kafka").error().getClass());
+            assertEquals(emptySet(), descriptionsMap.get("/var/data/kafka").replicaInfos().keySet());
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(Collections.singleton(0), allDescriptions.keySet());
+            Map<String, LogDirDescription> allMap = allDescriptions.get(0);
+            assertNotNull(allMap);
+            assertEquals(Collections.singleton("/var/data/kafka"), allMap.keySet());
+            assertEquals(KafkaStorageException.class, allMap.get("/var/data/kafka").error().getClass());
+            assertEquals(emptySet(), allMap.get("/var/data/kafka").replicaInfos().keySet());
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirsOfflineDirDeprecated() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(asList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(emptyList())
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
+            assertEquals(Collections.singleton(0), deprecatedValues.keySet());
+            assertNotNull(deprecatedValues.get(0));
+            Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap = deprecatedValues.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), valuesMap.keySet());
+            assertEquals(Errors.KAFKA_STORAGE_ERROR, valuesMap.get("/var/data/kafka").error);
+            assertEquals(emptySet(), valuesMap.get("/var/data/kafka").replicaInfos.keySet());
+
+            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
+            assertEquals(Collections.singleton(0), deprecatedAll.keySet());
+            Map<String, DescribeLogDirsResponse.LogDirInfo> allMap = deprecatedAll.get(0);
+            assertNotNull(allMap);
+            assertEquals(Collections.singleton("/var/data/kafka"), allMap.keySet());
+            assertEquals(Errors.KAFKA_STORAGE_ERROR, allMap.get("/var/data/kafka").error);
+            assertEquals(emptySet(), allMap.get("/var/data/kafka").replicaInfos.keySet());
+        }
+    }
+
+    @Test
+    public void testDescribeReplicaLogDirs() throws ExecutionException, InterruptedException {

Review comment:
       I added it to the existing test. Due to the new helper methods I felt this didn't really complicate the test very much and is also allows us to cover the case where the RPC returns `STORAGE_ERROR`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-666256788


   ok to test


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dongjinleekr commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-667483663


   @tombentley Congratulations! :congratulations: @omkreddy @mimaison Thanks again for the detailed review, as usual! :smiley: 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r454572008



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(

Review comment:
       nit: What about extracting the construction in a small helper method `prepareDescribeLogDirsResponse` that create a response for one LogDir and TopicPartition? It seems that the same block of code is used in many tests.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2354,32 +2374,31 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicParti
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
-                    for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> responseEntry: response.logDirInfos().entrySet()) {
+                    for (Map.Entry<String, LogDirDescription> responseEntry: logDirDescriptions(response).entrySet()) {
                         String logDir = responseEntry.getKey();
-                        DescribeLogDirsResponse.LogDirInfo logDirInfo = responseEntry.getValue();
+                        LogDirDescription logDirInfo = responseEntry.getValue();
 
                         // No replica info will be provided if the log directory is offline
-                        if (logDirInfo.error == Errors.KAFKA_STORAGE_ERROR)
+                        if (logDirInfo.error() instanceof KafkaStorageException)
                             continue;
-                        if (logDirInfo.error != Errors.NONE)
+                        if (logDirInfo.error() != null)
                             handleFailure(new IllegalStateException(
-                                "The error " + logDirInfo.error + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));
+                                "The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));
 
-                        for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoEntry: logDirInfo.replicaInfos.entrySet()) {
+                        for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry: logDirInfo.replicaInfos().entrySet()) {
                             TopicPartition tp = replicaInfoEntry.getKey();
-                            DescribeLogDirsResponse.ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
+                            ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
                             ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp);
                             if (replicaLogDirInfo == null) {
-                                handleFailure(new IllegalStateException(
-                                    "The partition " + tp + " in the response from broker " + brokerId + " is not in the request"));
-                            } else if (replicaInfo.isFuture) {
+                                log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp);

Review comment:
       I suggest to add a unit test to cover this change. I think that the previous behaviour was a bug so it would be great to not reintroduce it in the future.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);

Review comment:
       nit: In some of the other tests, you have an empty line after calling the method of the admin client. Shall we add one everywhere in order to be consistent? I personally like to have one before and after to separate blocks of code. I leave this up to you.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * A description of a log directory on a particular broker.
+ */
+public class LogDirDescription {
+    private final Map<TopicPartition, ReplicaInfo> replicaInfos;
+    private final ApiException error;
+
+    public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {

Review comment:
       That's great point. At the moment, I think that we are not consistent about this. Some are package private and some are not. The advantage of keeping it public is that it allows to use the class in unit tests which resides in other packages.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet());
+            assertNull(descriptionsMap.get("/var/data/kafka").error());
+            Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get("/var/data/kafka").replicaInfos();
+            assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet());
+            assertEquals(1234567890, descriptionsReplicaInfos.get(tp).size());
+            assertEquals(0, descriptionsReplicaInfos.get(tp).offsetLag());
+            assertFalse(descriptionsReplicaInfos.get(tp).isFuture());
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(Collections.singleton(0), allDescriptions.keySet());
+            assertNotNull(allDescriptions.get(0));
+            assertEquals(Collections.singleton("/var/data/kafka"), allDescriptions.get(0).keySet());
+            assertNull(allDescriptions.get(0).get("/var/data/kafka").error());
+            Map<TopicPartition, ReplicaInfo> allDescriptionsReplicInfos = allDescriptions.get(0).get("/var/data/kafka").replicaInfos();
+            assertEquals(Collections.singleton(tp), allDescriptionsReplicInfos.keySet());
+            assertEquals(1234567890, allDescriptionsReplicInfos.get(tp).size());
+            assertEquals(0, allDescriptionsReplicInfos.get(tp).offsetLag());
+            assertFalse(allDescriptionsReplicInfos.get(tp).isFuture());
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirsDeprecated() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
+            assertEquals(Collections.singleton(0), deprecatedValues.keySet());
+            assertNotNull(deprecatedValues.get(0));
+            Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap = deprecatedValues.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), valuesMap.keySet());
+            assertEquals(Errors.NONE, valuesMap.get("/var/data/kafka").error);
+            Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> valuesReplicaInfos =
+                    valuesMap.get("/var/data/kafka").replicaInfos;
+            assertEquals(Collections.singleton(tp), valuesReplicaInfos.keySet());
+            assertEquals(1234567890, valuesReplicaInfos.get(tp).size);
+            assertEquals(0, valuesReplicaInfos.get(tp).offsetLag);
+            assertFalse(valuesReplicaInfos.get(tp).isFuture);
+
+            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
+            assertEquals(Collections.singleton(0), deprecatedAll.keySet());
+            assertNotNull(deprecatedAll.get(0));
+            assertEquals(Collections.singleton("/var/data/kafka"), deprecatedAll.get(0).keySet());
+            assertEquals(Errors.NONE, deprecatedAll.get(0).get("/var/data/kafka").error);
+            Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> allReplicaInfos =
+                    deprecatedAll.get(0).get("/var/data/kafka").replicaInfos;
+            assertEquals(Collections.singleton(tp), allReplicaInfos.keySet());
+            assertEquals(1234567890, allReplicaInfos.get(tp).size);
+            assertEquals(0, allReplicaInfos.get(tp).offsetLag);
+            assertFalse(allReplicaInfos.get(tp).isFuture);
+        }
+    }
+
+    @Test
+    public void testDescribeLogDirsOfflineDir() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(asList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(emptyList())
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet());
+            assertEquals(KafkaStorageException.class, descriptionsMap.get("/var/data/kafka").error().getClass());
+            assertEquals(emptySet(), descriptionsMap.get("/var/data/kafka").replicaInfos().keySet());
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(Collections.singleton(0), allDescriptions.keySet());
+            Map<String, LogDirDescription> allMap = allDescriptions.get(0);
+            assertNotNull(allMap);
+            assertEquals(Collections.singleton("/var/data/kafka"), allMap.keySet());
+            assertEquals(KafkaStorageException.class, allMap.get("/var/data/kafka").error().getClass());
+            assertEquals(emptySet(), allMap.get("/var/data/kafka").replicaInfos().keySet());
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirsOfflineDirDeprecated() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(asList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(emptyList())
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
+            assertEquals(Collections.singleton(0), deprecatedValues.keySet());
+            assertNotNull(deprecatedValues.get(0));
+            Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap = deprecatedValues.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), valuesMap.keySet());
+            assertEquals(Errors.KAFKA_STORAGE_ERROR, valuesMap.get("/var/data/kafka").error);
+            assertEquals(emptySet(), valuesMap.get("/var/data/kafka").replicaInfos.keySet());
+
+            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
+            assertEquals(Collections.singleton(0), deprecatedAll.keySet());
+            Map<String, DescribeLogDirsResponse.LogDirInfo> allMap = deprecatedAll.get(0);
+            assertNotNull(allMap);
+            assertEquals(Collections.singleton("/var/data/kafka"), allMap.keySet());
+            assertEquals(Errors.KAFKA_STORAGE_ERROR, allMap.get("/var/data/kafka").error);
+            assertEquals(emptySet(), allMap.get("/var/data/kafka").replicaInfos.keySet());
+        }
+    }
+
+    @Test
+    public void testDescribeReplicaLogDirs() throws ExecutionException, InterruptedException {

Review comment:
       This is not due to your PR but shall we add a unit test which uses multiple brokers? 

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());

Review comment:
       nit: You can reuse `brokers` here. Would it make sense to allow extract the other constants in local variables?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2286,13 +2288,15 @@ public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, Descri
                     return new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(null));
                 }
 
+                @SuppressWarnings("deprecation")
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
-                    if (response.logDirInfos().size() > 0) {
-                        future.complete(response.logDirInfos());
+                    Map<String, LogDirDescription> descriptions = logDirDescriptions(response);
+                    if (descriptions.size() > 0) {
+                        future.complete(descriptions);
                     } else {
-                        // response.logDirInfos() will be empty if and only if the user is not authorized to describe clsuter resource.
+                        // response/descriptions will be empty if and only if the user is not authorized to describe cluster resource.

Review comment:
       nit: shall we remove `response/`?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,205 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
+            assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet());
+            assertNull(descriptionsMap.get("/var/data/kafka").error());
+            Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get("/var/data/kafka").replicaInfos();
+            assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet());
+            assertEquals(1234567890, descriptionsReplicaInfos.get(tp).size());
+            assertEquals(0, descriptionsReplicaInfos.get(tp).offsetLag());
+            assertFalse(descriptionsReplicaInfos.get(tp).isFuture());

Review comment:
       This block of assertions is used multiple times. Would it make sense to extract it in a helper method, say `assertDescriptions`, that verifies a descriptions map contains the information about a single log dir/topic partition?
   
   Something like `assertDescriptionContains(descriptionsMap, logDir, tp, size, offsetLag, isFuture)`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dongjinleekr commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r455051262



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * A description of a log directory on a particular broker.
+ */
+public class LogDirDescription {
+    private final Map<TopicPartition, ReplicaInfo> replicaInfos;
+    private final ApiException error;
+
+    public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
+        this.error = error;
+        this.replicaInfos = replicaInfos;
+    }
+
+    /**
+     * A KafkaStorageException if this log directory is offline,
+     * possibly some other exception if there were problems describing the log directory
+     * or null if the directory is online.
+     */
+    public ApiException error() {

Review comment:
       We should remove the `<p>` before the `<ul>` also. (see below.)
   ![kafka KafkaStreams java at trunk ยท apache kafka](https://user-images.githubusercontent.com/2375128/87550564-23978180-c6ea-11ea-9fcf-c6a88ea3034d.png)
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] omkreddy commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
omkreddy commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-662818802


   ok to test


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-658746535


   @dongjinleekr ah, thank you, I hadn't noticed that that was the norm.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r454484147



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * A description of a log directory on a particular broker.
+ */
+public class LogDirDescription {
+    private final Map<TopicPartition, ReplicaInfo> replicaInfos;
+    private final ApiException error;
+
+    public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {

Review comment:
       I initially thought the same, but `TopicDescription`, for example (as well as other classes accessible from `*Results` classes) have a public constructors.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r459540733



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2306,6 +2310,22 @@ void handleFailure(Throwable throwable) {
         return new DescribeLogDirsResult(new HashMap<>(futures));
     }
 
+    private Map<String, LogDirDescription> logDirDescriptions(DescribeLogDirsResponse response) {

Review comment:
       This can be static. Also should we keep it in `DescribeLogDirsResponse`?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
+        return prepareDescribeLogDirsResponse(error, logDir,
+                prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false));
+    }
+
+    private List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
+            long partitionSize, long offsetLag, String topic, int partition, boolean isFuture) {
+        return singletonList(new DescribeLogDirsTopic()
+                .setName(topic)
+                .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                        .setPartitionIndex(partition)
+                        .setPartitionSize(partitionSize)
+                        .setIsFutureKey(isFuture)
+                        .setOffsetLag(offsetLag))));
+    }
+
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir,
+                                                                   List<DescribeLogDirsTopic> topics) {
+        return new DescribeLogDirsResponse(
+                new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                        .setErrorCode(error.code())
+                        .setLogDir(logDir)
+                        .setTopics(topics)
+                )));
+    }
+
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        Set<Integer> brokers = Collections.singleton(0);
+        String logDir = "/var/data/kafka";
+        TopicPartition tp = new TopicPartition("topic", 12);
+        long partitionSize = 1234567890;
+        long offsetLag = 24;
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(
+                    prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp, partitionSize, offsetLag),
+                    env.cluster().nodeById(0));
+
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(brokers, descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            assertDescriptionContains(descriptions.get(0).get(), logDir, tp, partitionSize, offsetLag);
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(brokers, allDescriptions.keySet());
+            assertDescriptionContains(allDescriptions.get(0), logDir, tp, partitionSize, offsetLag);
+        }
+    }
+
+    private void assertDescriptionContains(Map<String, LogDirDescription> descriptionsMap, String logDir,

Review comment:
       This can be static

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {

Review comment:
       This can be static

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
+        return prepareDescribeLogDirsResponse(error, logDir,
+                prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false));
+    }
+
+    private List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(

Review comment:
       This can be static

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
+        return prepareDescribeLogDirsResponse(error, logDir,
+                prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false));
+    }
+
+    private List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
+            long partitionSize, long offsetLag, String topic, int partition, boolean isFuture) {
+        return singletonList(new DescribeLogDirsTopic()
+                .setName(topic)
+                .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                        .setPartitionIndex(partition)
+                        .setPartitionSize(partitionSize)
+                        .setIsFutureKey(isFuture)
+                        .setOffsetLag(offsetLag))));
+    }
+
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir,
+                                                                   List<DescribeLogDirsTopic> topics) {
+        return new DescribeLogDirsResponse(
+                new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                        .setErrorCode(error.code())
+                        .setLogDir(logDir)
+                        .setTopics(topics)
+                )));
+    }
+
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        Set<Integer> brokers = Collections.singleton(0);
+        String logDir = "/var/data/kafka";
+        TopicPartition tp = new TopicPartition("topic", 12);
+        long partitionSize = 1234567890;
+        long offsetLag = 24;
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(
+                    prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp, partitionSize, offsetLag),
+                    env.cluster().nodeById(0));
+
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(brokers, descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            assertDescriptionContains(descriptions.get(0).get(), logDir, tp, partitionSize, offsetLag);
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(brokers, allDescriptions.keySet());
+            assertDescriptionContains(allDescriptions.get(0), logDir, tp, partitionSize, offsetLag);
+        }
+    }
+
+    private void assertDescriptionContains(Map<String, LogDirDescription> descriptionsMap, String logDir,
+                                           TopicPartition tp, long partitionSize, long offsetLag) {
+        assertNotNull(descriptionsMap);
+        assertEquals(Collections.singleton(logDir), descriptionsMap.keySet());
+        assertNull(descriptionsMap.get(logDir).error());
+        Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get(logDir).replicaInfos();
+        assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet());
+        assertEquals(partitionSize, descriptionsReplicaInfos.get(tp).size());
+        assertEquals(offsetLag, descriptionsReplicaInfos.get(tp).offsetLag());
+        assertFalse(descriptionsReplicaInfos.get(tp).isFuture());
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirsDeprecated() throws ExecutionException, InterruptedException {
+        Set<Integer> brokers = Collections.singleton(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+        String logDir = "/var/data/kafka";
+        Errors error = Errors.NONE;
+        int offsetLag = 24;
+        long partitionSize = 1234567890;
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(
+                    prepareDescribeLogDirsResponse(error, logDir, tp, partitionSize, offsetLag),
+                    env.cluster().nodeById(0));
+
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
+            assertEquals(brokers, deprecatedValues.keySet());
+            assertNotNull(deprecatedValues.get(0));
+            assertDescriptionContains(deprecatedValues.get(0).get(), logDir, tp, error, offsetLag, partitionSize);
+
+            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
+            assertEquals(brokers, deprecatedAll.keySet());
+            assertDescriptionContains(deprecatedAll.get(0), logDir, tp, error, offsetLag, partitionSize);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void assertDescriptionContains(Map<String, DescribeLogDirsResponse.LogDirInfo> descriptionsMap,

Review comment:
       This can be static

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2306,6 +2310,22 @@ void handleFailure(Throwable throwable) {
         return new DescribeLogDirsResult(new HashMap<>(futures));
     }
 
+    private Map<String, LogDirDescription> logDirDescriptions(DescribeLogDirsResponse response) {
+        HashMap<String, LogDirDescription> result = new HashMap<>(response.data().results().size());

Review comment:
       The left side can be `Map`

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
+        return prepareDescribeLogDirsResponse(error, logDir,
+                prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false));
+    }
+
+    private List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
+            long partitionSize, long offsetLag, String topic, int partition, boolean isFuture) {
+        return singletonList(new DescribeLogDirsTopic()
+                .setName(topic)
+                .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                        .setPartitionIndex(partition)
+                        .setPartitionSize(partitionSize)
+                        .setIsFutureKey(isFuture)
+                        .setOffsetLag(offsetLag))));
+    }
+
+    private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir,

Review comment:
       This can be static




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dongjinleekr commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r455019176



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * A description of a log directory on a particular broker.
+ */
+public class LogDirDescription {
+    private final Map<TopicPartition, ReplicaInfo> replicaInfos;
+    private final ApiException error;
+
+    public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
+        this.error = error;
+        this.replicaInfos = replicaInfos;
+    }
+
+    /**
+     * A KafkaStorageException if this log directory is offline,
+     * possibly some other exception if there were problems describing the log directory
+     * or null if the directory is online.
+     */
+    public ApiException error() {

Review comment:
       For consistency with the other [Javadoc](https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1078) it seems like we should use a single `<p>` only when between the paragraphs. Please remove the `<p>` tags. (Sorry, I was also confused.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r452893782



##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -64,7 +63,7 @@ object LogDirsCommand {
                     "logDirs" -> logDirInfos.map { case (logDir, logDirInfo) =>
                         Map(
                             "logDir" -> logDir,
-                            "error" -> logDirInfo.error.exceptionName(),
+                            "error" -> Option(logDirInfo.error).flatMap(ex => Some(ex.getClass.getName)).orNull,

Review comment:
       nit: Can't we use `map` instead of `flatMap` and remove the `Some`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ReplicaInfo.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+/**
+ * A description of a replica on a particular broker.
+ */
+public class ReplicaInfo {
+
+    private final long size;
+    private final long offsetLag;
+    private final boolean isFuture;
+
+    public ReplicaInfo(long size, long offsetLag, boolean isFuture) {
+        this.size = size;
+        this.offsetLag = offsetLag;
+        this.isFuture = isFuture;
+    }
+
+    /**
+     * The total size of the log segments in this replica in bytes.
+     */
+    public long size() {
+        return size;
+    }
+
+    /**
+     * The lag of the log's LEO with respect to the partition's
+     * high watermark (if it is the current log for the partition)
+     * or the current replica's LEO (if it is the {@linkplain #isFuture() future log}
+     * for the partition).
+     */
+    public long offsetLag() {
+        return offsetLag;
+    }
+
+    /**
+     * Whether this replica has been created by a AlterReplicaLogDirsRequest
+     * but not yet replaced the current replica on the broker.
+     *
+     * @return true if this log is created by AlterReplicaLogDirsRequest and will replace the current log
+     * of the replica at some time in the future.
+     */
+    public boolean isFuture() {
+        return isFuture;
+    }
+
+    @Override
+    public String toString() {
+        return "ReplicaInfo{" +
+                "size=" + size +
+                ", offsetLag=" + offsetLag +
+                ", isFuture=" + isFuture +
+                '}';

Review comment:
       nit: curly braces instead of parenthesis. 

##########
File path: core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
##########
@@ -46,15 +48,23 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
 
     val request = new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(null)).build()
     val response = connectAndReceive[DescribeLogDirsResponse](request, destination = controllerSocketServer)
-    val logDirInfos = response.logDirInfos()
+    case class ReplicaInfo(size: Long, offsetLag: Long, isFuture: Boolean)
+    case class LogDirInfo(error: Errors, replicaInfos: Map[TopicPartition, ReplicaInfo])

Review comment:
       That's a pity that we have to redefine there classes here. Couldn't we update the test to work with the plain response instead?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2356,32 +2378,32 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicParti
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
-                    for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> responseEntry: response.logDirInfos().entrySet()) {
+                    for (Map.Entry<String, LogDirDescription> responseEntry: logDirDescriptions(response).entrySet()) {
                         String logDir = responseEntry.getKey();
-                        DescribeLogDirsResponse.LogDirInfo logDirInfo = responseEntry.getValue();
+                        LogDirDescription logDirInfo = responseEntry.getValue();
 
                         // No replica info will be provided if the log directory is offline
-                        if (logDirInfo.error == Errors.KAFKA_STORAGE_ERROR)
+                        if (logDirInfo.error() instanceof KafkaStorageException)
                             continue;
-                        if (logDirInfo.error != Errors.NONE)
+                        if (logDirInfo.error() != null)
                             handleFailure(new IllegalStateException(
-                                "The error " + logDirInfo.error + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));
+                                "The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));
 
-                        for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoEntry: logDirInfo.replicaInfos.entrySet()) {
+                        for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry: logDirInfo.replicaInfos().entrySet()) {
                             TopicPartition tp = replicaInfoEntry.getKey();
-                            DescribeLogDirsResponse.ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
+                            ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
                             ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp);
                             if (replicaLogDirInfo == null) {
                                 handleFailure(new IllegalStateException(

Review comment:
       Not related to your PR but this look weird. It seems that we fail all the futures if an unexpected replica is provided by the broker in the response. I think that we should log a warning when this happen like we do in the other methods (e.g. createTopics). What do you think?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2308,6 +2312,24 @@ void handleFailure(Throwable throwable) {
         return new DescribeLogDirsResult(new HashMap<>(futures));
     }
 
+    private Map<String, LogDirDescription> logDirDescriptions(DescribeLogDirsResponse response) {
+        HashMap<String, LogDirDescription> result = new HashMap<>(response.data().results().size());
+        for (DescribeLogDirsResponseData.DescribeLogDirsResult logDirResult : response.data().results()) {
+            Map<TopicPartition, ReplicaInfo> replicaInfoMap = new HashMap<>();
+            if (logDirResult.topics() != null) {

Review comment:
       nit: `topics` is not nullable in the protocol so it should never be `null`, does it?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1055,6 +1059,154 @@ public void testDescribeConfigsUnrequested() throws Exception {
         }
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            assertEquals(Collections.singleton("/var/data/kafka"), descriptions.get(0).get().keySet());
+            assertNull(descriptions.get(0).get().get("/var/data/kafka").error());
+            assertEquals(Collections.singleton(tp), descriptions.get(0).get().get("/var/data/kafka").replicaInfos().keySet());
+            assertEquals(1234567890, descriptions.get(0).get().get("/var/data/kafka").replicaInfos().get(tp).size());
+            assertEquals(0, descriptions.get(0).get().get("/var/data/kafka").replicaInfos().get(tp).offsetLag());
+            assertFalse(descriptions.get(0).get().get("/var/data/kafka").replicaInfos().get(tp).isFuture());

Review comment:
       These blocks of assertions are quite hard to read. Can we try to make them more digestable? We could perhaps extract temporary variable to reduce the number of `.get()`. We could also define an `verifyDescription` helper that verify a `LogDirDescription` for instance. It may be worth having dedicated unit tests for the new and the old APIs as well.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * A description of a log directory on a particular broker.
+ */
+public class LogDirDescription {
+    private final Map<TopicPartition, ReplicaInfo> replicaInfos;
+    private final ApiException error;
+
+    public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
+        this.error = error;
+        this.replicaInfos = replicaInfos;
+    }
+
+    /**
+     * A KafkaStorageException if this log directory is offline,
+     * possibly some other exception if there were problems describing the log directory
+     * or null if the directory is online.
+     */
+    public ApiException error() {
+        return error;
+    }
+
+    /**
+     * A map from topic partition to replica information for that partition
+     * in this log directory.
+     */
+    public Map<TopicPartition, ReplicaInfo> replicaInfos() {
+        return unmodifiableMap(replicaInfos);
+    }
+
+    @Override
+    public String toString() {
+        return "LogDirDescription{" +
+                "replicaInfos=" + replicaInfos +
+                ", error=" + error +
+                '}';

Review comment:
       nit: We usually use parenthesis instead of curly braces. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r454480751



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * A description of a log directory on a particular broker.
+ */
+public class LogDirDescription {
+    private final Map<TopicPartition, ReplicaInfo> replicaInfos;
+    private final ApiException error;
+
+    public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {

Review comment:
       Is it better to have package-private visibility?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r454487144



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
##########
@@ -95,6 +76,7 @@ public static DescribeLogDirsResponse parse(ByteBuffer buffer, short version) {
      * KAFKA_STORAGE_ERROR (56)
      * UNKNOWN (-1)
      */
+    @Deprecated

Review comment:
       Does it need comment to describe the replacement? for example
   ```
   @deprecated Deprecated Since Kafka 2.7. Use {@link LogDirDescription}.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-666239450


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dongjinleekr commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r454394005



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * A description of a log directory on a particular broker.
+ */
+public class LogDirDescription {
+    private final Map<TopicPartition, ReplicaInfo> replicaInfos;
+    private final ApiException error;
+
+    public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
+        this.error = error;
+        this.replicaInfos = replicaInfos;
+    }
+
+    /**
+     * A KafkaStorageException if this log directory is offline,
+     * possibly some other exception if there were problems describing the log directory
+     * or null if the directory is online.
+     */
+    public ApiException error() {

Review comment:
       How about this? (in consistency with current `DescribeLogDirsResponse.LogDirInfo`):
   
   ```
   Returns `ApiException` if the log directory is offline or an error occurred. If not, returns null.
   <p><ul>
   <li> KafkaStorageException - The log directory is offline.
   <li> UnknownServerException - The server experienced an unexpected error when processing the request.
   </ul><p>
   ```
   
   (Description of `UnknownServerException` was from `Errors.UNKNOWN_SERVER_ERROR`)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-658243269


   @dongjinleekr thanks for the review, amended. Also rebased for conflict.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-664836543


   @mimaison done, thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-656690684


   @cmccabe, @omkreddy, @mimaison, @dajac, @dongjinleekr since you all voted on the KIP feel free to review.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dongjinleekr commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r454369738



##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -64,7 +63,7 @@ object LogDirsCommand {
                     "logDirs" -> logDirInfos.map { case (logDir, logDirInfo) =>
                         Map(
                             "logDir" -> logDir,
-                            "error" -> logDirInfo.error.exceptionName(),
+                            "error" -> Option(logDirInfo.error).map(ex => ex.getClass.getName).orNull,

Review comment:
       +1 for the `Option` way. :smile:




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-657439525


   Thats for the review @dajac. I've addressed your comments. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-666353178


   Failures look unrelated:
   - org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > shouldUpgradeFromEosAlphaToEosBeta[true] FAILED
   - org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > testOneWayReplicationWithAutorOffsetSync1


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org