Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/03 15:50:37 UTC

[GitHub] [kafka] mimaison opened a new pull request, #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

mimaison opened a new pull request, #12248:
URL: https://github.com/apache/kafka/pull/12248

   …-827)
   
   This implements KIP-827: https://cwiki.apache.org/confluence/display/KAFKA/KIP-827%3A+Expose+logdirs+total+and+usable+space+via+Kafka+API
   
   Add TotalBytes and UsableBytes to DescribeLogDirsResponse
   Add matching getters on LogDirDescription
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #12248:
URL: https://github.com/apache/kafka/pull/12248#discussion_r892224792


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -4144,6 +4148,31 @@ class ReplicaManagerTest {
       replicaManager.shutdown(checkpointHW = false)
     }
   }
+
+  @Test
+  def testDescribeLogDirs(): Unit = {
+    val topicPartition = 0
+    val topicId = Uuid.randomUuid()
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+    val offsetFromLeader = 5
+
+    // Prepare the mocked components for the test
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch,
+      expectTruncation = false, localLogOffset = Some(10), offsetFromLeader = offsetFromLeader, topicId = Some(topicId))
+
+    val responses = replicaManager.describeLogDirs(Set(new TopicPartition(topic, topicPartition)))
+    assertEquals(mockLogMgr.liveLogDirs.size, responses.size)
+    responses.foreach { response =>
+      assertEquals(Errors.NONE.code, response.errorCode)
+      assertTrue(response.totalBytes >= 0)

Review Comment:
   Is there a reason why totalBytes could be 0? Perhaps we should assert `> 0` here instead?



##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java:
##########
@@ -31,6 +31,7 @@
 public class DescribeLogDirsResponse extends AbstractResponse {
 
     public static final long INVALID_OFFSET_LAG = -1L;
+    public static final long UNKNOWN_VOLUME_BYTES = -1L;

Review Comment:
   Do we still want this?
   Correct me if I am wrong, but I thought we concluded in the KIP that Optional would cover the scenario where the client uses a newer API and the broker is old.



##########
clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java:
##########
@@ -20,19 +20,29 @@
 import org.apache.kafka.common.errors.ApiException;
 
 import java.util.Map;
+import java.util.OptionalLong;
 
 import static java.util.Collections.unmodifiableMap;
+import static org.apache.kafka.common.requests.DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES;
 
 /**
  * A description of a log directory on a particular broker.
  */
 public class LogDirDescription {
     private final Map<TopicPartition, ReplicaInfo> replicaInfos;
     private final ApiException error;
+    private final OptionalLong totalBytes;
+    private final OptionalLong usableBytes;
 
     public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
+        this(error, replicaInfos, UNKNOWN_VOLUME_BYTES, UNKNOWN_VOLUME_BYTES);

Review Comment:
   Instead of `UNKNOWN_VOLUME_BYTES`, could this be `null` or `OptionalLong.empty()`? That way we could get rid of `UNKNOWN_VOLUME_BYTES` completely.





[GitHub] [kafka] mimaison merged pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

Posted by GitBox <gi...@apache.org>.
mimaison merged PR #12248:
URL: https://github.com/apache/kafka/pull/12248




[GitHub] [kafka] mimaison commented on a diff in pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

Posted by GitBox <gi...@apache.org>.
mimaison commented on code in PR #12248:
URL: https://github.com/apache/kafka/pull/12248#discussion_r892528901


##########
clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java:
##########
@@ -54,11 +64,21 @@ public Map<TopicPartition, ReplicaInfo> replicaInfos() {
         return unmodifiableMap(replicaInfos);
     }
 
+    public OptionalLong totalBytes() {

Review Comment:
   Good point, added!





[GitHub] [kafka] soarez commented on a diff in pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

Posted by GitBox <gi...@apache.org>.
soarez commented on code in PR #12248:
URL: https://github.com/apache/kafka/pull/12248#discussion_r891031695


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -812,13 +817,16 @@ class ReplicaManager(val config: KafkaConfig,
 
             new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
               .setErrorCode(Errors.NONE.code).setTopics(topicInfos)
+              .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
           case None =>
             new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
               .setErrorCode(Errors.NONE.code)
+              .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
         }
 
       } catch {
         case e: KafkaStorageException =>
+          e.printStackTrace()

Review Comment:
   Is this change intended? The exception is already logged on the following line.





[GitHub] [kafka] mimaison commented on a diff in pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

Posted by GitBox <gi...@apache.org>.
mimaison commented on code in PR #12248:
URL: https://github.com/apache/kafka/pull/12248#discussion_r892499161


##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java:
##########
@@ -31,6 +31,7 @@
 public class DescribeLogDirsResponse extends AbstractResponse {
 
     public static final long INVALID_OFFSET_LAG = -1L;
+    public static final long UNKNOWN_VOLUME_BYTES = -1L;

Review Comment:
   The wire protocol does not have nullable integers, so the new fields use -1 as their default value in the protocol: https://github.com/apache/kafka/blob/a73b96a7f6c467b32ca30b93534468fd192eeabe/clients/src/main/resources/common/message/DescribeLogDirsResponse.json#L52-L56
   
   This constant is used to map the value in DescribeLogDirsResponse to an Optional: https://github.com/apache/kafka/blob/a73b96a7f6c467b32ca30b93534468fd192eeabe/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java#L44-L45
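
The mapping described in this comment can be sketched as follows. This is a minimal illustration, not the code from the PR: the class name `VolumeBytesSketch` and the method name `fromWire` are hypothetical, and the only details taken from the thread are the -1 sentinel and the use of `OptionalLong`.

```java
import java.util.OptionalLong;

// Illustrative sketch only; the class and method names are hypothetical, not Kafka's.
final class VolumeBytesSketch {
    // Mirrors the -1 default that the thread describes for the new wire fields.
    static final long UNKNOWN_VOLUME_BYTES = -1L;

    // Map a raw wire value to an OptionalLong: the sentinel becomes "absent",
    // any other value is passed through unchanged.
    static OptionalLong fromWire(long raw) {
        return raw == UNKNOWN_VOLUME_BYTES ? OptionalLong.empty() : OptionalLong.of(raw);
    }

    public static void main(String[] args) {
        // An old broker that does not set the field yields the default, hence "absent".
        System.out.println(fromWire(UNKNOWN_VOLUME_BYTES).isPresent());
        // A broker that reports a real size yields a present value.
        System.out.println(fromWire(1024L).getAsLong());
    }
}
```

Keeping the sentinel on the wire and the `OptionalLong` in the public API means callers never have to compare against -1 themselves.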





[GitHub] [kafka] mimaison commented on a diff in pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

Posted by GitBox <gi...@apache.org>.
mimaison commented on code in PR #12248:
URL: https://github.com/apache/kafka/pull/12248#discussion_r892507318


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -4144,6 +4148,31 @@ class ReplicaManagerTest {
       replicaManager.shutdown(checkpointHW = false)
     }
   }
+
+  @Test
+  def testDescribeLogDirs(): Unit = {
+    val topicPartition = 0
+    val topicId = Uuid.randomUuid()
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+    val offsetFromLeader = 5
+
+    // Prepare the mocked components for the test
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch,
+      expectTruncation = false, localLogOffset = Some(10), offsetFromLeader = offsetFromLeader, topicId = Some(topicId))
+
+    val responses = replicaManager.describeLogDirs(Set(new TopicPartition(topic, topicPartition)))
+    assertEquals(mockLogMgr.liveLogDirs.size, responses.size)
+    responses.foreach { response =>
+      assertEquals(Errors.NONE.code, response.errorCode)
+      assertTrue(response.totalBytes >= 0)

Review Comment:
   True, this assertion is a bit too defensive! `> 0` should be fine. I'll push an update.





[GitHub] [kafka] divijvaidya commented on pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on PR #12248:
URL: https://github.com/apache/kafka/pull/12248#issuecomment-1150079126

   Thank you for making the suggested changes. 
   
   The current set of changes doesn't contain a full end-to-end integration test (KafkaAdminClientTest uses response mocks). Please make a change in `DescribeLogDirsRequestTest.java` to validate the new changes.




[GitHub] [kafka] mimaison commented on a diff in pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

Posted by GitBox <gi...@apache.org>.
mimaison commented on code in PR #12248:
URL: https://github.com/apache/kafka/pull/12248#discussion_r892529383


##########
clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java:
##########
@@ -54,11 +64,21 @@ public Map<TopicPartition, ReplicaInfo> replicaInfos() {
         return unmodifiableMap(replicaInfos);
     }
 
+    public OptionalLong totalBytes() {
+        return totalBytes;
+    }
+
+    public OptionalLong usableBytes() {

Review Comment:
   I've added some details; let me know if it's not what you had in mind.





[GitHub] [kafka] mimaison commented on a diff in pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

Posted by GitBox <gi...@apache.org>.
mimaison commented on code in PR #12248:
URL: https://github.com/apache/kafka/pull/12248#discussion_r891335380


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -812,13 +817,16 @@ class ReplicaManager(val config: KafkaConfig,
 
             new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
               .setErrorCode(Errors.NONE.code).setTopics(topicInfos)
+              .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
           case None =>
             new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
               .setErrorCode(Errors.NONE.code)
+              .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
         }
 
       } catch {
         case e: KafkaStorageException =>
+          e.printStackTrace()

Review Comment:
   Oops, no, we obviously don't want that! Thanks, fixed.





[GitHub] [kafka] tombentley commented on a diff in pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

Posted by GitBox <gi...@apache.org>.
tombentley commented on code in PR #12248:
URL: https://github.com/apache/kafka/pull/12248#discussion_r892498344


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2716,7 +2716,7 @@ private static Map<String, LogDirDescription> logDirDescriptions(DescribeLogDirs
                             new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey()));
                 }
             }
-            result.put(logDirResult.logDir(), new LogDirDescription(Errors.forCode(logDirResult.errorCode()).exception(), replicaInfoMap));
+            result.put(logDirResult.logDir(), new LogDirDescription(Errors.forCode(logDirResult.errorCode()).exception(), replicaInfoMap, logDirResult.totalBytes(), logDirResult.usableBytes()));

Review Comment:
   long line



##########
clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java:
##########
@@ -54,11 +64,21 @@ public Map<TopicPartition, ReplicaInfo> replicaInfos() {
         return unmodifiableMap(replicaInfos);
     }
 
+    public OptionalLong totalBytes() {
+        return totalBytes;
+    }
+
+    public OptionalLong usableBytes() {

Review Comment:
   Javadoc



##########
clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java:
##########
@@ -54,11 +64,21 @@ public Map<TopicPartition, ReplicaInfo> replicaInfos() {
         return unmodifiableMap(replicaInfos);
     }
 
+    public OptionalLong totalBytes() {

Review Comment:
   Javadoc



##########
clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java:
##########
@@ -54,11 +64,21 @@ public Map<TopicPartition, ReplicaInfo> replicaInfos() {
         return unmodifiableMap(replicaInfos);
     }
 
+    public OptionalLong totalBytes() {
+        return totalBytes;
+    }
+
+    public OptionalLong usableBytes() {

Review Comment:
   Also, do we want to say something about the constraints on usableBytes and totalBytes (individually and compared with each other)?
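
The thread does not spell out which constraints were eventually documented. As an assumption for illustration only, the sketch below treats each value as non-negative when present and usable space as no larger than total space when both are present. The class `LogDirSpaceSketch` and its method `isConsistent` are hypothetical names, and reading the sizes through `java.nio.file.FileStore` is a guess at one way to obtain such numbers, not a statement of how the broker does it.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.OptionalLong;

// Illustrative sketch only; names and constraints are assumptions, not Kafka's API.
final class LogDirSpaceSketch {
    // Returns true when each present value is non-negative and,
    // if both are present, usable does not exceed total.
    static boolean isConsistent(OptionalLong total, OptionalLong usable) {
        if (total.isPresent() && total.getAsLong() < 0) return false;
        if (usable.isPresent() && usable.getAsLong() < 0) return false;
        if (total.isPresent() && usable.isPresent()) {
            return usable.getAsLong() <= total.getAsLong();
        }
        return true; // nothing to compare when either value is absent
    }

    public static void main(String[] args) throws IOException {
        // Query the volume that backs the given directory (defaults to the current one).
        FileStore store = Files.getFileStore(Paths.get(args.length > 0 ? args[0] : "."));
        OptionalLong total = OptionalLong.of(store.getTotalSpace());
        OptionalLong usable = OptionalLong.of(store.getUsableSpace());
        System.out.println("total=" + total.getAsLong() + " usable=" + usable.getAsLong()
                + " consistent=" + isConsistent(total, usable));
    }
}
```

A check like this could back a Javadoc statement about the two getters, but the exact wording belongs to the PR authors.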





[GitHub] [kafka] mimaison commented on pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

Posted by GitBox <gi...@apache.org>.
mimaison commented on PR #12248:
URL: https://github.com/apache/kafka/pull/12248#issuecomment-1152343721

   Thanks @divijvaidya for the review. I've updated the end-to-end tests to cover this new feature.

