Posted to commits@kafka.apache.org by jq...@apache.org on 2017/09/03 06:21:03 UTC

[1/3] kafka git commit: KAFKA-5694; Add AlterReplicaDirRequest and DescribeReplicaDirRequest (KIP-113 part-1)

Repository: kafka
Updated Branches:
  refs/heads/trunk b2a328daf -> adefc8ea0


http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index dadd002..ce16971 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -12,6 +12,9 @@
   */
 package kafka.admin
 
+import java.util.Collections
+import java.util.Properties
+
 import kafka.admin.ReassignPartitionsCommand._
 import kafka.common.{AdminCommandFailedException, TopicAndPartition}
 import kafka.server.{KafkaConfig, KafkaServer}
@@ -22,14 +25,24 @@ import kafka.zk.ZooKeeperTestHarness
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{After, Before, Test}
 import kafka.admin.ReplicationQuotaUtils._
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.clients.admin.{AdminClient => JAdminClient}
+import org.apache.kafka.common.TopicPartitionReplica
+
+import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.Seq
+import scala.util.Random
+
+import java.io.File
 
 class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   val partitionId = 0
   var servers: Seq[KafkaServer] = null
   val topicName = "my-topic"
   val delayMs = 1000
+  var adminClient: JAdminClient = null
+
   def zkUpdateDelay(): Unit = Thread.sleep(delayMs)
 
   @Before
@@ -38,12 +51,29 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   }
 
   def startBrokers(brokerIds: Seq[Int]) {
-    servers = brokerIds.map(i => createBrokerConfig(i, zkConnect))
+    servers = brokerIds.map(i => createBrokerConfig(i, zkConnect, logDirCount = 3))
       .map(c => createServer(KafkaConfig.fromProps(c)))
   }
 
+  def createAdminClient(servers: Seq[KafkaServer]): JAdminClient = {
+    val props = new Properties()
+    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
+    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+    JAdminClient.create(props)
+  }
+
+  def getRandomLogDirAssignment(brokerId: Int): String = {
+    val server = servers.find(_.config.brokerId == brokerId).get
+    val logDirs = server.config.logDirs
+    new File(logDirs(Random.nextInt(logDirs.size))).getAbsolutePath
+  }
+
   @After
   override def tearDown() {
+    if (adminClient != null) {
+      adminClient.close()
+      adminClient = null
+    }
     TestUtils.shutdownServers(servers)
     super.tearDown()
   }
@@ -52,16 +82,22 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   def shouldMoveSinglePartition(): Unit = {
     //Given a single replica on server 100
     startBrokers(Seq(100, 101))
+    adminClient = createAdminClient(servers)
     val partition = 0
+    // Get a random log directory on broker 101
+    val expectedLogDir = getRandomLogDirAssignment(101)
     createTopic(zkUtils, topicName, Map(partition -> Seq(100)), servers = servers)
 
     //When we move the replica on 100 to broker 101
-    val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
+    val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101],"log_dirs":["$expectedLogDir"]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient), topicJson, NoThrottle)
     waitForReassignmentToComplete()
 
     //Then the replica should be on 101
     assertEquals(Seq(101), zkUtils.getPartitionAssignmentForTopics(Seq(topicName)).get(topicName).get(partition))
+    // The replica should be in the expected log directory on broker 101
+    val replica = new TopicPartitionReplica(topicName, 0, 101)
+    assertEquals(expectedLogDir, adminClient.describeReplicaLogDir(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir)
   }
 
   @Test
@@ -69,6 +105,9 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     //Given partitions on 2 of 3 brokers
     val brokers = Array(100, 101, 102)
     startBrokers(brokers)
+    adminClient = createAdminClient(servers)
+    // Get a random log directory on broker 102
+    val expectedLogDir = getRandomLogDirAssignment(102)
     createTopic(zkUtils, topicName, Map(
       0 -> Seq(100, 101),
       1 -> Seq(100, 101),
@@ -77,12 +116,19 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //When rebalancing
     val newAssignment = generateAssignment(zkUtils, brokers, json(topicName), true)._1
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), NoThrottle)
+    // Find a partition on broker 102
+    val partition = newAssignment.find { case (replica, brokerIds) => brokerIds.contains(102)}.get._1.partition
+    val replica = new TopicPartitionReplica(topicName, partition, 102)
+    val newReplicaAssignment = Map(replica -> expectedLogDir)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient),
+      ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, newReplicaAssignment), NoThrottle)
     waitForReassignmentToComplete()
 
     //Then the replicas should span all three brokers
     val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
     assertEquals(Seq(100, 101, 102), actual.values.flatten.toSeq.distinct.sorted)
+    // The replica should be in the expected log directory on broker 102
+    assertEquals(expectedLogDir, adminClient.describeReplicaLogDir(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir)
   }
 
   @Test
@@ -98,7 +144,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //When rebalancing
     val newAssignment = generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+      ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), NoThrottle)
     waitForReassignmentToComplete()
 
     //Then replicas should only span the first two brokers
@@ -111,6 +158,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     //Given partitions on 3 of 3 brokers
     val brokers = Array(100, 101, 102)
     startBrokers(brokers)
+    adminClient = createAdminClient(servers)
     createTopic(zkUtils, "topic1", Map(
       0 -> Seq(100, 101),
       1 -> Seq(101, 102),
@@ -125,11 +173,20 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     val proposed: Map[TopicAndPartition, Seq[Int]] = Map(
       TopicAndPartition("topic1", 0) -> Seq(100, 102),
       TopicAndPartition("topic1", 2) -> Seq(100, 102),
+      TopicAndPartition("topic2", 1) -> Seq(101, 100),
       TopicAndPartition("topic2", 2) -> Seq(100, 102)
     )
 
+    val replica1 = new TopicPartitionReplica("topic1", 0, 102)
+    val replica2 = new TopicPartitionReplica("topic2", 1, 100)
+    val proposedReplicaAssignment: Map[TopicPartitionReplica, String] = Map(
+      replica1 -> getRandomLogDirAssignment(102),
+      replica2 -> getRandomLogDirAssignment(100)
+    )
+
     //When rebalancing
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(proposed), NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient),
+      ReassignPartitionsCommand.formatAsReassignmentJson(proposed, proposedReplicaAssignment), NoThrottle)
     waitForReassignmentToComplete()
 
     //Then the proposed changes should have been made
@@ -138,8 +195,13 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     assertEquals(Seq(101, 102), actual("topic1")(1))
     assertEquals(Seq(100, 102), actual("topic1")(2))//changed
     assertEquals(Seq(100, 101), actual("topic2")(0))
-    assertEquals(Seq(101, 102), actual("topic2")(1))
+    assertEquals(Seq(101, 100), actual("topic2")(1))//changed
     assertEquals(Seq(100, 102), actual("topic2")(2))//changed
+
+    // The replicas should be in the expected log directories
+    val replicaDirs = adminClient.describeReplicaLogDir(List(replica1, replica2).asJavaCollection).all().get()
+    assertEquals(proposedReplicaAssignment(replica1), replicaDirs.get(replica1).getCurrentReplicaLogDir)
+    assertEquals(proposedReplicaAssignment(replica2), replicaDirs.get(replica2).getCurrentReplicaLogDir)
   }
 
   @Test
@@ -164,7 +226,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     val newAssignment = generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
 
     val start = System.currentTimeMillis()
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), initialThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+      ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), initialThrottle)
 
     //Check throttle config. Should be throttling replica 0 on 100 and 102 only.
     checkThrottleConfigAddedToZK(initialThrottle.value, servers, topicName, "0:100,0:101", "0:102")
@@ -216,7 +279,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
       TopicAndPartition("topic1", 2) -> Seq(103, 104), //didn't move
       TopicAndPartition("topic2", 2) -> Seq(103, 104)  //didn't move
     )
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(throttle))
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+      ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), Throttle(throttle))
 
     //Check throttle config. Should be throttling specific replicas for each topic.
     checkThrottleConfigAddedToZK(throttle, servers, "topic1",
@@ -245,13 +309,14 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     //Start rebalance
     val newAssignment = generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
 
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(initialThrottle))
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+      ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), Throttle(initialThrottle))
 
     //Check throttle config
     checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
 
     //Ensure that running Verify, whilst the command is executing, should have no effect
-    verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+    verifyAssignment(zkUtils, None, ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty))
 
     //Check throttle config again
     checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
@@ -259,7 +324,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     //Now re-run the same assignment with a larger throttle, which should only act to increase the throttle and make progress
     val newThrottle = initialThrottle * 1000
 
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(newThrottle))
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+      ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), Throttle(newThrottle))
 
     //Check throttle was changed
     checkThrottleConfigAddedToZK(newThrottle, servers, topicName, "0:100,0:101", "0:102")
@@ -268,7 +334,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     waitForReassignmentToComplete()
 
     //Verify should remove the throttle
-    verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+    verifyAssignment(zkUtils, None, ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty))
 
     //Check removed
     checkThrottleConfigRemovedFromZK(topicName, servers)
@@ -286,7 +352,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //When we execute an assignment that includes an invalid partition (1:101 in this case)
     val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":1,"replicas":[101]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None, topicJson, NoThrottle)
   }
 
   @Test(expected = classOf[AdminCommandFailedException])
@@ -297,7 +363,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //When we execute an assignment that specifies an empty replica list (0: empty list in this case)
     val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None, topicJson, NoThrottle)
   }
 
   @Test(expected = classOf[AdminCommandFailedException])
@@ -308,7 +374,46 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //When we execute an assignment that specifies an invalid brokerID (102: invalid broker ID in this case)
     val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101, 102]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, None, topicJson, NoThrottle)
+  }
+
+  @Test(expected = classOf[AdminCommandFailedException])
+  def shouldFailIfProposedHasInvalidLogDir() {
+    // Given a single replica on server 100
+    startBrokers(Seq(100, 101))
+    adminClient = createAdminClient(servers)
+    createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
+
+    // When we execute an assignment that specifies an invalid log directory
+    val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101],"log_dirs":["invalidDir"]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient), topicJson, NoThrottle)
+  }
+
+  @Test(expected = classOf[AdminCommandFailedException])
+  def shouldFailIfProposedMoveReplicaWithinBroker() {
+    // Given a single replica on server 100
+    startBrokers(Seq(100, 101))
+    adminClient = createAdminClient(servers)
+    val logDir = getRandomLogDirAssignment(100)
+    createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
+
+    // When we execute an assignment that specifies a log directory for an existing replica on the broker
+    // This test can be removed once KIP-113 is fully implemented, which will allow changing the log directory of existing replicas on a broker
+    val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[100],"log_dirs":["$logDir"]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient), topicJson, NoThrottle)
+  }
+
+  @Test(expected = classOf[AdminCommandFailedException])
+  def shouldFailIfProposedHasInconsistentReplicasAndLogDirs() {
+    // Given a single replica on server 100
+    startBrokers(Seq(100, 101))
+    adminClient = createAdminClient(servers)
+    val logDir = getRandomLogDirAssignment(100)
+    createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
+
+    // When we execute an assignment whose length of log_dirs doesn't match that of replicas
+    val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101],"log_dirs":["$logDir", "$logDir"]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient), topicJson, NoThrottle)
   }
 
   @Test
@@ -333,7 +438,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     )
 
     //When we run a throttled reassignment
-    new ReassignPartitionsCommand(zkUtils, move).reassignPartitions(throttle)
+    new ReassignPartitionsCommand(zkUtils, None, move).reassignPartitions(throttle)
 
     waitForReassignmentToComplete()
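
For context on the reassignment tests above: the JSON accepted by
executeAssignment gains an optional "log_dirs" field which, when present,
must list one absolute log-directory path per entry in "replicas"
(shouldFailIfProposedHasInconsistentReplicasAndLogDirs asserts that
mismatched lengths are rejected). A minimal reassignment file using the
new field might look like this, with placeholder topic name and paths:

    {"version":1,"partitions":[
      {"topic":"my-topic","partition":0,"replicas":[101,102],
       "log_dirs":["/tmp/kafka-logs-101-a","/tmp/kafka-logs-102-b"]}]}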
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index c75c28a..09c9ea8 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -40,7 +40,7 @@ class ReassignPartitionsCommandTest extends Logging {
   @Test
   def shouldFindMovingReplicas() {
     val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
 
     //Given partition 0 moves from broker 100 -> 102. Partition 1 does not move.
     val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), control)
@@ -61,7 +61,7 @@ class ReassignPartitionsCommandTest extends Logging {
 
   @Test
   def shouldFindMovingReplicasWhenProposedIsSubsetOfExisting() {
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
 
     //Given we have more existing partitions than we are proposing
     val existingSuperset = Map(
@@ -94,7 +94,7 @@ class ReassignPartitionsCommandTest extends Logging {
   @Test
   def shouldFindMovingReplicasMultiplePartitions() {
     val control = TopicAndPartition("topic1", 2) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
 
     //Given partitions 0 & 1 moves from broker 100 -> 102. Partition 2 does not move.
     val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), TopicAndPartition("topic1", 1) -> Seq(100, 101), control)
@@ -117,7 +117,7 @@ class ReassignPartitionsCommandTest extends Logging {
   @Test
   def shouldFindMovingReplicasMultipleTopics() {
     val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
 
     //Given topics 1 -> move from broker 100 -> 102, topics 2 -> move from broker 101 -> 100
     val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), TopicAndPartition("topic2", 0) -> Seq(101, 102), control)
@@ -146,7 +146,7 @@ class ReassignPartitionsCommandTest extends Logging {
 
   @Test
   def shouldFindMovingReplicasMultipleTopicsAndPartitions() {
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
 
     //Given
     val existing = Map(
@@ -186,7 +186,7 @@ class ReassignPartitionsCommandTest extends Logging {
   @Test
   def shouldFindTwoMovingReplicasInSamePartition() {
     val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
 
     //Given partition 0 has 2 moves from broker 102 -> 104 & 103 -> 105
     val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101, 102, 103), control)
@@ -209,7 +209,7 @@ class ReassignPartitionsCommandTest extends Logging {
   @Test
   def shouldNotOverwriteEntityConfigsWhenUpdatingThrottledReplicas(): Unit = {
     val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null)
     val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), control)
     val proposed = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102), control)
 
@@ -243,7 +243,7 @@ class ReassignPartitionsCommandTest extends Logging {
     val zk = stubZK(existing)
     val admin = createMock(classOf[AdminUtilities])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
-    val assigner = new ReassignPartitionsCommand(zk, proposed, admin)
+    val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.fetchEntityConfig(is(zk), anyString(), anyString())).andStubReturn(new Properties)
     expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
     replay(admin)
@@ -269,7 +269,7 @@ class ReassignPartitionsCommandTest extends Logging {
     val zk = stubZK(existing)
     val admin = createMock(classOf[AdminUtilities])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
-    val assigner = new ReassignPartitionsCommand(zk, proposed, admin)
+    val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
 
     //Expect the existing broker config to be changed from 10/100 to 1000
@@ -303,7 +303,7 @@ class ReassignPartitionsCommandTest extends Logging {
     val zk = stubZK(existing)
     val admin = createMock(classOf[AdminUtilities])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
-    val assigner = new ReassignPartitionsCommand(zk, proposed, admin)
+    val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
 
     //Given there is some existing config

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index dd59e60..9794b1a 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -275,7 +275,7 @@ class LogManagerTest {
   @Test
   def testRecoveryDirectoryMappingWithRelativeDirectory() {
     logManager.shutdown()
-    logDir = new File("data" + File.separator + logDir.getName)
+    logDir = new File("data" + File.separator + logDir.getName).getAbsoluteFile
     logDir.mkdirs()
     logDir.deleteOnExit()
     logManager = createLogManager()

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/server/AlterReplicaDirRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaDirRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaDirRequestTest.scala
new file mode 100644
index 0000000..6e22444
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/AlterReplicaDirRequestTest.scala
@@ -0,0 +1,83 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import kafka.network.SocketServer
+import kafka.utils._
+import java.io.File
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests._
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class AlterReplicaDirRequestTest extends BaseRequestTest {
+
+  override def numBrokers: Int = 1
+  override def logDirCount: Int = 5
+
+  val topic = "topic"
+
+  @Test
+  def testAlterReplicaDirRequestBeforeTopicCreation() {
+    val partitionNum = 5
+    val logDir = new File(servers.head.config.logDirs.head).getAbsolutePath
+    val partitionDirs = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir).toMap
+    val alterReplicaDirResponse = sendAlterReplicaDirRequest(partitionDirs)
+
+    // The response should show error REPLICA_NOT_AVAILABLE for all partitions
+    (0 until partitionNum).foreach { partition =>
+      val tp = new TopicPartition(topic, partition)
+      assertEquals(Errors.REPLICA_NOT_AVAILABLE, alterReplicaDirResponse.responses().get(tp))
+      assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+    }
+
+    TestUtils.createTopic(zkUtils, topic, partitionNum, 1, servers)
+    (0 until partitionNum).foreach { partition =>
+      assertEquals(logDir, servers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent)
+    }
+  }
+
+  @Test
+  def testAlterReplicaDirRequestErrorCode(): Unit = {
+    val validDir = new File(servers.head.config.logDirs.head).getAbsolutePath
+    val offlineDir = new File(servers.head.config.logDirs.tail.head).getAbsolutePath
+    servers.head.logDirFailureChannel.maybeAddOfflineLogDir(offlineDir, "", new java.io.IOException())
+    TestUtils.createTopic(zkUtils, topic, 3, 1, servers)
+
+    val partitionDirs = mutable.Map.empty[TopicPartition, String]
+    partitionDirs.put(new TopicPartition(topic, 0), "invalidDir")
+    partitionDirs.put(new TopicPartition(topic, 1), validDir)
+    partitionDirs.put(new TopicPartition(topic, 2), offlineDir)
+
+    val alterReplicaDirResponse = sendAlterReplicaDirRequest(partitionDirs.toMap)
+    assertEquals(Errors.LOG_DIR_NOT_FOUND, alterReplicaDirResponse.responses().get(new TopicPartition(topic, 0)))
+    assertEquals(Errors.NONE, alterReplicaDirResponse.responses().get(new TopicPartition(topic, 1)))
+    assertEquals(Errors.KAFKA_STORAGE_ERROR, alterReplicaDirResponse.responses().get(new TopicPartition(topic, 2)))
+  }
+
+  private def sendAlterReplicaDirRequest(partitionDirs: Map[TopicPartition, String], socketServer: SocketServer = controllerSocketServer): AlterReplicaDirResponse = {
+    val request = new AlterReplicaDirRequest.Builder(partitionDirs.asJava).build()
+    val response = connectAndSend(request, ApiKeys.ALTER_REPLICA_DIR, socketServer)
+    AlterReplicaDirResponse.parse(response, request.version)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index fffe3a8..a2ff35e 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -36,6 +36,8 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
   // If required, set number of brokers
   protected def numBrokers: Int = 3
 
+  protected def logDirCount: Int = 1
+
   // If required, override properties by mutating the passed Properties object
   protected def propertyOverrides(properties: Properties) {}
 
@@ -43,7 +45,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
     val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
       enableControlledShutdown = false, enableDeleteTopic = true,
       interBrokerSecurityProtocol = Some(securityProtocol),
-      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
     props.foreach(propertyOverrides)
     props.map(KafkaConfig.fromProps)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
new file mode 100644
index 0000000..353c180
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -0,0 +1,64 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests._
+import org.junit.Assert._
+import org.junit.Test
+import java.io.File
+
+class DescribeLogDirsRequestTest extends BaseRequestTest {
+
+  override def numBrokers: Int = 1
+  override def logDirCount: Int = 2
+
+  val topic = "topic"
+  val partitionNum = 2
+  val tp0 = new TopicPartition(topic, 0)
+  val tp1 = new TopicPartition(topic, 1)
+
+  @Test
+  def testDescribeLogDirsRequest(): Unit = {
+    val onlineDir = new File(servers.head.config.logDirs.head).getAbsolutePath
+    val offlineDir = new File(servers.head.config.logDirs.tail.head).getAbsolutePath
+    servers.head.replicaManager.handleLogDirFailure(offlineDir)
+    TestUtils.createTopic(zkUtils, topic, partitionNum, 1, servers)
+    TestUtils.produceMessages(servers, topic, 10)
+
+    val request = new DescribeLogDirsRequest.Builder(null).build()
+    val response = connectAndSend(request, ApiKeys.DESCRIBE_LOG_DIRS, controllerSocketServer)
+    val logDirInfos = DescribeLogDirsResponse.parse(response, request.version).logDirInfos()
+
+    assertEquals(logDirCount, logDirInfos.size())
+    assertEquals(Errors.KAFKA_STORAGE_ERROR, logDirInfos.get(offlineDir).error)
+    assertEquals(0, logDirInfos.get(offlineDir).replicaInfos.size())
+
+    assertEquals(Errors.NONE, logDirInfos.get(onlineDir).error)
+    val replicaInfo0 = logDirInfos.get(onlineDir).replicaInfos.get(tp0)
+    val replicaInfo1 = logDirInfos.get(onlineDir).replicaInfos.get(tp1)
+    assertEquals(servers.head.logManager.getLog(tp0).get.size, replicaInfo0.size)
+    assertEquals(servers.head.logManager.getLog(tp1).get.size, replicaInfo1.size)
+    assertTrue(servers.head.logManager.getLog(tp0).get.logEndOffset > 0)
+    assertEquals(servers.head.replicaManager.getLogEndOffsetLag(tp0), replicaInfo0.offsetLag)
+    assertEquals(servers.head.replicaManager.getLogEndOffsetLag(tp1), replicaInfo1.offsetLag)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index aadb4d2..aec68e2 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -49,6 +49,7 @@ class RequestQuotaTest extends BaseRequestTest {
   private val topic = "topic-1"
   private val numPartitions = 1
   private val tp = new TopicPartition(topic, 0)
+  private val logDir = "logDir"
   private val unthrottledClientId = "unthrottled-client"
   private val brokerId: Integer = 0
   private var leaderNode: KafkaServer = null
@@ -290,6 +291,12 @@ class RequestQuotaTest extends BaseRequestTest {
                 new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
               ))), true)
 
+        case ApiKeys.ALTER_REPLICA_DIR =>
+          new AlterReplicaDirRequest.Builder(Collections.singletonMap(tp, logDir))
+
+        case ApiKeys.DESCRIBE_LOG_DIRS =>
+          new DescribeLogDirsRequest.Builder(Collections.singleton(tp))
+
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
@@ -381,6 +388,8 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.DELETE_ACLS => new DeleteAclsResponse(response).throttleTimeMs
       case ApiKeys.DESCRIBE_CONFIGS => new DescribeConfigsResponse(response).throttleTimeMs
       case ApiKeys.ALTER_CONFIGS => new AlterConfigsResponse(response).throttleTimeMs
+      case ApiKeys.ALTER_REPLICA_DIR => new AlterReplicaDirResponse(response).throttleTimeMs
+      case ApiKeys.DESCRIBE_LOG_DIRS => new DescribeLogDirsResponse(response).throttleTimeMs
       case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2a08311..a52c83c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -197,19 +197,23 @@ object TestUtils extends Logging {
     *
     * Note that if `interBrokerSecurityProtocol` is defined, the listener for the `SecurityProtocol` will be enabled.
     */
-  def createBrokerConfig(nodeId: Int, zkConnect: String,
-    enableControlledShutdown: Boolean = true,
-    enableDeleteTopic: Boolean = false,
-    port: Int = RandomPort,
-    interBrokerSecurityProtocol: Option[SecurityProtocol] = None,
-    trustStoreFile: Option[File] = None,
-    saslProperties: Option[Properties] = None,
-    enablePlaintext: Boolean = true,
-    enableSaslPlaintext: Boolean = false, saslPlaintextPort: Int = RandomPort,
-    enableSsl: Boolean = false, sslPort: Int = RandomPort,
-    enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort, rack: Option[String] = None, logDirCount: Int = 1)
-  : Properties = {
-
+  def createBrokerConfig(nodeId: Int,
+                         zkConnect: String,
+                         enableControlledShutdown: Boolean = true,
+                         enableDeleteTopic: Boolean = false,
+                         port: Int = RandomPort,
+                         interBrokerSecurityProtocol: Option[SecurityProtocol] = None,
+                         trustStoreFile: Option[File] = None,
+                         saslProperties: Option[Properties] = None,
+                         enablePlaintext: Boolean = true,
+                         enableSaslPlaintext: Boolean = false,
+                         saslPlaintextPort: Int = RandomPort,
+                         enableSsl: Boolean = false,
+                         sslPort: Int = RandomPort,
+                         enableSaslSsl: Boolean = false,
+                         saslSslPort: Int = RandomPort,
+                         rack: Option[String] = None,
+                         logDirCount: Int = 1): Properties = {
     def shouldEnable(protocol: SecurityProtocol) = interBrokerSecurityProtocol.fold(false)(_ == protocol)
 
     val protocolAndPorts = ArrayBuffer[(SecurityProtocol, Int)]()


[3/3] kafka git commit: KAFKA-5694; Add AlterReplicaDirRequest and DescribeReplicaDirRequest (KIP-113 part-1)

Posted by jq...@apache.org.
KAFKA-5694; Add AlterReplicaDirRequest and DescribeReplicaDirRequest (KIP-113 part-1)

Author: Dong Lin <li...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>, Jiangjie Qin <be...@gmail.com>, Colin P. Mccabe <cm...@confluent.io>

Closes #3621 from lindong28/KAFKA-5694


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/adefc8ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/adefc8ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/adefc8ea

Branch: refs/heads/trunk
Commit: adefc8ea076354e07839f0319fee1fba52343b91
Parents: b2a328d
Author: Dong Lin <li...@gmail.com>
Authored: Sat Sep 2 23:20:13 2017 -0700
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Sat Sep 2 23:20:13 2017 -0700

----------------------------------------------------------------------
 bin/kafka-log-dirs.sh                           |  17 ++
 .../kafka/clients/admin/AbstractOptions.java    |  46 ++++
 .../apache/kafka/clients/admin/AdminClient.java |  77 +++++-
 .../kafka/clients/admin/AdminClientConfig.java  |  10 +-
 .../clients/admin/AlterConfigsOptions.java      |  20 +-
 .../clients/admin/AlterReplicaDirOptions.java   |  29 ++
 .../clients/admin/AlterReplicaDirResult.java    |  57 ++++
 .../kafka/clients/admin/CreateAclsOptions.java  |  20 +-
 .../clients/admin/CreateTopicsOptions.java      |  21 +-
 .../kafka/clients/admin/DeleteAclsOptions.java  |  20 +-
 .../clients/admin/DeleteTopicsOptions.java      |  20 +-
 .../clients/admin/DescribeAclsOptions.java      |  20 +-
 .../clients/admin/DescribeClusterOptions.java   |  20 +-
 .../clients/admin/DescribeConfigsOptions.java   |  19 +-
 .../clients/admin/DescribeLogDirsOptions.java   |  33 +++
 .../clients/admin/DescribeLogDirsResult.java    |  70 +++++
 .../admin/DescribeReplicaLogDirOptions.java     |  31 +++
 .../admin/DescribeReplicaLogDirResult.java      | 132 +++++++++
 .../clients/admin/DescribeTopicsOptions.java    |  20 +-
 .../kafka/clients/admin/KafkaAdminClient.java   | 195 ++++++++++++-
 .../kafka/clients/admin/ListTopicsOptions.java  |  21 +-
 .../kafka/common/TopicPartitionReplica.java     |  91 +++++++
 .../common/errors/KafkaStorageException.java    |   2 +-
 .../common/errors/LogDirNotFoundException.java  |  37 +++
 .../apache/kafka/common/protocol/ApiKeys.java   |   4 +-
 .../apache/kafka/common/protocol/Errors.java    |   8 +
 .../apache/kafka/common/protocol/Protocol.java  |  56 +++-
 .../kafka/common/requests/AbstractRequest.java  |   4 +
 .../kafka/common/requests/AbstractResponse.java |   4 +
 .../common/requests/AlterReplicaDirRequest.java | 148 ++++++++++
 .../requests/AlterReplicaDirResponse.java       | 114 ++++++++
 .../common/requests/DescribeLogDirsRequest.java | 145 ++++++++++
 .../requests/DescribeLogDirsResponse.java       | 184 +++++++++++++
 .../main/scala/kafka/admin/LogDirsCommand.scala | 114 ++++++++
 .../kafka/admin/ReassignPartitionsCommand.scala | 272 ++++++++++++++++---
 core/src/main/scala/kafka/log/LogManager.scala  |  35 ++-
 .../src/main/scala/kafka/server/KafkaApis.scala |  34 ++-
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 .../scala/kafka/server/ReplicaManager.scala     |  89 +++++-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   8 +-
 .../kafka/api/AdminClientIntegrationTest.scala  |  81 +++++-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  22 +-
 .../other/kafka/ReplicationQuotasTestRig.scala  |   2 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |  10 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |   2 +-
 .../admin/ReassignPartitionsClusterTest.scala   | 139 ++++++++--
 .../admin/ReassignPartitionsCommandTest.scala   |  20 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |   2 +-
 .../server/AlterReplicaDirRequestTest.scala     |  83 ++++++
 .../unit/kafka/server/BaseRequestTest.scala     |   4 +-
 .../server/DescribeLogDirsRequestTest.scala     |  64 +++++
 .../unit/kafka/server/RequestQuotaTest.scala    |   9 +
 .../test/scala/unit/kafka/utils/TestUtils.scala |  30 +-
 53 files changed, 2399 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/bin/kafka-log-dirs.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-log-dirs.sh b/bin/kafka-log-dirs.sh
new file mode 100755
index 0000000..dc16edc
--- /dev/null
+++ b/bin/kafka-log-dirs.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.LogDirsCommand "$@"
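
The new script is a thin wrapper around kafka.admin.LogDirsCommand, whose
option parsing lives in core/src/main/scala/kafka/admin/LogDirsCommand.scala
(listed in the file summary above but not shown in this message). Assuming
that class's flags, a typical invocation to dump the log-directory info of
brokers 0 and 1 as JSON would be:

    bin/kafka-log-dirs.sh --describe --bootstrap-server localhost:9092 --broker-list 0,1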

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
new file mode 100644
index 0000000..5b13dea
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+
+/*
+ * This class implements the common APIs that are shared by Options classes for various AdminClient commands
+ */
+public abstract class AbstractOptions<T extends AbstractOptions> {
+
+    private Integer timeoutMs = null;
+
+    /**
+     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
+     * AdminClient should be used.
+     */
+    public T timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return (T) this;
+    }
+
+    /**
+     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
+     * AdminClient should be used.
+     */
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+
+
+}
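
The recursive type parameter is what lets the shared timeoutMs setter return
the concrete subclass, so subclass-specific setters can still be chained
after it. A minimal sketch of the resulting call pattern, assuming the
pre-existing validateOnly(boolean) setter on AlterConfigsOptions:

    import org.apache.kafka.clients.admin.AlterConfigsOptions;

    public class OptionsChainingExample {
        public static void main(String[] args) {
            // timeoutMs() is declared on AbstractOptions but returns
            // AlterConfigsOptions here, so validateOnly() can follow it.
            AlterConfigsOptions options = new AlterConfigsOptions()
                    .timeoutMs(30000)
                    .validateOnly(true);
            System.out.println(options.timeoutMs() + " " + options.shouldValidateOnly());
        }
    }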

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index dd2aad6..3f4a07c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
@@ -335,9 +336,6 @@ public abstract class AdminClient implements AutoCloseable {
     /**
      * Update the configuration for the specified resources with the default options.
      *
-     * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
-     * a particular resource are updated atomically.
-     *
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
      * @param configs         The resources with their configs (topic is the only resource type with configs that can
@@ -346,4 +344,77 @@ public abstract class AdminClient implements AutoCloseable {
      * @return                The AlterConfigsResult
      */
     public abstract AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
+
+    /**
+     * Change the log directory for the specified replicas. This API currently only takes effect if it is used
+     * before the replica has been created on the broker. Once KIP-113 is fully implemented, it will also support
+     * moving replicas that have already been created.
+     *
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param replicaAssignment  The replicas with their log directory absolute path
+     * @return                   The AlterReplicaDirResult
+     */
+    public AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment) {
+        return alterReplicaDir(replicaAssignment, new AlterReplicaDirOptions());
+    }
+
+    /**
+     * Change the log directory for the specified replicas. This API currently only takes effect if it is used
+     * before the replica has been created on the broker. Once KIP-113 is fully implemented, it will also support
+     * moving replicas that have already been created.
+     *
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param replicaAssignment  The replicas with their log directory absolute path
+     * @param options            The options to use when changing replica dir
+     * @return                   The AlterReplicaDirResult
+     */
+    public abstract AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaDirOptions options);
+
+    /**
+     * Query information about all log directories on the given set of brokers.
+     *
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param brokers     A list of brokers
+     * @return            The DescribeLogDirsResult
+     */
+    public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers) {
+        return describeLogDirs(brokers, new DescribeLogDirsOptions());
+    }
+
+    /**
+     * Query information about all log directories on the given set of brokers.
+     *
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param brokers     A list of brokers
+     * @param options     The options to use when querying log dir info
+     * @return            The DescribeLogDirsResult
+     */
+    public abstract DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options);
+
+    /**
+     * Query the replica log directory information for the specified replicas.
+     *
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param replicas      The replicas to query
+     * @return              The DescribeReplicaLogDirResult
+     */
+    public DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas) {
+        return describeReplicaLogDir(replicas, new DescribeReplicaLogDirOptions());
+    }
+
+    /**
+     * Query the replica log directory information for the specified replicas.
+     *
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param replicas      The replicas to query
+     * @param options       The options to use when querying replica log dir info
+     * @return              The DescribeReplicaLogDirResult
+     */
+    public abstract DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirOptions options);
 }
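
Taken together, the three new methods cover the client side of KIP-113
part 1: pick a log directory for a replica before it is created, then
inspect directories and replica placement. A minimal end-to-end sketch,
with placeholder bootstrap address, topic, broker id and path (the result
classes come from elsewhere in this commit):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.TopicPartitionReplica;

    public class ReplicaDirExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Ask broker 1 to place the (not yet created) replica of
                // my-topic-0 in the given absolute log directory.
                TopicPartitionReplica replica = new TopicPartitionReplica("my-topic", 0, 1);
                admin.alterReplicaDir(Collections.singletonMap(replica, "/data/kafka-logs-1"))
                        .all().get();

                // Query every log directory on broker 1 ...
                admin.describeLogDirs(Collections.singleton(1)).all().get();

                // ... and the directory currently hosting this specific replica.
                String dir = admin.describeReplicaLogDir(Collections.singleton(replica))
                        .all().get().get(replica).getCurrentReplicaLogDir();
                System.out.println("my-topic-0 on broker 1 is in " + dir);
            }
        }
    }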

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index ed51e67..f0a117c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -154,11 +154,11 @@ public class AdminClientConfig extends AbstractConfig {
                                 .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
                                 .define(METRICS_RECORDING_LEVEL_CONFIG,
-                                    Type.STRING,
-                                    Sensor.RecordingLevel.INFO.toString(),
-                                    in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
-                                    Importance.LOW,
-                                    METRICS_RECORDING_LEVEL_DOC)
+                                        Type.STRING,
+                                        Sensor.RecordingLevel.INFO.toString(),
+                                        in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
+                                        Importance.LOW,
+                                        METRICS_RECORDING_LEVEL_DOC)
                                 // security support
                                 .define(SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
index c5665c0..7c84e05 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
@@ -27,29 +27,11 @@ import java.util.Map;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class AlterConfigsOptions {
+public class AlterConfigsOptions extends AbstractOptions<AlterConfigsOptions> {
 
-    private Integer timeoutMs = null;
     private boolean validateOnly = false;
 
     /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public AlterConfigsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
-
-    /**
      * Return true if the request should be validated without altering the configs.
      */
     public boolean shouldValidateOnly() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java
new file mode 100644
index 0000000..68d2ab6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Map;
+
+/**
+ * Options for {@link AdminClient#alterReplicaDir(Map, AlterReplicaDirOptions)}.
+ */
+@InterfaceStability.Evolving
+public class AlterReplicaDirOptions extends AbstractOptions<AlterReplicaDirOptions> {
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java
new file mode 100644
index 0000000..55bf85b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Map;
+
+
+/**
+ * The result of {@link AdminClient#alterReplicaDir(Map, AlterReplicaDirOptions)}.
+ */
+@InterfaceStability.Evolving
+public class AlterReplicaDirResult {
+    private final Map<TopicPartitionReplica, KafkaFuture<Void>> futures;
+
+    AlterReplicaDirResult(Map<TopicPartitionReplica, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from replica to future, which can be used to check the status
+     * of each individual replica movement.
+     *
+     * Possible error codes:
+     *
+     * LOG_DIR_NOT_FOUND (57)
+     * KAFKA_STORAGE_ERROR (56)
+     * REPLICA_NOT_AVAILABLE (9)
+     * UNKNOWN (-1)
+     */
+    public Map<TopicPartitionReplica, KafkaFuture<Void>> values() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the replica movements have succeeded.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+}
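
For context, a minimal usage sketch of the new API (assuming a configured
AdminClient named "admin"; the topic, broker id and log directory path below
are hypothetical, not part of this patch):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.admin.AlterReplicaDirOptions;
    import org.apache.kafka.clients.admin.AlterReplicaDirResult;
    import org.apache.kafka.common.TopicPartitionReplica;

    // Ask broker 1 to move its replica of my-topic-0 to another log directory.
    Map<TopicPartitionReplica, String> assignment = Collections.singletonMap(
        new TopicPartitionReplica("my-topic", 0, 1), "/data/kafka-logs-1");
    AlterReplicaDirResult result = admin.alterReplicaDir(assignment, new AlterReplicaDirOptions());
    // all() fails if any single movement fails, e.g. with LogDirNotFoundException (57).
    result.all().get();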

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
index 008c678..410f079 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
@@ -27,24 +27,6 @@ import java.util.Collection;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class CreateAclsOptions {
-    private Integer timeoutMs = null;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public CreateAclsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+public class CreateAclsOptions extends AbstractOptions<CreateAclsOptions> {
 
 }
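
The timeoutMs setter/getter boilerplate removed here presumably now lives in
the shared AbstractOptions base class that these options classes extend;
assuming that base class keeps the fluent, self-typed setter, call sites are
unchanged, e.g.:

    // Per-request timeout still reads the same after the refactoring.
    CreateAclsOptions options = new CreateAclsOptions().timeoutMs(10000);
    Integer timeout = options.timeoutMs();  // 10000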

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
index cb23a8d..7d4bd9e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
@@ -27,26 +27,9 @@ import java.util.Collection;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class CreateTopicsOptions {
-    private Integer timeoutMs = null;
-    private boolean validateOnly = false;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public CreateTopicsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
+public class CreateTopicsOptions extends AbstractOptions<CreateTopicsOptions> {
 
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+    private boolean validateOnly = false;
 
     /**
      * Set to true if the request should be validated without creating the topic.

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
index e06e775..ca57978 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
@@ -27,24 +27,6 @@ import java.util.Collection;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class DeleteAclsOptions {
-    private Integer timeoutMs = null;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public DeleteAclsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+public class DeleteAclsOptions extends AbstractOptions<DeleteAclsOptions> {
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
index ffe4ed7..d7c5af3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
@@ -27,24 +27,6 @@ import java.util.Collection;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class DeleteTopicsOptions {
-    private Integer timeoutMs = null;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public DeleteTopicsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+public class DeleteTopicsOptions extends AbstractOptions<DeleteTopicsOptions> {
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
index 55ae2e4..097cd19 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
@@ -26,24 +26,6 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class DescribeAclsOptions {
-    private Integer timeoutMs = null;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public DescribeAclsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+public class DescribeAclsOptions extends AbstractOptions<DescribeAclsOptions> {
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
index 0c3ea51..cb5652b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
@@ -25,24 +25,6 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class DescribeClusterOptions {
-    private Integer timeoutMs = null;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public DescribeClusterOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptions> {
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
index cc7d9cc..bb37e6b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
@@ -27,23 +27,6 @@ import java.util.Collection;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class DescribeConfigsOptions {
-    private Integer timeoutMs = null;
+public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptions> {
 
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public DescribeConfigsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsOptions.java
new file mode 100644
index 0000000..48711bf
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsOptions.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+
+/**
+ * Options for {@link AdminClient#describeLogDirs(Collection)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeLogDirsOptions extends AbstractOptions<DescribeLogDirsOptions> {
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
new file mode 100644
index 0000000..907b48d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.HashMap;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
+
+
+/**
+ * The result of the {@link AdminClient#describeLogDirs(Collection)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeLogDirsResult {
+    private final Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> futures;
+
+    DescribeLogDirsResult(Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from broker id to future, which can be used to check the log directory information on each individual broker.
+     */
+    public Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> values() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the brokers have responded without error.
+     */
+    public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+            thenApply(new KafkaFuture.Function<Void, Map<Integer, Map<String, LogDirInfo>>>() {
+                @Override
+                public Map<Integer, Map<String, LogDirInfo>> apply(Void v) {
+                    Map<Integer, Map<String, LogDirInfo>> descriptions = new HashMap<>(futures.size());
+                    for (Map.Entry<Integer, KafkaFuture<Map<String, LogDirInfo>>> entry : futures.entrySet()) {
+                        try {
+                            descriptions.put(entry.getKey(), entry.getValue().get());
+                        } catch (InterruptedException | ExecutionException e) {
+                            // This should be unreachable, because allOf ensured that all the futures completed successfully.
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return descriptions;
+                }
+            });
+    }
+}
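
A sketch of querying the new API (again assuming an AdminClient "admin" and a
broker with id 1; both are illustrative):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.admin.DescribeLogDirsOptions;
    import org.apache.kafka.clients.admin.DescribeLogDirsResult;
    import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;

    DescribeLogDirsResult result =
        admin.describeLogDirs(Collections.singleton(1), new DescribeLogDirsOptions());
    // all() yields broker id -> (log directory path -> LogDirInfo).
    Map<Integer, Map<String, LogDirInfo>> byBroker = result.all().get();
    for (Map.Entry<String, LogDirInfo> dir : byBroker.get(1).entrySet())
        System.out.println(dir.getKey() + " error=" + dir.getValue().error);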

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java
new file mode 100644
index 0000000..72d9643
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Collection;
+
+/**
+ * Options for {@link AdminClient#describeReplicaLogDir(Collection)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeReplicaLogDirOptions extends AbstractOptions<DescribeReplicaLogDirOptions> {
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java
new file mode 100644
index 0000000..6139cc7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+
+
+/**
+ * The result of {@link AdminClient#describeReplicaLogDir(Collection)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeReplicaLogDirResult {
+    private final Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> futures;
+
+    DescribeReplicaLogDirResult(Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from replica to future, which can be used to check the log directory information of individual replicas.
+     */
+    public Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> values() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if the log directory information of all replicas is available.
+     */
+    public KafkaFuture<Map<TopicPartitionReplica, ReplicaLogDirInfo>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+            thenApply(new KafkaFuture.Function<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
+                @Override
+                public Map<TopicPartitionReplica, ReplicaLogDirInfo> apply(Void v) {
+                    Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfos = new HashMap<>();
+                    for (Map.Entry<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> entry : futures.entrySet()) {
+                        try {
+                            replicaLogDirInfos.put(entry.getKey(), entry.getValue().get());
+                        } catch (InterruptedException | ExecutionException e) {
+                            // This should be unreachable, because allOf ensured that all the futures completed successfully.
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return replicaLogDirInfos;
+                }
+            });
+    }
+
+    public static class ReplicaLogDirInfo {
+        // The current log directory of the replica of this partition on the given broker.
+        // Null if no replica is found for this partition on the given broker.
+        private final String currentReplicaLogDir;
+        // Defined as max(HW of partition - LEO of the replica, 0).
+        private final long currentReplicaOffsetLag;
+        // The future log directory of the replica of this partition on the given broker.
+        // Null if the replica of this partition is not being moved to another log directory on the given broker.
+        private final String futureReplicaLogDir;
+        // The LEO of the replica - LEO of the future log of this replica in the destination log directory.
+        // -1 if either there is no replica for this partition or the replica of this partition is not being moved to another log directory on the given broker.
+        private final long futureReplicaOffsetLag;
+
+        ReplicaLogDirInfo() {
+            this(null, DescribeLogDirsResponse.INVALID_OFFSET_LAG, null, DescribeLogDirsResponse.INVALID_OFFSET_LAG);
+        }
+
+        ReplicaLogDirInfo(String currentReplicaLogDir,
+                          long currentReplicaOffsetLag,
+                          String futureReplicaLogDir,
+                          long futureReplicaOffsetLag) {
+            this.currentReplicaLogDir = currentReplicaLogDir;
+            this.currentReplicaOffsetLag = currentReplicaOffsetLag;
+            this.futureReplicaLogDir = futureReplicaLogDir;
+            this.futureReplicaOffsetLag = futureReplicaOffsetLag;
+        }
+
+        public String getCurrentReplicaLogDir() {
+            return currentReplicaLogDir;
+        }
+
+        public long getCurrentReplicaOffsetLag() {
+            return currentReplicaOffsetLag;
+        }
+
+        public String getFutureReplicaLogDir() {
+            return futureReplicaLogDir;
+        }
+
+        public long getFutureReplicaOffsetLag() {
+            return futureReplicaOffsetLag;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            if (futureReplicaLogDir != null) {
+                builder.append("(currentReplicaLogDir=")
+                    .append(currentReplicaLogDir)
+                    .append(", futureReplicaLogDir=")
+                    .append(futureReplicaLogDir)
+                    .append(", futureReplicaOffsetLag=")
+                    .append(futureReplicaOffsetLag)
+                    .append(")");
+            } else {
+                builder.append("ReplicaLogDirInfo(currentReplicaLogDir=").append(currentReplicaLogDir).append(")");
+            }
+            return builder.toString();
+        }
+    }
+}
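
Per the field comments above, a non-null futureReplicaLogDir means a movement
started by alterReplicaDir() is still in flight. A sketch of checking progress
(AdminClient "admin" and the replica coordinates are illustrative):

    import java.util.Collections;
    import org.apache.kafka.clients.admin.DescribeReplicaLogDirOptions;
    import org.apache.kafka.clients.admin.DescribeReplicaLogDirResult.ReplicaLogDirInfo;
    import org.apache.kafka.common.TopicPartitionReplica;

    TopicPartitionReplica replica = new TopicPartitionReplica("my-topic", 0, 1);
    ReplicaLogDirInfo info = admin
        .describeReplicaLogDir(Collections.singleton(replica), new DescribeReplicaLogDirOptions())
        .values().get(replica).get();
    if (info.getFutureReplicaLogDir() != null)
        System.out.println("moving, future log lags by " + info.getFutureReplicaOffsetLag());
    else
        System.out.println("replica lives in " + info.getCurrentReplicaLogDir());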

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
index f81569e..64ead48 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
@@ -27,24 +27,6 @@ import java.util.Collection;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class DescribeTopicsOptions {
-    private Integer timeoutMs = null;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public DescribeTopicsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions> {
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index dca9f16..49ac93e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.Set;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
@@ -26,12 +27,15 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
+import org.apache.kafka.clients.admin.DescribeReplicaLogDirResult.ReplicaLogDirInfo;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
@@ -58,6 +62,8 @@ import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.AlterConfigsRequest;
 import org.apache.kafka.common.requests.AlterConfigsResponse;
+import org.apache.kafka.common.requests.AlterReplicaDirRequest;
+import org.apache.kafka.common.requests.AlterReplicaDirResponse;
 import org.apache.kafka.common.requests.CreateAclsRequest;
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse;
@@ -75,6 +81,8 @@ import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.DescribeConfigsRequest;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.DescribeLogDirsRequest;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.Resource;
@@ -395,7 +403,7 @@ public class KafkaAdminClient extends AdminClient {
             thread.join();
 
             AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
-            
+
             log.debug("Kafka admin client closed.");
         } catch (InterruptedException e) {
             log.debug("Interrupted while joining I/O thread", e);
@@ -1595,4 +1603,189 @@ public class KafkaAdminClient extends AdminClient {
         }, now);
         return new AlterConfigsResult(new HashMap<ConfigResource, KafkaFuture<Void>>(futures));
     }
+
+    @Override
+    public AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaDirOptions options) {
+        final Map<TopicPartitionReplica, KafkaFutureImpl<Void>> futures = new HashMap<>(replicaAssignment.size());
+
+        for (TopicPartitionReplica replica : replicaAssignment.keySet()) {
+            futures.put(replica, new KafkaFutureImpl<Void>());
+        }
+
+        Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>();
+
+        for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) {
+            TopicPartitionReplica replica = entry.getKey();
+            String logDir = entry.getValue();
+            int brokerId = replica.brokerId();
+            TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition());
+            if (!replicaAssignmentByBroker.containsKey(brokerId))
+                replicaAssignmentByBroker.put(brokerId, new HashMap<TopicPartition, String>());
+            replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir);
+        }
+
+        final long now = time.milliseconds();
+        for (Map.Entry<Integer, Map<TopicPartition, String>> entry: replicaAssignmentByBroker.entrySet()) {
+            final int brokerId = entry.getKey();
+            final Map<TopicPartition, String> assignment = entry.getValue();
+
+            runnable.call(new Call("alterReplicaDir", calcDeadlineMs(now, options.timeoutMs()),
+                new ConstantNodeIdProvider(brokerId)) {
+
+                @Override
+                public AbstractRequest.Builder createRequest(int timeoutMs) {
+                    return new AlterReplicaDirRequest.Builder(assignment);
+                }
+
+                @Override
+                public void handleResponse(AbstractResponse abstractResponse) {
+                    AlterReplicaDirResponse response = (AlterReplicaDirResponse) abstractResponse;
+                    for (Map.Entry<TopicPartition, Errors> responseEntry: response.responses().entrySet()) {
+                        TopicPartition tp = responseEntry.getKey();
+                        Errors error = responseEntry.getValue();
+                        TopicPartitionReplica replica = new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId);
+                        KafkaFutureImpl<Void> future = futures.get(replica);
+                        if (future == null) {
+                            handleFailure(new IllegalArgumentException(
+                                "The partition " + tp + " in the response from broker " + brokerId + " is not in the request"));
+                        } else if (error == Errors.NONE) {
+                            future.complete(null);
+                        } else {
+                            future.completeExceptionally(error.exception());
+                        }
+                    }
+                }
+                @Override
+                void handleFailure(Throwable throwable) {
+                    completeAllExceptionally(futures.values(), throwable);
+                }
+            }, now);
+        }
+
+        return new AlterReplicaDirResult(new HashMap<TopicPartitionReplica, KafkaFuture<Void>>(futures));
+    }
+
+    @Override
+    public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options) {
+        final Map<Integer, KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>> futures = new HashMap<>(brokers.size());
+
+        for (Integer brokerId: brokers) {
+            futures.put(brokerId, new KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>());
+        }
+
+        final long now = time.milliseconds();
+        for (final Integer brokerId: brokers) {
+            runnable.call(new Call("describeLogDirs", calcDeadlineMs(now, options.timeoutMs()),
+                new ConstantNodeIdProvider(brokerId)) {
+
+                @Override
+                public AbstractRequest.Builder createRequest(int timeoutMs) {
+                    // Passing null queries all partitions in all log directories
+                    return new DescribeLogDirsRequest.Builder(null);
+                }
+
+                @Override
+                public void handleResponse(AbstractResponse abstractResponse) {
+                    DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
+                    KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>> future = futures.get(brokerId);
+                    if (response.logDirInfos().size() > 0) {
+                        future.complete(response.logDirInfos());
+                    } else {
+                        // response.logDirInfos() will be empty if and only if the user is not authorized to describe the cluster resource.
+                        future.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
+                    }
+                }
+                @Override
+                void handleFailure(Throwable throwable) {
+                    completeAllExceptionally(futures.values(), throwable);
+                }
+            }, now);
+        }
+
+        return new DescribeLogDirsResult(new HashMap<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>>(futures));
+    }
+
+    @Override
+    public DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirOptions options) {
+        final Map<TopicPartitionReplica, KafkaFutureImpl<DescribeReplicaLogDirResult.ReplicaLogDirInfo>> futures = new HashMap<>(replicas.size());
+
+        for (TopicPartitionReplica replica : replicas) {
+            futures.put(replica, new KafkaFutureImpl<DescribeReplicaLogDirResult.ReplicaLogDirInfo>());
+        }
+
+        Map<Integer, Set<TopicPartition>> partitionsByBroker = new HashMap<>();
+
+        for (TopicPartitionReplica replica: replicas) {
+            if (!partitionsByBroker.containsKey(replica.brokerId()))
+                partitionsByBroker.put(replica.brokerId(), new HashSet<TopicPartition>());
+            partitionsByBroker.get(replica.brokerId()).add(new TopicPartition(replica.topic(), replica.partition()));
+        }
+
+        final long now = time.milliseconds();
+        for (Map.Entry<Integer, Set<TopicPartition>> entry: partitionsByBroker.entrySet()) {
+            final int brokerId = entry.getKey();
+            final Set<TopicPartition> topicPartitions = entry.getValue();
+            final Map<TopicPartition, ReplicaLogDirInfo> replicaDirInfoByPartition = new HashMap<>();
+            for (TopicPartition topicPartition: topicPartitions)
+                replicaDirInfoByPartition.put(topicPartition, new ReplicaLogDirInfo());
+
+            runnable.call(new Call("describeReplicaLogDir", calcDeadlineMs(now, options.timeoutMs()),
+                new ConstantNodeIdProvider(brokerId)) {
+
+                @Override
+                public AbstractRequest.Builder createRequest(int timeoutMs) {
+                    // Query selected partitions in all log directories
+                    return new DescribeLogDirsRequest.Builder(topicPartitions);
+                }
+
+                @Override
+                public void handleResponse(AbstractResponse abstractResponse) {
+                    DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
+                    for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> responseEntry: response.logDirInfos().entrySet()) {
+                        String logDir = responseEntry.getKey();
+                        DescribeLogDirsResponse.LogDirInfo logDirInfo = responseEntry.getValue();
+
+                        // No replica info will be provided if the log directory is offline
+                        if (logDirInfo.error == Errors.KAFKA_STORAGE_ERROR)
+                            continue;
+                        else if (logDirInfo.error != Errors.NONE)
+                            handleFailure(new IllegalArgumentException(
+                                "The error " + logDirInfo.error + " for log directory " + logDir + " in the response from broker " + brokerId + " is unexpected"));
+
+                        for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoEntry: logDirInfo.replicaInfos.entrySet()) {
+                            TopicPartition tp = replicaInfoEntry.getKey();
+                            DescribeLogDirsResponse.ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
+                            ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp);
+                            if (replicaLogDirInfo == null) {
+                                handleFailure(new IllegalArgumentException(
+                                    "The partition " + tp + " in the response from broker " + brokerId + " is not in the request"));
+                            } else if (replicaInfo.isFuture) {
+                                replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
+                                                                                        replicaLogDirInfo.getCurrentReplicaOffsetLag(),
+                                                                                        logDir,
+                                                                                        replicaInfo.offsetLag));
+                            } else {
+                                replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir,
+                                                                                        replicaInfo.offsetLag,
+                                                                                        replicaLogDirInfo.getFutureReplicaLogDir(),
+                                                                                        replicaLogDirInfo.getFutureReplicaOffsetLag()));
+                            }
+                        }
+                    }
+
+                    for (Map.Entry<TopicPartition, ReplicaLogDirInfo> entry: replicaDirInfoByPartition.entrySet()) {
+                        TopicPartition tp = entry.getKey();
+                        KafkaFutureImpl<ReplicaLogDirInfo> future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
+                        future.complete(entry.getValue());
+                    }
+                }
+                @Override
+                void handleFailure(Throwable throwable) {
+                    completeAllExceptionally(futures.values(), throwable);
+                }
+            }, now);
+        }
+
+        return new DescribeReplicaLogDirResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures));
+    }
 }
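
Note that these new methods fan out one Call per broker, so a failure on one
broker only fails that broker's futures. A caller that wants per-replica
outcomes can iterate values() rather than all(); a sketch (the assignment map
is the hypothetical one from the earlier example):

    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.common.KafkaFuture;
    import org.apache.kafka.common.TopicPartitionReplica;

    Map<TopicPartitionReplica, KafkaFuture<Void>> futures =
        admin.alterReplicaDir(assignment, new AlterReplicaDirOptions()).values();
    for (Map.Entry<TopicPartitionReplica, KafkaFuture<Void>> e : futures.entrySet()) {
        try {
            e.getValue().get();
        } catch (ExecutionException ex) {
            // e.g. LogDirNotFoundException (57) or KafkaStorageException (56)
            System.err.println("move failed for " + e.getKey() + ": " + ex.getCause());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }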

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
index 81d834f..f656ff4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
@@ -25,26 +25,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class ListTopicsOptions {
-    private Integer timeoutMs = null;
-    private boolean listInternal = false;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public ListTopicsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
+public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> {
 
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+    private boolean listInternal = false;
 
     /**
      * Set whether we should list internal topics.

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java b/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java
new file mode 100644
index 0000000..2a10439
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.io.Serializable;
+
+
+/**
+ * The topic name, partition number and broker id of the replica.
+ */
+public final class TopicPartitionReplica implements Serializable {
+
+    private int hash = 0;
+    private final int brokerId;
+    private final int partition;
+    private final String topic;
+
+    public TopicPartitionReplica(String topic, int partition, int brokerId) {
+        this.topic = topic;
+        this.partition = partition;
+        this.brokerId = brokerId;
+    }
+
+    public String topic() {
+        return topic;
+    }
+
+    public int partition() {
+        return partition;
+    }
+
+    public int brokerId() {
+        return brokerId;
+    }
+
+    @Override
+    public int hashCode() {
+        if (hash != 0) {
+            return hash;
+        }
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+        result = prime * result + partition;
+        result = prime * result + brokerId;
+        this.hash = result;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        TopicPartitionReplica other = (TopicPartitionReplica) obj;
+        if (partition != other.partition)
+            return false;
+        if (brokerId != other.brokerId)
+            return false;
+        if (topic == null) {
+            if (other.topic != null) {
+                return false;
+            }
+        } else if (!topic.equals(other.topic)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s-%d-%d", topic, partition, brokerId);
+    }
+}
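
TopicPartitionReplica is a plain value type (field-wise equals, cached
hashCode), so distinct instances with the same coordinates interoperate as map
keys; a small sketch:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartitionReplica;

    Map<TopicPartitionReplica, String> logDirByReplica = new HashMap<>();
    logDirByReplica.put(new TopicPartitionReplica("my-topic", 0, 1), "/data/d1");
    // A fresh instance with the same fields finds the same entry.
    String dir = logDirByReplica.get(new TopicPartitionReplica("my-topic", 0, 1));  // "/data/d1"
    System.out.println(new TopicPartitionReplica("my-topic", 0, 1));  // prints "my-topic-0-1"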

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java b/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java
index 00c7cee..c45afb0 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java
@@ -23,7 +23,7 @@ package org.apache.kafka.common.errors;
  * Here are the guidelines on how to handle KafkaStorageException and IOException:
  *
  * 1) If the server has not finished loading logs, IOException does not need to be converted to KafkaStorageException
- * 2) After the server has finished loading logs, IOException should be caught and trigger LogDirFailureChannel.maybeAddLogFailureEvent
+ * 2) After the server has finished loading logs, IOException should be caught and trigger LogDirFailureChannel.maybeAddOfflineLogDir()
  *    Then the IOException should either be swallowed and logged, or be converted and re-thrown as KafkaStorageException
  * 3) It is preferred for IOException to be caught in Log rather than in ReplicaManager or LogSegment.
  *

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/errors/LogDirNotFoundException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/LogDirNotFoundException.java b/clients/src/main/java/org/apache/kafka/common/errors/LogDirNotFoundException.java
new file mode 100644
index 0000000..0a4ae16
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/LogDirNotFoundException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown when a request is made for a log directory that is not present on the broker.
+ */
+public class LogDirNotFoundException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public LogDirNotFoundException(String message) {
+        super(message);
+    }
+
+    public LogDirNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public LogDirNotFoundException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 721a610..5ac02fa 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -68,7 +68,9 @@ public enum ApiKeys {
     CREATE_ACLS(30, "CreateAcls"),
     DELETE_ACLS(31, "DeleteAcls"),
     DESCRIBE_CONFIGS(32, "DescribeConfigs"),
-    ALTER_CONFIGS(33, "AlterConfigs");
+    ALTER_CONFIGS(33, "AlterConfigs"),
+    ALTER_REPLICA_DIR(34, "AlterReplicaDir"),
+    DESCRIBE_LOG_DIRS(35, "DescribeLogDirs");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 19acfd6..9decef2 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.ControllerMovedException;
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.LogDirNotFoundException;
 import org.apache.kafka.common.errors.DuplicateSequenceNumberException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
@@ -508,6 +509,13 @@ public enum Errors {
             public ApiException build(String message) {
                 return new KafkaStorageException(message);
             }
+    }),
+    LOG_DIR_NOT_FOUND(57, "The user-specified log directory is not found in the broker config.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new LogDirNotFoundException(message);
+            }
     });
 
     private interface ApiExceptionBuilder {
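
A sketch of the new code/exception mapping, assuming the Errors enum's usual
helpers (exception(), forCode(), forException()):

    import org.apache.kafka.common.errors.ApiException;
    import org.apache.kafka.common.errors.LogDirNotFoundException;
    import org.apache.kafka.common.protocol.Errors;

    // Error code 57 round-trips through the new exception type.
    ApiException e = Errors.LOG_DIR_NOT_FOUND.exception();
    assert e instanceof LogDirNotFoundException;
    assert Errors.forCode((short) 57) == Errors.LOG_DIR_NOT_FOUND;
    assert Errors.forException(new LogDirNotFoundException("no such dir")) == Errors.LOG_DIR_NOT_FOUND;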

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 9f6ae3d..10b1823 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1830,6 +1830,56 @@ public class Protocol {
     public static final Schema[] DELETE_ACLS_REQUEST = {DELETE_ACLS_REQUEST_V0};
     public static final Schema[] DELETE_ACLS_RESPONSE = {DELETE_ACLS_RESPONSE_V0};
 
+    public static final Schema ALTER_REPLICA_DIR_REQUEST_V0 = new Schema(
+        new Field("log_dirs", new ArrayOf(new Schema(
+                new Field("log_dir", STRING, "The absolute log directory path."),
+                new Field("topics", new ArrayOf(new Schema(
+                    new Field("topic", STRING, "The name of the topic."),
+                    new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic.")
+                )))
+        ))));
+
+    public static final Schema ALTER_REPLICA_DIR_RESPONSE_V0 = new Schema(
+        newThrottleTimeField(),
+        new Field("topics", new ArrayOf(new Schema(
+                new Field("topic", STRING, "The name of the topic."),
+                new Field("partitions", new ArrayOf(new Schema(
+                    new Field("partition", INT32, "The id of the partition."),
+                    new Field("error_code", INT16, "The error code for the partition.")
+                )))
+        ))));
+
+    public static final Schema[] ALTER_REPLICA_DIR_REQUEST = {ALTER_REPLICA_DIR_REQUEST_V0};
+    public static final Schema[] ALTER_REPLICA_DIR_RESPONSE = {ALTER_REPLICA_DIR_RESPONSE_V0};
+
+    public static final Schema DESCRIBE_LOG_DIRS_REQUEST_V0 = new Schema(
+        new Field("topics", ArrayOf.nullable(new Schema(
+            new Field("topic", STRING, "The name of the topic."),
+            new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic.")
+        )))
+    );
+
+    public static final Schema DESCRIBE_LOG_DIRS_RESPONSE_V0 = new Schema(
+        newThrottleTimeField(),
+        new Field("log_dirs",
+            new ArrayOf(new Schema(
+                new Field("error_code", INT16, "The error code for the log directory."),
+                new Field("log_dir", STRING, "The absolute log directory path."),
+                new Field("topics", new ArrayOf(new Schema(
+                    new Field("topic", STRING, "The name of the topic."),
+                    new Field("partitions", new ArrayOf(new Schema(
+                        new Field("partition", INT32, "The id of the partition."),
+                        new Field("size", INT64, "The size of the log segments of the partition in bytes."),
+                        new Field("offset_lag", INT64,
+                            "The lag of the log's LEO w.r.t. partition's HW (if it is the current log for the partition) or current replica's LEO (if it is the future log for the partition)"),
+                        new Field("is_future", BOOLEAN, "True if this log is created by AlterReplicaDirRequest and will replace the current log of the replica in the future.")
+                    )))
+                )))
+            ))));
+
+    public static final Schema[] DESCRIBE_LOG_DIRS_REQUEST = {DESCRIBE_LOG_DIRS_REQUEST_V0};
+    public static final Schema[] DESCRIBE_LOG_DIRS_RESPONSE = {DESCRIBE_LOG_DIRS_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -1875,6 +1925,8 @@ public class Protocol {
         REQUESTS[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_REQUEST;
         REQUESTS[ApiKeys.DESCRIBE_CONFIGS.id] = DESCRIBE_CONFIGS_REQUEST;
         REQUESTS[ApiKeys.ALTER_CONFIGS.id] = ALTER_CONFIGS_REQUEST;
+        REQUESTS[ApiKeys.ALTER_REPLICA_DIR.id] = ALTER_REPLICA_DIR_REQUEST;
+        REQUESTS[ApiKeys.DESCRIBE_LOG_DIRS.id] = DESCRIBE_LOG_DIRS_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1910,6 +1962,8 @@ public class Protocol {
         RESPONSES[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_RESPONSE;
         RESPONSES[ApiKeys.DESCRIBE_CONFIGS.id] = DESCRIBE_CONFIGS_RESPONSE;
         RESPONSES[ApiKeys.ALTER_CONFIGS.id] = ALTER_CONFIGS_RESPONSE;
+        RESPONSES[ApiKeys.ALTER_REPLICA_DIR.id] = ALTER_REPLICA_DIR_RESPONSE;
+        RESPONSES[ApiKeys.DESCRIBE_LOG_DIRS.id] = DESCRIBE_LOG_DIRS_RESPONSE;
 
         /* set the minimum and maximum version of each api */
         for (ApiKeys api : ApiKeys.values()) {
@@ -1988,7 +2042,7 @@ public class Protocol {
     public static boolean requiresDelayedDeallocation(int apiKey) {
         return DELAYED_DEALLOCATION_REQUESTS.contains(ApiKeys.forId(apiKey));
     }
-    
+
     public static Schema requestHeaderSchema(short apiKey, short version) {
         if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id && version == 0)
             // This will be removed once we remove support for v0 of ControlledShutdownRequest, which

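For illustration (not part of this patch), a minimal sketch of how the new v0 AlterReplicaDirRequest schema nests, populated by hand with the same Struct/instance calls the request class below uses; the log dir path, topic, and partition are assumed example values:

    // Build a v0 AlterReplicaDirRequest body by hand; mirrors AlterReplicaDirRequest.toStruct() below.
    Struct struct = new Struct(Protocol.ALTER_REPLICA_DIR_REQUEST_V0);
    Struct logDirStruct = struct.instance("log_dirs");
    logDirStruct.set("log_dir", "/data/kafka-logs-1");      // example destination path
    Struct topicStruct = logDirStruct.instance("topics");
    topicStruct.set("topic", "my-topic");
    topicStruct.set("partitions", new Object[]{0});         // partition ids as an INT32 array
    logDirStruct.set("topics", new Object[]{topicStruct});
    struct.set("log_dirs", new Object[]{logDirStruct});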
http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 00de8c1..f819371 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -177,6 +177,10 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return new DescribeConfigsRequest(struct, apiVersion);
             case ALTER_CONFIGS:
                 return new AlterConfigsRequest(struct, apiVersion);
+            case ALTER_REPLICA_DIR:
+                return new AlterReplicaDirRequest(struct, apiVersion);
+            case DESCRIBE_LOG_DIRS:
+                return new DescribeLogDirsRequest(struct, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 5f1f615..f9ff6e8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -110,6 +110,10 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new DescribeConfigsResponse(struct);
             case ALTER_CONFIGS:
                 return new AlterConfigsResponse(struct);
+            case ALTER_REPLICA_DIR:
+                return new AlterReplicaDirResponse(struct);
+            case DESCRIBE_LOG_DIRS:
+                return new DescribeLogDirsResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
new file mode 100644
index 0000000..2c2401b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class AlterReplicaDirRequest extends AbstractRequest {
+
+    // request level key names
+    private static final String LOG_DIRS_KEY_NAME = "log_dirs";
+
+    // log dir level key names
+    private static final String LOG_DIR_KEY_NAME = "log_dir";
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    private final Map<TopicPartition, String> partitionDirs;
+
+    public static class Builder extends AbstractRequest.Builder<AlterReplicaDirRequest> {
+        private final Map<TopicPartition, String> partitionDirs;
+
+        public Builder(Map<TopicPartition, String> partitionDirs) {
+            super(ApiKeys.ALTER_REPLICA_DIR);
+            this.partitionDirs = partitionDirs;
+        }
+
+        @Override
+        public AlterReplicaDirRequest build(short version) {
+            return new AlterReplicaDirRequest(partitionDirs, version);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("(type=AlterReplicaDirRequest")
+                .append(", partitionDirs=")
+                .append(partitionDirs)
+                .append(")");
+            return builder.toString();
+        }
+    }
+
+    public AlterReplicaDirRequest(Struct struct, short version) {
+        super(version);
+        partitionDirs = new HashMap<>();
+        for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) {
+            Struct logDirStruct = (Struct) logDirStructObj;
+            String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
+            for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) {
+                Struct topicStruct = (Struct) topicStructObj;
+                String topic = topicStruct.getString(TOPIC_KEY_NAME);
+                for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                    int partition = (Integer) partitionObj;
+                    partitionDirs.put(new TopicPartition(topic, partition), logDir);
+                }
+            }
+        }
+    }
+
+    public AlterReplicaDirRequest(Map<TopicPartition, String> partitionDirs, short version) {
+        super(version);
+        this.partitionDirs = partitionDirs;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Map<String, List<TopicPartition>> dirPartitions = new HashMap<>();
+        for (Map.Entry<TopicPartition, String> entry: partitionDirs.entrySet()) {
+            if (!dirPartitions.containsKey(entry.getValue()))
+                dirPartitions.put(entry.getValue(), new ArrayList<TopicPartition>());
+            dirPartitions.get(entry.getValue()).add(entry.getKey());
+        }
+
+        Struct struct = new Struct(ApiKeys.ALTER_REPLICA_DIR.requestSchema(version()));
+        List<Struct> logDirStructArray = new ArrayList<>();
+        for (Map.Entry<String, List<TopicPartition>> logDirEntry: dirPartitions.entrySet()) {
+            Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME);
+            logDirStruct.set(LOG_DIR_KEY_NAME, logDirEntry.getKey());
+
+            List<Struct> topicStructArray = new ArrayList<>();
+            for (Map.Entry<String, List<Integer>> topicEntry: CollectionUtils.groupDataByTopic(logDirEntry.getValue()).entrySet()) {
+                Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
+                topicStruct.set(TOPIC_KEY_NAME, topicEntry.getKey());
+                topicStruct.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
+                topicStructArray.add(topicStruct);
+            }
+            logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+            logDirStructArray.add(logDirStruct);
+        }
+        struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray());
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        Map<TopicPartition, Errors> responseMap = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, String> entry : partitionDirs.entrySet()) {
+            responseMap.put(entry.getKey(), Errors.forException(e));
+        }
+
+        short versionId = version();
+        switch (versionId) {
+            case 0:
+                return new AlterReplicaDirResponse(throttleTimeMs, responseMap);
+            default:
+                throw new IllegalArgumentException(
+                    String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId,
+                        this.getClass().getSimpleName(), ApiKeys.ALTER_REPLICA_DIR.latestVersion()));
+        }
+    }
+
+    public Map<TopicPartition, String> partitionDirs() {
+        return partitionDirs;
+    }
+
+    public static AlterReplicaDirRequest parse(ByteBuffer buffer, short version) {
+        return new AlterReplicaDirRequest(ApiKeys.ALTER_REPLICA_DIR.parseRequest(version, buffer), version);
+    }
+}

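For illustration (not part of this patch), a minimal sketch of building this request through the Builder defined above; the topic, partition, and destination directory are assumed example values:

    Map<TopicPartition, String> partitionDirs = new HashMap<>();
    partitionDirs.put(new TopicPartition("my-topic", 0), "/data/kafka-logs-1");
    AlterReplicaDirRequest request = new AlterReplicaDirRequest.Builder(partitionDirs).build((short) 0);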

[2/3] kafka git commit: KAFKA-5694; Add AlterReplicaDirRequest and DescribeReplicaDirRequest (KIP-113 part-1)

Posted by jq...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
new file mode 100644
index 0000000..f97f9a0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class AlterReplicaDirResponse extends AbstractResponse {
+
+    // request level key names
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level key names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error codes:
+     *
+     * LOG_DIR_NOT_FOUND (57)
+     * KAFKA_STORAGE_ERROR (56)
+     * REPLICA_NOT_AVAILABLE (9)
+     * UNKNOWN (-1)
+     */
+    private final Map<TopicPartition, Errors> responses;
+    private final int throttleTimeMs;
+
+    public AlterReplicaDirResponse(Struct struct) {
+        throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        responses = new HashMap<>();
+        for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicStruct = (Struct) topicStructObj;
+            String topic = topicStruct.getString(TOPIC_KEY_NAME);
+            for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionStruct = (Struct) partitionStructObj;
+                int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
+                responses.put(new TopicPartition(topic, partition), error);
+            }
+        }
+    }
+
+    /**
+     * Constructor for version 0.
+     */
+    public AlterReplicaDirResponse(int throttleTimeMs, Map<TopicPartition, Errors> responses) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.responses = responses;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        Map<String, Map<Integer, Errors>> responsesByTopic = CollectionUtils.groupDataByTopic(responses);
+        List<Struct> topicStructArray = new ArrayList<>();
+        for (Map.Entry<String, Map<Integer, Errors>> responsesByTopicEntry : responsesByTopic.entrySet()) {
+            Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
+            topicStruct.set(TOPIC_KEY_NAME, responsesByTopicEntry.getKey());
+            List<Struct> partitionStructArray = new ArrayList<>();
+            for (Map.Entry<Integer, Errors> responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) {
+                Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
+                Errors response = responsesByPartitionEntry.getValue();
+                partitionStruct.set(PARTITION_KEY_NAME, responsesByPartitionEntry.getKey());
+                partitionStruct.set(ERROR_CODE_KEY_NAME, response.code());
+                partitionStructArray.add(partitionStruct);
+            }
+            topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
+            topicStructArray.add(topicStruct);
+        }
+        struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+        return struct;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public Map<TopicPartition, Errors> responses() {
+        return this.responses;
+    }
+
+    public static AlterReplicaDirResponse parse(ByteBuffer buffer, short version) {
+        return new AlterReplicaDirResponse(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version).read(buffer));
+    }
+}
\ No newline at end of file

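For illustration (not part of this patch), a minimal sketch of checking the per-partition results carried by this response, assuming a ByteBuffer `buffer` already read off the wire; Errors.NONE is the existing success code in org.apache.kafka.common.protocol.Errors:

    AlterReplicaDirResponse response = AlterReplicaDirResponse.parse(buffer, (short) 0);
    for (Map.Entry<TopicPartition, Errors> entry : response.responses().entrySet()) {
        if (entry.getValue() != Errors.NONE)
            System.out.println("Moving " + entry.getKey() + " failed: " + entry.getValue());
    }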
http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
new file mode 100644
index 0000000..338d684
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+public class DescribeLogDirsRequest extends AbstractRequest {
+
+    // request level key names
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    private final Set<TopicPartition> topicPartitions;
+
+    public static class Builder extends AbstractRequest.Builder<DescribeLogDirsRequest> {
+        private final Set<TopicPartition> topicPartitions;
+
+        // topicPartitions == null indicates requesting all partitions, and an empty set indicates requesting no partitions.
+        public Builder(Set<TopicPartition> partitions) {
+            super(ApiKeys.DESCRIBE_LOG_DIRS);
+            this.topicPartitions = partitions;
+        }
+
+        @Override
+        public DescribeLogDirsRequest build(short version) {
+            return new DescribeLogDirsRequest(topicPartitions, version);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("(type=DescribeLogDirsRequest")
+                .append(", topicPartitions=")
+                .append(topicPartitions)
+                .append(")");
+            return builder.toString();
+        }
+    }
+
+    public DescribeLogDirsRequest(Struct struct, short version) {
+        super(version);
+
+        if (struct.getArray(TOPICS_KEY_NAME) == null) {
+            topicPartitions = null;
+        } else {
+            topicPartitions = new HashSet<>();
+            for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
+                Struct topicStruct = (Struct) topicStructObj;
+                String topic = topicStruct.getString(TOPIC_KEY_NAME);
+                for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                    int partition = (Integer) partitionObj;
+                    topicPartitions.add(new TopicPartition(topic, partition));
+                }
+            }
+        }
+    }
+
+    // topicPartitions == null indicates requesting all partitions, and an empty set indicates requesting no partitions.
+    public DescribeLogDirsRequest(Set<TopicPartition> topicPartitions, short version) {
+        super(version);
+        this.topicPartitions = topicPartitions;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.DESCRIBE_LOG_DIRS.requestSchema(version()));
+        if (topicPartitions == null) {
+            struct.set(TOPICS_KEY_NAME, null);
+            return struct;
+        }
+
+        Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
+        for (TopicPartition tp : topicPartitions) {
+            if (!partitionsByTopic.containsKey(tp.topic())) {
+                partitionsByTopic.put(tp.topic(), new ArrayList<Integer>());
+            }
+            partitionsByTopic.get(tp.topic()).add(tp.partition());
+        }
+
+        List<Struct> topicStructArray = new ArrayList<>();
+        for (Map.Entry<String, List<Integer>> partitionsByTopicEntry : partitionsByTopic.entrySet()) {
+            Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
+            topicStruct.set(TOPIC_KEY_NAME, partitionsByTopicEntry.getKey());
+            topicStruct.set(PARTITIONS_KEY_NAME, partitionsByTopicEntry.getValue().toArray());
+            topicStructArray.add(topicStruct);
+        }
+        struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        short versionId = version();
+        switch (versionId) {
+            case 0:
+                return new DescribeLogDirsResponse(throttleTimeMs, new HashMap<String, LogDirInfo>());
+            default:
+                throw new IllegalArgumentException(
+                    String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId,
+                        this.getClass().getSimpleName(), ApiKeys.DESCRIBE_LOG_DIRS.latestVersion()));
+        }
+    }
+
+    public boolean isAllTopicPartitions() {
+        return topicPartitions == null;
+    }
+
+    public Set<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    public static DescribeLogDirsRequest parse(ByteBuffer buffer, short version) {
+        return new DescribeLogDirsRequest(ApiKeys.DESCRIBE_LOG_DIRS.parseRequest(version, buffer), version);
+    }
+}

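For illustration (not part of this patch), a minimal sketch of the null-versus-set distinction documented on the Builder above; the topic and partition are assumed example values:

    // null asks the broker to describe every replica in all of its log directories
    DescribeLogDirsRequest all = new DescribeLogDirsRequest.Builder(null).build((short) 0);
    // a concrete set restricts the response to the listed partitions
    Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("my-topic", 0));
    DescribeLogDirsRequest some = new DescribeLogDirsRequest.Builder(partitions).build((short) 0);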
http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
new file mode 100644
index 0000000..f6b31ae
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class DescribeLogDirsResponse extends AbstractResponse {
+
+    public static final long INVALID_OFFSET_LAG = -1L;
+
+    // request level key names
+    private static final String LOG_DIRS_KEY_NAME = "log_dirs";
+
+    // dir level key names
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String LOG_DIR_KEY_NAME = "log_dir";
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level key names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String SIZE_KEY_NAME = "size";
+    private static final String OFFSET_LAG_KEY_NAME = "offset_lag";
+    private static final String IS_FUTURE_KEY_NAME = "is_future";
+
+    private final int throttleTimeMs;
+    private final Map<String, LogDirInfo> logDirInfos;
+
+    public DescribeLogDirsResponse(Struct struct) {
+        throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        logDirInfos = new HashMap<>();
+
+        for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) {
+            Struct logDirStruct = (Struct) logDirStructObj;
+            Errors error = Errors.forCode(logDirStruct.getShort(ERROR_CODE_KEY_NAME));
+            String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
+            Map<TopicPartition, ReplicaInfo> replicaInfos = new HashMap<>();
+
+            for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) {
+                Struct topicStruct = (Struct) topicStructObj;
+                String topic = topicStruct.getString(TOPIC_KEY_NAME);
+
+                for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                    Struct partitionStruct = (Struct) partitionStructObj;
+                    int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                    long size = partitionStruct.getLong(SIZE_KEY_NAME);
+                    long offsetLag = partitionStruct.getLong(OFFSET_LAG_KEY_NAME);
+                    boolean isFuture = partitionStruct.getBoolean(IS_FUTURE_KEY_NAME);
+                    ReplicaInfo replicaInfo = new ReplicaInfo(size, offsetLag, isFuture);
+                    replicaInfos.put(new TopicPartition(topic, partition), replicaInfo);
+                }
+            }
+
+            logDirInfos.put(logDir, new LogDirInfo(error, replicaInfos));
+        }
+    }
+
+    /**
+     * Constructor for version 0.
+     */
+    public DescribeLogDirsResponse(int throttleTimeMs, Map<String, LogDirInfo> logDirInfos) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.logDirInfos = logDirInfos;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        List<Struct> logDirStructArray = new ArrayList<>();
+        for (Map.Entry<String, LogDirInfo> logDirInfosEntry : logDirInfos.entrySet()) {
+            LogDirInfo logDirInfo = logDirInfosEntry.getValue();
+            Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME);
+            logDirStruct.set(ERROR_CODE_KEY_NAME, logDirInfo.error.code());
+            logDirStruct.set(LOG_DIR_KEY_NAME, logDirInfosEntry.getKey());
+
+            Map<String, Map<Integer, ReplicaInfo>> replicaInfosByTopic = CollectionUtils.groupDataByTopic(logDirInfo.replicaInfos);
+            List<Struct> topicStructArray = new ArrayList<>();
+            for (Map.Entry<String, Map<Integer, ReplicaInfo>> replicaInfosByTopicEntry : replicaInfosByTopic.entrySet()) {
+                Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
+                topicStruct.set(TOPIC_KEY_NAME, replicaInfosByTopicEntry.getKey());
+                List<Struct> partitionStructArray = new ArrayList<>();
+
+                for (Map.Entry<Integer, ReplicaInfo> replicaInfosByPartitionEntry : replicaInfosByTopicEntry.getValue().entrySet()) {
+                    Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
+                    ReplicaInfo replicaInfo = replicaInfosByPartitionEntry.getValue();
+                    partitionStruct.set(PARTITION_KEY_NAME, replicaInfosByPartitionEntry.getKey());
+                    partitionStruct.set(SIZE_KEY_NAME, replicaInfo.size);
+                    partitionStruct.set(OFFSET_LAG_KEY_NAME, replicaInfo.offsetLag);
+                    partitionStruct.set(IS_FUTURE_KEY_NAME, replicaInfo.isFuture);
+                    partitionStructArray.add(partitionStruct);
+                }
+                topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
+                topicStructArray.add(topicStruct);
+            }
+            logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+            logDirStructArray.add(logDirStruct);
+        }
+        struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray());
+        return struct;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public Map<String, LogDirInfo> logDirInfos() {
+        return logDirInfos;
+    }
+
+    public static DescribeLogDirsResponse parse(ByteBuffer buffer, short version) {
+        return new DescribeLogDirsResponse(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version).read(buffer));
+    }
+
+    /**
+     * Possible error codes:
+     *
+     * KAFKA_STORAGE_ERROR (56)
+     * UNKNOWN (-1)
+     */
+    public static class LogDirInfo {
+        public final Errors error;
+        public final Map<TopicPartition, ReplicaInfo> replicaInfos;
+
+        public LogDirInfo(Errors error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
+            this.error = error;
+            this.replicaInfos = replicaInfos;
+        }
+    }
+
+    public static class ReplicaInfo {
+
+        public final long size;
+        public final long offsetLag;
+        public final boolean isFuture;
+
+        public ReplicaInfo(long size, long offsetLag, boolean isFuture) {
+            this.size = size;
+            this.offsetLag = offsetLag;
+            this.isFuture = isFuture;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("(size=")
+                .append(size)
+                .append(", offsetLag=")
+                .append(offsetLag)
+                .append(", isFuture=")
+                .append(isFuture)
+                .append(")");
+            return builder.toString();
+        }
+    }
+}
\ No newline at end of file

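For illustration (not part of this patch), a minimal sketch of walking this response, assuming a ByteBuffer `buffer` already read off the wire; LogDirInfo and ReplicaInfo are the nested classes defined above:

    DescribeLogDirsResponse response = DescribeLogDirsResponse.parse(buffer, (short) 0);
    for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> dirEntry : response.logDirInfos().entrySet()) {
        DescribeLogDirsResponse.LogDirInfo info = dirEntry.getValue();
        System.out.println("log dir " + dirEntry.getKey() + ", error=" + info.error);
        for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaEntry : info.replicaInfos.entrySet())
            System.out.println("  " + replicaEntry.getKey() + " -> " + replicaEntry.getValue());
    }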
http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/admin/LogDirsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
new file mode 100644
index 0000000..6a167a2
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -0,0 +1,114 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.admin
+
+import java.io.PrintStream
+import java.util.Properties
+
+import org.apache.kafka.clients.admin.{AdminClientConfig, DescribeLogDirsResult, AdminClient => JAdminClient}
+import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
+
+import scala.collection.JavaConverters._
+import scala.collection.Map
+import kafka.utils.{CommandLineUtils, Json}
+import joptsimple._
+
+/**
+  * A command for querying log directory usage on the specified brokers
+  */
+object LogDirsCommand {
+
+    def main(args: Array[String]): Unit = {
+        describe(args, System.out)
+    }
+
+    def describe(args: Array[String], out: PrintStream): Unit = {
+        val opts = new LogDirsCommandOptions(args)
+        val adminClient = createAdminClient(opts)
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
+            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+        }
+
+        out.println("Querying brokers for log directories information")
+        val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
+        val logDirInfosByBroker = describeLogDirsResult.all.get().asScala.mapValues(_.asScala)
+
+        out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
+        out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
+        adminClient.close()
+    }
+
+    private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirInfo]], topicSet: Set[String]): String = {
+        Json.encode(Map(
+            "version" -> 1,
+            "brokers" -> logDirInfosByBroker.map { case (broker, logDirInfos) =>
+                Map(
+                    "broker" -> broker,
+                    "logDirs" -> logDirInfos.map { case (logDir, logDirInfo) =>
+                        Map(
+                            "logDir" -> logDir,
+                            "error" -> logDirInfo.error.exceptionName(),
+                            "partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, replicaInfo) =>
+                                topicSet.isEmpty || topicSet.contains(topicPartition.topic)
+                            }.map { case (topicPartition, replicaInfo) =>
+                                Map(
+                                    "partition" -> topicPartition.toString,
+                                    "size" -> replicaInfo.size,
+                                    "offsetLag" -> replicaInfo.offsetLag,
+                                    "isFuture" -> replicaInfo.isFuture
+                                )
+                            }
+                        )
+                    }
+                )
+            }
+        ))
+    }
+
+    private def createAdminClient(opts: LogDirsCommandOptions): JAdminClient = {
+        val props = new Properties()
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+        props.put(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool")
+        JAdminClient.create(props)
+    }
+
+    class LogDirsCommandOptions(args: Array[String]) {
+        val parser = new OptionParser(false)
+        val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping")
+          .withRequiredArg
+          .describedAs("The server(s) to use for bootstrapping")
+          .ofType(classOf[String])
+        val describeOpt = parser.accepts("describe", "Describe the specified log directories on the specified brokers.")
+        val topicListOpt = parser.accepts("topic-list", "The list of topics to be queried in the form \"topic1,topic2,topic3\". " +
+          "All topics will be queried if no topic list is specified")
+          .withRequiredArg
+          .describedAs("Topic list")
+          .defaultsTo("")
+          .ofType(classOf[String])
+        val brokerListOpt = parser.accepts("broker-list", "The list of brokers to be queried in the form \"0,1,2\". " +
+          "All brokers in the cluster will be queried if no broker list is specified")
+          .withRequiredArg
+          .describedAs("Broker list")
+          .ofType(classOf[String])
+
+        val options = parser.parse(args : _*)
+        CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, describeOpt)
+    }
+}

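Assuming no dedicated wrapper script is added for this tool in the patch, a hedged usage example would go through the existing kafka-run-class.sh; the bootstrap server, broker ids, and topic are assumed example values:

    bin/kafka-run-class.sh kafka.admin.LogDirsCommand --describe \
        --bootstrap-server localhost:9092 --broker-list 0,1 --topic-list my-topic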
http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index cae14b1..e27e239 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -16,23 +16,31 @@
  */
 package kafka.admin
 
-import joptsimple.OptionParser
-import kafka.server.{ConfigType, DynamicConfig}
-import kafka.utils._
+import java.util.Properties
+import java.util.concurrent.ExecutionException
 
 import scala.collection._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import scala.collection.JavaConverters._
+import kafka.server.{ConfigType, DynamicConfig}
+import kafka.utils._
 import kafka.common.{AdminCommandFailedException, TopicAndPartition}
 import kafka.log.LogConfig
-import LogConfig._
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.TopicPartitionReplica
+import org.apache.kafka.common.errors.{LogDirNotFoundException, ReplicaNotAvailableException}
+import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaDirOptions, AdminClient => JAdminClient}
+import LogConfig._
+import joptsimple.OptionParser
+import org.apache.kafka.clients.admin.DescribeReplicaLogDirResult.ReplicaLogDirInfo
 
 object ReassignPartitionsCommand extends Logging {
 
   case class Throttle(value: Long, postUpdateAction: () => Unit = () => ())
 
   private[admin] val NoThrottle = Throttle(-1)
+  private[admin] val AnyLogDir = "any"
 
   def main(args: Array[String]): Unit = {
 
@@ -42,13 +50,15 @@ object ReassignPartitionsCommand extends Logging {
                           30000,
                           30000,
                           JaasUtils.isZkSecurityEnabled())
+    val adminClientOpt = createAdminClient(opts)
+
     try {
       if(opts.options.has(opts.verifyOpt))
-        verifyAssignment(zkUtils, opts)
+        verifyAssignment(zkUtils, adminClientOpt, opts)
       else if(opts.options.has(opts.generateOpt))
         generateAssignment(zkUtils, opts)
       else if (opts.options.has(opts.executeOpt))
-        executeAssignment(zkUtils, opts)
+        executeAssignment(zkUtils, adminClientOpt, opts)
     } catch {
       case e: Throwable =>
         println("Partitions reassignment failed due to " + e.getMessage)
@@ -56,16 +66,29 @@ object ReassignPartitionsCommand extends Logging {
     } finally zkUtils.close()
   }
 
-  def verifyAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
+  private def createAdminClient(opts: ReassignPartitionsCommandOptions): Option[JAdminClient] = {
+    if (opts.options.has(opts.bootstrapServerOpt)) {
+      val props = new Properties()
+      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool")
+      Some(JAdminClient.create(props))
+    } else {
+      None
+    }
+  }
+
+  def verifyAssignment(zkUtils: ZkUtils, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions) {
     val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val jsonString = Utils.readFileAsString(jsonFile)
-    verifyAssignment(zkUtils, jsonString)
+    verifyAssignment(zkUtils, adminClientOpt, jsonString)
   }
 
-  def verifyAssignment(zkUtils: ZkUtils, jsonString: String): Unit = {
+  def verifyAssignment(zkUtils: ZkUtils, adminClientOpt: Option[JAdminClient], jsonString: String): Unit = {
     println("Status of partition reassignment: ")
-    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
-    val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned)
+    val (partitionsToBeReassigned, replicaAssignment) = parsePartitionReassignmentData(jsonString)
+    val reassignedPartitionsStatus = checkIfPartitionReassignmentSucceeded(zkUtils, partitionsToBeReassigned.toMap)
+    val replicaReassignmentStatus = checkIfReplicaReassignmentSucceeded(adminClientOpt, replicaAssignment)
+
     reassignedPartitionsStatus.foreach { case (topicPartition, status) =>
       status match {
         case ReassignmentCompleted =>
@@ -76,7 +99,19 @@ object ReassignPartitionsCommand extends Logging {
           println("Reassignment of partition %s is still in progress".format(topicPartition))
       }
     }
-    removeThrottle(zkUtils, partitionsToBeReassigned, reassignedPartitionsStatus)
+
+    replicaReassignmentStatus.foreach { case (replica, status) =>
+      status match {
+        case ReassignmentCompleted =>
+          println("Reassignment of replica %s completed successfully".format(replica))
+        case ReassignmentFailed =>
+          println("Reassignment of replica %s failed".format(replica))
+        case ReassignmentInProgress =>
+          println("Reassignment of replica %s is still in progress".format(replica))
+      }
+    }
+
+    removeThrottle(zkUtils, partitionsToBeReassigned.toMap, reassignedPartitionsStatus)
   }
 
   private[admin] def removeThrottle(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], reassignedPartitionsStatus: Map[TopicAndPartition, ReassignmentStatus], admin: AdminUtilities = AdminUtils): Unit = {
@@ -121,8 +156,8 @@ object ReassignPartitionsCommand extends Logging {
     val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
     val disableRackAware = opts.options.has(opts.disableRackAware)
     val (proposedAssignments, currentAssignments) = generateAssignment(zkUtils, brokerListToReassign, topicsToMoveJsonString, disableRackAware)
-    println("Current partition replica assignment\n%s\n".format(ZkUtils.formatAsReassignmentJson(currentAssignments)))
-    println("Proposed partition reassignment configuration\n%s".format(ZkUtils.formatAsReassignmentJson(proposedAssignments)))
+    println("Current partition replica assignment\n%s\n".format(formatAsReassignmentJson(currentAssignments, Map.empty)))
+    println("Proposed partition reassignment configuration\n%s".format(formatAsReassignmentJson(proposedAssignments, Map.empty)))
   }
 
   def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = {
@@ -147,42 +182,85 @@ object ReassignPartitionsCommand extends Logging {
     (partitionsToBeReassigned, currentAssignment)
   }
 
-  def executeAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
+  def executeAssignment(zkUtils: ZkUtils, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions) {
     val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
-    val throttle = if (opts.options.has(opts.throttleOpt)) opts.options.valueOf(opts.throttleOpt) else -1
-    executeAssignment(zkUtils, reassignmentJsonString, Throttle(throttle))
+    val throttle = opts.options.valueOf(opts.throttleOpt)
+    val timeoutMs = opts.options.valueOf(opts.timeoutOpt)
+    executeAssignment(zkUtils, adminClientOpt, reassignmentJsonString, Throttle(throttle), timeoutMs)
   }
 
-  def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Throttle) {
-    val partitionsToBeReassigned = parseAndValidate(zkUtils, reassignmentJsonString)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)
+  def executeAssignment(zkUtils: ZkUtils, adminClientOpt: Option[JAdminClient], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L) {
+    val (partitionAssignment, replicaAssignment) = parseAndValidate(zkUtils, reassignmentJsonString)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, adminClientOpt, partitionAssignment.toMap, replicaAssignment)
 
     // If there is an existing rebalance running, attempt to change its throttle
     if (zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)) {
       println("There is an existing assignment running.")
       reassignPartitionsCommand.maybeLimit(throttle)
-    }
-    else {
-      printCurrentAssignment(zkUtils, partitionsToBeReassigned)
+    } else {
+      printCurrentAssignment(zkUtils, partitionAssignment.map(_._1.topic))
       if (throttle.value >= 0)
         println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
-      if (reassignPartitionsCommand.reassignPartitions(throttle)) {
+      if (reassignPartitionsCommand.reassignPartitions(throttle, timeoutMs)) {
         println("Successfully started reassignment of partitions.")
       } else
-        println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+        println("Failed to reassign partitions %s".format(partitionAssignment))
     }
   }
 
-  def printCurrentAssignment(zkUtils: ZkUtils, partitionsToBeReassigned: Seq[(TopicAndPartition, Seq[Int])]): Unit = {
+  def printCurrentAssignment(zkUtils: ZkUtils, topics: Seq[String]): Unit = {
     // before starting assignment, output the current replica assignment to facilitate rollback
-    val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
+    val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(topics)
     println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
-      .format(ZkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
+      .format(formatAsReassignmentJson(currentPartitionReplicaAssignment, Map.empty)))
+  }
+
+  def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
+                               replicaLogDirAssignment: Map[TopicPartitionReplica, String]): String = {
+    Json.encode(Map(
+      "version" -> 1,
+      "partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition), replicas) =>
+        Map(
+          "topic" -> topic,
+          "partition" -> partition,
+          "replicas" -> replicas,
+          "log_dirs" -> replicas.map(r => replicaLogDirAssignment.getOrElse(new TopicPartitionReplica(topic, partition, r), AnyLogDir))
+        )
+      }
+    ))
   }
 
-  def parseAndValidate(zkUtils: ZkUtils, reassignmentJsonString: String): Seq[(TopicAndPartition, Seq[Int])] = {
-    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
+  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
+  def parsePartitionReassignmentData(jsonData: String): (Seq[(TopicAndPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
+    val partitionAssignment = mutable.ListBuffer.empty[(TopicAndPartition, Seq[Int])]
+    val replicaAssignment = mutable.Map.empty[TopicPartitionReplica, String]
+    for {
+      js <- Json.parseFull(jsonData).toSeq
+      partitionsSeq <- js.asJsonObject.get("partitions").toSeq
+      p <- partitionsSeq.asJsonArray.iterator
+    } {
+      val partitionFields = p.asJsonObject
+      val topic = partitionFields("topic").to[String]
+      val partition = partitionFields("partition").to[Int]
+      val newReplicas = partitionFields("replicas").to[Seq[Int]]
+      val newLogDirs = partitionFields.get("log_dirs") match {
+        case Some(jsonValue) => jsonValue.to[Seq[String]]
+        case None => newReplicas.map(r => AnyLogDir)
+      }
+      if (newReplicas.size != newLogDirs.size)
+        throw new AdminCommandFailedException(s"Size of replicas list $newReplicas is different from " +
+          s"size of log dirs list $newLogDirs for partition ${TopicAndPartition(topic, partition)}")
+      partitionAssignment += (TopicAndPartition(topic, partition) -> newReplicas)
+      replicaAssignment ++= newReplicas.zip(newLogDirs).map { case (replica, logDir) =>
+        new TopicPartitionReplica(topic, partition, replica) -> logDir
+      }.filter(_._2 != AnyLogDir)
+    }
+    (partitionAssignment, replicaAssignment)
+  }
+
+  def parseAndValidate(zkUtils: ZkUtils, reassignmentJsonString: String): (Seq[(TopicAndPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
+    val (partitionsToBeReassigned, replicaAssignment) = parsePartitionReassignmentData(reassignmentJsonString)
 
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file is empty")
@@ -215,10 +293,18 @@ object ReassignPartitionsCommand extends Logging {
     if (nonExistingBrokerIDs.nonEmpty)
       throw new AdminCommandFailedException("The proposed assignment contains non-existent brokerIDs: " + nonExistingBrokerIDs.mkString(","))
 
-    partitionsToBeReassigned
+    // Check that a replica is always moved to another broker when a particular log directory is specified for it.
+    // We will support moving a replica within a broker after KIP-113 is implemented.
+    replicaAssignment.foreach { case (replica, logDir) =>
+      if (existingAssignment.getOrElse(TopicAndPartition(replica.topic(), replica.partition()), Seq.empty).contains(replica.brokerId()))
+        throw new AdminCommandFailedException(s"The proposed assignment intends to move an existing replica $replica to " +
+          s"another log directory $logDir on the same broker. This is not currently supported")
+    }
+
+    (partitionsToBeReassigned, replicaAssignment)
   }
 
-  private def checkIfReassignmentSucceeded(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
+  private def checkIfPartitionReassignmentSucceeded(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
   :Map[TopicAndPartition, ReassignmentStatus] = {
     val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
     partitionsToBeReassigned.keys.map { topicAndPartition =>
@@ -227,6 +313,47 @@ object ReassignPartitionsCommand extends Logging {
     }.toMap
   }
 
+  private def checkIfReplicaReassignmentSucceeded(adminClientOpt: Option[JAdminClient], replicaAssignment: Map[TopicPartitionReplica, String])
+  :Map[TopicPartitionReplica, ReassignmentStatus] = {
+
+    val replicaLogDirInfos = {
+      if (replicaAssignment.nonEmpty) {
+        val adminClient = adminClientOpt.getOrElse(
+          throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign a replica to the specified log directory"))
+        adminClient.describeReplicaLogDir(replicaAssignment.keySet.asJava).all().get().asScala
+      } else {
+        Map.empty[TopicPartitionReplica, ReplicaLogDirInfo]
+      }
+    }
+
+    replicaAssignment.map { case (replica, newLogDir) =>
+      val status: ReassignmentStatus = replicaLogDirInfos.get(replica) match {
+        case Some(replicaLogDirInfo) =>
+          if (replicaLogDirInfo.getCurrentReplicaLogDir == null) {
+            println(s"Partition ${replica.topic()}-${replica.partition()} is not found in any live log dir on " +
+              s"broker ${replica.brokerId()}. There is likely offline log directory on the broker.")
+            ReassignmentFailed
+          } else if (replicaLogDirInfo.getFutureReplicaLogDir == newLogDir) {
+            ReassignmentInProgress
+          } else if (replicaLogDirInfo.getFutureReplicaLogDir != null) {
+            println(s"Partition ${replica.topic()}-${replica.partition()} on broker ${replica.brokerId()} " +
+              s"is being moved to log dir ${replicaLogDirInfo.getFutureReplicaLogDir} instead of $newLogDir")
+            ReassignmentFailed
+          } else if (replicaLogDirInfo.getCurrentReplicaLogDir == newLogDir) {
+            ReassignmentCompleted
+          } else {
+            println(s"Partition ${replica.topic()}-${replica.partition()} on broker ${replica.brokerId()} " +
+              s"is not being moved from log dir ${replicaLogDirInfo.getCurrentReplicaLogDir} to $newLogDir")
+            ReassignmentFailed
+          }
+        case None =>
+          println(s"Partition ${replica.topic()}-${replica.partition()} is not found in any live log dir on broker ${replica.brokerId()}.")
+          ReassignmentFailed
+      }
+      (replica, status)
+    }
+  }
+
   def checkIfPartitionReassignmentSucceeded(zkUtils: ZkUtils, topicAndPartition: TopicAndPartition,
                                             partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
                                             partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
@@ -280,19 +407,26 @@ object ReassignPartitionsCommand extends Logging {
 
   class ReassignPartitionsCommandOptions(args: Array[String]) {
     val parser = new OptionParser(false)
-
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "the server(s) to use for bootstrapping. REQUIRED if " +
+                      "an absolution path of the log directory is specified for any replica in the reassignment json file")
+                      .withRequiredArg
+                      .describedAs("Server(s) to use for bootstrapping")
+                      .ofType(classOf[String])
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
                       "form host:port. Multiple URLS can be given to allow fail-over.")
                       .withRequiredArg
                       .describedAs("urls")
                       .ofType(classOf[String])
     val generateOpt = parser.accepts("generate", "Generate a candidate partition reassignment configuration." +
-      " Note that this only generates a candidate assignment, it does not execute it.")
+                      " Note that this only generates a candidate assignment, it does not execute it.")
     val executeOpt = parser.accepts("execute", "Kick off the reassignment as specified by the --reassignment-json-file option.")
     val verifyOpt = parser.accepts("verify", "Verify if the reassignment completed as specified by the --reassignment-json-file option. If there is a throttle engaged for the replicas specified, and the rebalance has completed, the throttle will be removed")
     val reassignmentJsonFileOpt = parser.accepts("reassignment-json-file", "The JSON file with the partition reassignment configuration" +
                       "The format to use is - \n" +
-                      "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t  \"partition\": 1,\n\t  \"replicas\": [1,2,3] }],\n\"version\":1\n}")
+                      "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t  \"partition\": 1,\n\t  \"replicas\": [1,2,3],\n\t  \"log_dirs\": [\"dir1\",\"dir2\",\"dir3\"] }],\n\"version\":1\n}\n" +
+                      "Note that \"log_dirs\" is optional. When it is specified, its length must equal the length of the replicas list. The value in this list " +
+                      "can be either \"any\" or the absolution path of the log directory on the broker. If absolute log directory path is specified, it is currently required that " +
+                      "the replica has not already been created on that broker. The replica will then be created in the specified log directory on the broker later.")
                       .withRequiredArg
                       .describedAs("manual assignment json file path")
                       .ofType(classOf[String])
@@ -313,23 +447,32 @@ object ReassignPartitionsCommand extends Logging {
                       .describedAs("throttle")
                       .defaultsTo("-1")
                       .ofType(classOf[Long])
+    val timeoutOpt = parser.accepts("timeout", "The maximum time in ms allowed to wait for partition reassignment execution to be successfully initiated")
+                      .withRequiredArg()
+                      .describedAs("timeout")
+                      .defaultsTo("10000")
+                      .ofType(classOf[Long])
     val options = parser.parse(args : _*)
   }
 }
 
-class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicAndPartition, Seq[Int]], admin: AdminUtilities = AdminUtils)
+class ReassignPartitionsCommand(zkUtils: ZkUtils,
+                                adminClientOpt: Option[JAdminClient],
+                                proposedPartitionAssignment: Map[TopicAndPartition, Seq[Int]],
+                                proposedReplicaAssignment: Map[TopicPartitionReplica, String] = Map.empty,
+                                admin: AdminUtilities = AdminUtils)
   extends Logging {
 
   import ReassignPartitionsCommand._
 
   def existingAssignment(): Map[TopicAndPartition, Seq[Int]] = {
-    val proposedTopics = proposedAssignment.keySet.map(_.topic).toSeq
+    val proposedTopics = proposedPartitionAssignment.keySet.map(_.topic).toSeq
     zkUtils.getReplicaAssignmentForTopics(proposedTopics)
   }
 
   private def maybeThrottle(throttle: Throttle): Unit = {
     if (throttle.value >= 0) {
-      assignThrottledReplicas(existingAssignment(), proposedAssignment)
+      assignThrottledReplicas(existingAssignment(), proposedPartitionAssignment)
       maybeLimit(throttle)
       throttle.postUpdateAction()
       println(s"The throttle limit was set to ${throttle.value} B/s")
@@ -343,7 +486,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
   def maybeLimit(throttle: Throttle) {
     if (throttle.value >= 0) {
       val existingBrokers = existingAssignment().values.flatten.toSeq
-      val proposedBrokers = proposedAssignment.values.flatten.toSeq
+      val proposedBrokers = proposedPartitionAssignment.values.flatten.toSeq
       val brokers = (existingBrokers ++ proposedBrokers).distinct
 
       for (id <- brokers) {
@@ -401,12 +544,45 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
       allProposed.filter { case (tp, _) => tp.topic == topic })
   }
 
-  def reassignPartitions(throttle: Throttle = NoThrottle): Boolean = {
+  def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean = {
     maybeThrottle(throttle)
     try {
-      val validPartitions = proposedAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }
+      val validPartitions = proposedPartitionAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }
       if (validPartitions.isEmpty) false
       else {
+        if (proposedReplicaAssignment.nonEmpty) {
+          // Send AlterReplicaDirRequest to allow the broker to create the replica in the right log dir later if the replica
+          // has not been created yet. This allows us to rebalance load across log directories in the cluster even if
+          // we cannot move replicas between log directories on the same broker. We will be able to move replicas
+          // between log directories on the same broker after KIP-113 is implemented.
+          val adminClient = adminClientOpt.getOrElse(
+            throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign a replica to the specified log directory"))
+          val alterReplicaDirResult = adminClient.alterReplicaDir(
+            proposedReplicaAssignment.asJava, new AlterReplicaDirOptions().timeoutMs(timeoutMs.toInt))
+          alterReplicaDirResult.values().asScala.foreach { case (replica, future) => {
+              try {
+                /*
+                 * Before KIP-113 is fully implemented, a user can only specify the destination log directory of the replica
+                 * if the replica has not already been created on the broker; otherwise the log directory specified in the
+                 * json file will not be enforced. Therefore we want to verify that the broker will return ReplicaNotAvailableException
+                 * for this replica.
+                 *
+                 * After KIP-113 is fully implemented, we will not need to verify that the broker returns this ReplicaNotAvailableException
+                 * in this step. And after the reassignment znode is created, we will need to re-send the AlterReplicaDirRequest to the broker
+                 * if the broker returns ReplicaNotAvailableException for any replica in the request.
+                 */
+                future.get()
+                throw new AdminCommandFailedException(s"Partition ${replica.topic()}-${replica.partition()} already exists on broker ${replica.brokerId()}." +
+                  s" Reassigning a replica to another log directory on the same broker is currently not supported.")
+              } catch {
+                case t: ExecutionException =>
+                  t.getCause match {
+                    case e: ReplicaNotAvailableException => // It is OK if the replica is not available
+                    case e: Throwable => throw e
+                  }
+              }
+          }}
+        }
         val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions)
         zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
         true
@@ -415,8 +591,14 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
       case _: ZkNodeExistsException =>
         val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
         throw new AdminCommandFailedException("Partition reassignment currently in " +
-        "progress for %s. Aborting operation".format(partitionsBeingReassigned))
-      case e: Throwable => error("Admin command failed", e); false
+          "progress for %s. Aborting operation".format(partitionsBeingReassigned))
+      case e: LogDirNotFoundException =>
+        throw new AdminCommandFailedException(s"The proposed replica assignment $proposedReplicaAssignment contains " +
+          s"an invalid log directory. Aborting operation", e)
+      case e: AdminCommandFailedException => throw e
+      case e: Throwable =>
+        error("Admin command failed", e)
+        false
     }
   }
 
@@ -439,8 +621,8 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
   }
 }
 
-
 sealed trait ReassignmentStatus { def status: Int }
 case object ReassignmentCompleted extends ReassignmentStatus { val status = 1 }
 case object ReassignmentInProgress extends ReassignmentStatus { val status = 0 }
 case object ReassignmentFailed extends ReassignmentStatus { val status = -1 }
+
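
Taken together with the new --bootstrap-server and --timeout options above, a hypothetical invocation of the reassignment tool (host names and the JSON file name are illustrative) might look like:

  bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
    --bootstrap-server localhost:9092 \
    --reassignment-json-file reassign.json \
    --execute --timeout 10000

Note that --bootstrap-server is only required when the JSON file specifies an absolute log directory path for some replica: the AlterReplicaDirRequest is sent via the AdminClient, while the reassignment itself is still triggered through the ZooKeeper znode.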

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 2377497..690f52a 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -30,7 +30,8 @@ import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.errors.{LogDirNotFoundException, KafkaStorageException}
+
 import scala.collection.JavaConverters._
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
@@ -87,6 +88,7 @@ class LogManager(logDirs: Array[File],
     (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
 
   private def offlineLogDirs = logDirs.filterNot(_liveLogDirs.contains)
+  private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()
 
   loadLogs()
 
@@ -525,6 +527,11 @@ class LogManager(logDirs: Array[File],
     }
   }
 
+  def updatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = {
+    // The logDir should be an absolute path
+    preferredLogDirs.put(topicPartition, logDir)
+  }
+
   /**
    * Get the log if it exists, otherwise return None
    */
@@ -545,9 +552,18 @@ class LogManager(logDirs: Array[File],
         if (!isNew && offlineLogDirs.nonEmpty)
           throw new KafkaStorageException(s"Cannot create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")
 
-        val dataDir = nextLogDir()
+        val logDir = {
+          val preferredLogDir = preferredLogDirs.get(topicPartition)
+          if (preferredLogDir != null)
+            preferredLogDir
+          else
+            nextLogDir().getAbsolutePath
+        }
+        if (!isLogDirOnline(logDir))
+          throw new KafkaStorageException(s"Cannot create log for $topicPartition because log directory $logDir is offline")
+
         try {
-          val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
+          val dir = new File(logDir, topicPartition.topic + "-" + topicPartition.partition)
           Files.createDirectories(dir.toPath)
 
           val log = Log(
@@ -567,13 +583,16 @@ class LogManager(logDirs: Array[File],
           info("Created log for partition [%s,%d] in %s with properties {%s}."
             .format(topicPartition.topic,
               topicPartition.partition,
-              dataDir.getAbsolutePath,
+              logDir,
               config.originals.asScala.mkString(", ")))
+          // Remove the preferred log dir since it has already been satisfied
+          preferredLogDirs.remove(topicPartition)
+
           log
         } catch {
           case e: IOException =>
-            val msg = s"Error while creating log for $topicPartition in dir ${dataDir.getAbsolutePath}"
-            logDirFailureChannel.maybeAddOfflineLogDir(dataDir.getAbsolutePath, msg, e)
+            val msg = s"Error while creating log for $topicPartition in dir ${logDir}"
+            logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
             throw new KafkaStorageException(msg, e)
         }
       }
@@ -606,6 +625,7 @@ class LogManager(logDirs: Array[File],
   /**
     * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and
    * add it to the queue for deletion.
+    *
     * @param topicPartition TopicPartition that needs to be deleted
     * @return the removed log
     */
@@ -704,8 +724,9 @@ class LogManager(logDirs: Array[File],
 
   // logDir should be an absolute path
   def isLogDirOnline(logDir: String): Boolean = {
     if (!logDirs.exists(_.getAbsolutePath == logDir))
-      throw new RuntimeException(s"Log dir $logDir is not found in the config.")
+      throw new LogDirNotFoundException(s"Log dir $logDir is not found in the config.")
 
     _liveLogDirs.contains(new File(logDir))
   }
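
The preferred-log-dir bookkeeping added above is small enough to sketch in isolation. A minimal, self-contained Scala sketch of the mechanism (simplified: plain strings stand in for TopicPartition, round-robin stands in for nextLogDir(), and offline-dir checks are omitted):

  import java.util.concurrent.ConcurrentHashMap

  class PreferredLogDirDemo(logDirs: Seq[String]) {
    private val preferredLogDirs = new ConcurrentHashMap[String, String]()
    private var next = 0

    // Called when an AlterReplicaDirRequest names a destination for a partition.
    def updatePreferredLogDir(topicPartition: String, logDir: String): Unit =
      preferredLogDirs.put(topicPartition, logDir)

    // Mirrors the selection in getOrCreateLog: honor the recorded preference if
    // one exists, otherwise fall back to default placement, and drop the
    // preference once it has been satisfied.
    def chooseLogDir(topicPartition: String): String = {
      val dir = Option(preferredLogDirs.get(topicPartition)).getOrElse {
        val d = logDirs(next % logDirs.size)
        next += 1
        d
      }
      preferredLogDirs.remove(topicPartition)
      dir
    }
  }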

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1a85222..17c3f2d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -36,7 +36,7 @@ import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.RequestChannel
 import kafka.network.RequestChannel.{CloseConnectionAction, NoOpAction, SendAction}
 import kafka.security.SecurityUtils
-import kafka.security.auth._
+import kafka.security.auth.{Resource, _}
 import kafka.utils.{CoreUtils, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
@@ -54,6 +54,7 @@ import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.requests.SaslHandshakeResponse
 import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
+import DescribeLogDirsResponse.LogDirInfo
 
 import scala.collection.{mutable, _}
 import scala.collection.JavaConverters._
@@ -129,6 +130,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
         case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
         case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
+        case ApiKeys.ALTER_REPLICA_DIR => handleAlterReplicaDirRequest(request)
+        case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -1910,6 +1913,35 @@ class KafkaApis(val requestChannel: RequestChannel,
       new DescribeConfigsResponse(requestThrottleMs, (authorizedConfigs ++ unauthorizedConfigs).asJava))
   }
 
+  def handleAlterReplicaDirRequest(request: RequestChannel.Request): Unit = {
+    val alterReplicaDirRequest = request.body[AlterReplicaDirRequest]
+    val responseMap = {
+      if (authorize(request.session, Alter, Resource.ClusterResource))
+        replicaManager.alterReplicaDir(alterReplicaDirRequest.partitionDirs.asScala)
+      else
+        alterReplicaDirRequest.partitionDirs.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+    }
+    sendResponseMaybeThrottle(request, requestThrottleMs => new AlterReplicaDirResponse(requestThrottleMs, responseMap.asJava))
+  }
+
+  def handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit = {
+    val describeLogDirsRequest = request.body[DescribeLogDirsRequest]
+    val logDirInfos = {
+      if (authorize(request.session, Describe, Resource.ClusterResource)) {
+        val partitions =
+          if (describeLogDirsRequest.isAllTopicPartitions)
+            replicaManager.logManager.allLogs().map(_.topicPartition).toSet
+          else
+            describeLogDirsRequest.topicPartitions().asScala
+
+        replicaManager.describeLogDirs(partitions)
+      } else {
+        Map.empty[String, LogDirInfo]
+      }
+    }
+    sendResponseMaybeThrottle(request, throttleTimeMs => new DescribeLogDirsResponse(throttleTimeMs, logDirInfos.asJava))
+  }
+
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index ec3abff..2689980 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -32,7 +32,7 @@ import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.{LogConfig, LogManager}
 import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter}
-import kafka.network.{BlockingChannel, SocketServer}
+import kafka.network.SocketServer
 import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4a415e9..7920efe 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -30,13 +30,14 @@ import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{KafkaStorageException, ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _}
+import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, LogDirNotFoundException, InvalidTimestampException, InvalidTopicException, KafkaStorageException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION
 import org.apache.kafka.common.protocol.Errors.KAFKA_STORAGE_ERROR
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, ReplicaInfo}
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
@@ -538,6 +539,92 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /*
+   * For each pair of partition and log directory specified in the map, record the pair in memory so that the partition
+   * will be created in the specified log directory when the broker receives a LeaderAndIsrRequest for the partition later.
+   *
+   * This API is currently only useful if the replica has not been created yet. We will be able to move replicas
+   * that are already created to the user-specified log directory after KIP-113 is fully implemented.
+   */
+  def alterReplicaDir(partitionDirs: Map[TopicPartition, String]): Map[TopicPartition, Errors] = {
+    partitionDirs.map { case (topicPartition, destinationDir) =>
+      try {
+        if (!logManager.isLogDirOnline(destinationDir))
+          throw new KafkaStorageException(s"Log directory $destinationDir is offline")
+
+        // If the log for this partition has not been created yet:
+        // 1) Respond with ReplicaNotAvailableException for this partition in the AlterReplicaDirResponse
+        // 2) Record the destination log directory in memory so that the partition will be created in this log directory
+        //    when the broker receives a LeaderAndIsrRequest for this partition later.
+        getReplica(topicPartition) match {
+          case Some(_) => // Support for moving a replica between log directories on the same broker is not available yet.
+          case None =>
+            logManager.updatePreferredLogDir(topicPartition, destinationDir)
+            throw new ReplicaNotAvailableException(s"Replica $localBrokerId is not available for partition $topicPartition")
+        }
+
+        (topicPartition, Errors.NONE)
+      } catch {
+        case e@(_: LogDirNotFoundException |
+                _: ReplicaNotAvailableException |
+                _: KafkaStorageException) =>
+          (topicPartition, Errors.forException(e))
+        case t: Throwable =>
+          error("Error while changing replica dir for partition %s".format(topicPartition), t)
+          (topicPartition, Errors.forException(t))
+      }
+    }
+  }
+
+  /*
+   * Get the LogDirInfo for the specified list of partitions.
+   *
+   * Each LogDirInfo specifies the following information for a given log directory:
+   * 1) Error of the log directory, e.g. whether the log directory is online or offline
+   * 2) Size and lag of the current and future logs for each partition in the given log directory. Only logs of the queried partitions
+   *    are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented.
+   */
+  def describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo] = {
+    val logsByDir = logManager.allLogs().groupBy(log => log.dir.getParent)
+
+    config.logDirs.toSet.map { logDir: String =>
+      val absolutePath = new File(logDir).getAbsolutePath
+      try {
+        if (!logManager.isLogDirOnline(absolutePath))
+          throw new KafkaStorageException(s"Log directory $absolutePath is offline")
+
+        logsByDir.get(absolutePath) match {
+          case Some(logs) =>
+            val replicaInfos = logs.filter(log =>
+              partitions.contains(log.topicPartition)
+            ).map(log => log.topicPartition -> new ReplicaInfo(log.size, getLogEndOffsetLag(log.topicPartition), false)).toMap
+
+            (absolutePath, new LogDirInfo(Errors.NONE, replicaInfos.asJava))
+          case None =>
+            (absolutePath, new LogDirInfo(Errors.NONE, Map.empty[TopicPartition, ReplicaInfo].asJava))
+        }
+
+      } catch {
+        case e: KafkaStorageException =>
+          (absolutePath, new LogDirInfo(Errors.KAFKA_STORAGE_ERROR, Map.empty[TopicPartition, ReplicaInfo].asJava))
+        case t: Throwable =>
+          error(s"Error while describing replica in dir $absolutePath", t)
+          (absolutePath, new LogDirInfo(Errors.forException(t), Map.empty[TopicPartition, ReplicaInfo].asJava))
+      }
+    }.toMap
+  }
+
+  def getLogEndOffsetLag(topicPartition: TopicPartition): Long = {
+    getReplica(topicPartition) match {
+      case Some(replica) =>
+        math.max(replica.highWatermark.messageOffset - replica.log.get.logEndOffset, 0)
+      case None =>
+        // Return -1L to indicate that the LEO lag is not available if the broker is neither follower nor leader of this partition
+        DescribeLogDirsResponse.INVALID_OFFSET_LAG
+    }
+  }
+
   def deleteRecords(timeout: Long,
                     offsetPerPartition: Map[TopicPartition, Long],
                     responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit) {
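
One detail of describeLogDirs above worth spelling out: logs are grouped by the parent directory of each partition directory, which is exactly the log directory that hosts them. A tiny standalone Scala illustration (paths are made up):

  import java.io.File

  val partitionDirs = Seq(
    new File("/data/kafka-logs-0/topicA-0"),
    new File("/data/kafka-logs-0/topicB-0"),
    new File("/data/kafka-logs-1/topicA-1"))

  // Same grouping key as logManager.allLogs().groupBy(log => log.dir.getParent)
  val logsByDir = partitionDirs.groupBy(_.getParent)
  // Map("/data/kafka-logs-0" -> Seq(topicA-0, topicB-0),
  //     "/data/kafka-logs-1" -> Seq(topicA-1))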

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index cb20b31..9582c50 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -166,8 +166,8 @@ object ZkUtils {
     DeleteTopicsPath + "/" + topic
 
-  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
+  // Note that duplicate partition entries in the JSON are collapsed by toMap; only the last assignment for a partition is kept
-  def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = {
-    for {
+  def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
+    val seq = for {
       js <- Json.parseFull(jsonData).toSeq
       partitionsSeq <- js.asJsonObject.get("partitions").toSeq
       p <- partitionsSeq.asJsonArray.iterator
@@ -178,11 +178,9 @@ object ZkUtils {
       val newReplicas = partitionFields("replicas").to[Seq[Int]]
       TopicAndPartition(topic, partition) -> newReplicas
     }
+    seq.toMap
   }
 
-  def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] =
-    parsePartitionReassignmentDataWithoutDedup(jsonData).toMap
-
   def parseTopicsData(jsonData: String): Seq[String] = {
     for {
       js <- Json.parseFull(jsonData).toSeq
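
Since parsePartitionReassignmentData now returns a Map directly, duplicate partition entries in the JSON are silently collapsed by toMap, keeping only the last occurrence. A two-line Scala illustration of that behavior:

  val entries = Seq("foo-0" -> Seq(1, 2), "foo-0" -> Seq(3, 4))
  entries.toMap  // Map(foo-0 -> Seq(3, 4)): the earlier assignment is dropped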

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 8bd379a..49a75b9 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -19,6 +19,7 @@ package kafka.api
 import java.util
 import java.util.{Collections, Properties}
 import java.util.concurrent.{ExecutionException, TimeUnit}
+import java.io.File
 
 import org.apache.kafka.clients.admin.KafkaAdminClientTest
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -29,16 +30,17 @@ import org.apache.kafka.clients.admin._
 import kafka.utils.{Logging, TestUtils, ZkUtils}
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.common.KafkaFuture
-import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.{KafkaFuture, TopicPartition, TopicPartitionReplica}
+import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors._
 import org.junit.{After, Before, Rule, Test}
 import org.apache.kafka.common.requests.MetadataResponse
 import org.apache.kafka.common.resource.{Resource, ResourceType}
 import org.junit.rules.Timeout
 import org.junit.Assert._
 
+import scala.util.Random
 import scala.collection.JavaConverters._
 
 /**
@@ -216,6 +218,77 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
   }
 
   @Test
+  def testDescribeLogDirs(): Unit = {
+    client = AdminClient.create(createConfig())
+    val topic = "topic"
+    val leaderByPartition = TestUtils.createTopic(zkUtils, topic, 10, 1, servers, new Properties())
+    val partitionsByBroker = leaderByPartition.groupBy { case (partitionId, leaderId) => leaderId }.mapValues(_.keys.toSeq)
+    val brokers = (0 until brokerCount).map(Integer.valueOf)
+    val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).all.get
+
+    (0 until brokerCount).foreach { brokerId =>
+      val server = servers.find(_.config.brokerId == brokerId).get
+      val expectedPartitions = partitionsByBroker(brokerId)
+      val logDirInfos = logDirInfosByBroker.get(brokerId)
+      val replicaInfos = logDirInfos.asScala.flatMap { case (logDir, logDirInfo) => logDirInfo.replicaInfos.asScala }.filterKeys(_.topic == topic)
+
+      assertEquals(expectedPartitions.toSet, replicaInfos.keys.map(_.partition).toSet)
+      logDirInfos.asScala.foreach { case (logDir, logDirInfo) =>
+        logDirInfo.replicaInfos.asScala.keys.foreach(tp =>
+          assertEquals(server.logManager.getLog(tp).get.dir.getParent, logDir)
+        )
+      }
+    }
+
+    client.close()
+  }
+
+  @Test
+  def testDescribeReplicaLogDir(): Unit = {
+    client = AdminClient.create(createConfig())
+    val topic = "topic"
+    val leaderByPartition = TestUtils.createTopic(zkUtils, topic, 10, 1, servers, new Properties())
+    val replicas = leaderByPartition.map { case (partition, brokerId) => new TopicPartitionReplica(topic, partition, brokerId) }.toSeq
+
+    val replicaDirInfos = client.describeReplicaLogDir(replicas.asJavaCollection).all.get
+    replicaDirInfos.asScala.foreach { case (topicPartitionReplica, replicaDirInfo) =>
+      val server = servers.find(_.config.brokerId == topicPartitionReplica.brokerId()).get
+      val tp = new TopicPartition(topicPartitionReplica.topic(), topicPartitionReplica.partition())
+      assertEquals(server.logManager.getLog(tp).get.dir.getParent, replicaDirInfo.getCurrentReplicaLogDir)
+    }
+
+    client.close()
+  }
+
+  @Test
+  def testAlterReplicaLogDirBeforeTopicCreation(): Unit = {
+    val adminClient = AdminClient.create(createConfig())
+    val topic = "topic"
+    val tp = new TopicPartition(topic, 0)
+
+    val replicaAssignment = servers.map { server =>
+      val logDir = new File(server.config.logDirs(Random.nextInt(2))).getAbsolutePath
+      new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
+    }.toMap
+
+    adminClient.alterReplicaDir(replicaAssignment.asJava, new AlterReplicaDirOptions()).values().asScala.values.foreach { future =>
+      try {
+        future.get()
+        fail("Future should fail with ReplicaNotAvailableException")
+      } catch {
+        case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[ReplicaNotAvailableException])
+      }
+    }
+
+    TestUtils.createTopic(zkUtils, topic, 1, brokerCount, servers, new Properties())
+    servers.foreach { server =>
+      val logDir = server.logManager.getLog(tp).get.dir.getParent
+      assertEquals(replicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir)
+    }
+    adminClient.close()
+  }
+
+  @Test
   def testDescribeAndAlterConfigs(): Unit = {
     client = AdminClient.create(createConfig)
 
@@ -366,7 +439,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
 
   override def generateConfigs = {
     val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
-      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = 2)
     cfgs.foreach { config =>
       config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
       config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1a4de99..67d15b3 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -64,6 +64,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val correlationId = 0
   val clientId = "client-Id"
   val tp = new TopicPartition(topic, part)
+  val logDir = "logDir"
   val topicAndPartition = TopicAndPartition(topic, part)
   val group = "my-group"
   val topicResource = new Resource(Topic, topic)
@@ -133,7 +134,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.TXN_OFFSET_COMMIT -> classOf[TxnOffsetCommitResponse],
       ApiKeys.CREATE_ACLS -> classOf[CreateAclsResponse],
       ApiKeys.DELETE_ACLS -> classOf[DeleteAclsResponse],
-      ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse]
+      ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse],
+      ApiKeys.ALTER_REPLICA_DIR -> classOf[AlterReplicaDirResponse],
+      ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse]
   )
 
   val requestKeyToError = Map[ApiKeys, Nothing => Errors](
@@ -167,7 +170,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp)),
     ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => resp.aclCreationResponses.asScala.head.error.error),
     ApiKeys.DESCRIBE_ACLS -> ((resp: DescribeAclsResponse) => resp.error.error),
-    ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => resp.responses.asScala.head.error.error)
+    ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => resp.responses.asScala.head.error.error),
+    ApiKeys.ALTER_REPLICA_DIR -> ((resp: AlterReplicaDirResponse) => resp.responses.get(tp)),
+    ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) =>
+      if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED)
   )
 
   val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -199,7 +205,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.TXN_OFFSET_COMMIT -> (groupReadAcl ++ transactionIdWriteAcl),
     ApiKeys.CREATE_ACLS -> clusterAlterAcl,
     ApiKeys.DESCRIBE_ACLS -> clusterDescribeAcl,
-    ApiKeys.DELETE_ACLS -> clusterAlterAcl
+    ApiKeys.DELETE_ACLS -> clusterAlterAcl,
+    ApiKeys.ALTER_REPLICA_DIR -> clusterAlterAcl,
+    ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl
   )
 
   @Before
@@ -340,6 +348,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       new ResourceFilter(AdminResourceType.TOPIC, null),
       new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
 
+  private def alterReplicaDirRequest = new AlterReplicaDirRequest.Builder(Collections.singletonMap(tp, logDir)).build()
+
+  private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(Collections.singleton(tp)).build()
+
 
   @Test
   def testAuthorizationWithTopicExisting() {
@@ -366,7 +378,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.ALTER_CONFIGS -> alterConfigsRequest,
       ApiKeys.CREATE_ACLS -> createAclsRequest,
       ApiKeys.DELETE_ACLS -> deleteAclsRequest,
-      ApiKeys.DESCRIBE_ACLS -> describeAclsRequest
+      ApiKeys.DESCRIBE_ACLS -> describeAclsRequest,
+      ApiKeys.ALTER_REPLICA_DIR -> alterReplicaDirRequest,
+      ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index dd6c951..389cb8f 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -139,7 +139,7 @@ object ReplicationQuotasTestRig {
       val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
 
       val start = System.currentTimeMillis()
-      ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(config.throttle))
+      ReassignPartitionsCommand.executeAssignment(zkUtils, None, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(config.throttle))
 
       //Await completion
       waitForReassignmentToComplete()

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 68e2c48..7f4eed7 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -203,7 +203,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val newReplicas = Seq(0, 2, 3)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
     assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
@@ -233,7 +233,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val newReplicas = Seq(1, 2, 3)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
@@ -262,7 +262,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
@@ -288,7 +288,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
     assertFalse("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     val reassignedPartitions = zkUtils.getPartitionsBeingReassigned()
     assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
@@ -305,7 +305,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val newReplicas = Seq(0, 1)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
     reassignPartitionsCommand.reassignPartitions()
     // create brokers
     servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 7df3693..5f76aa7 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -126,7 +126,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // reassign partition 0
     val oldAssignedReplicas = zkUtils.getReplicasForPartition(topic, 0)
     val newReplicas = Seq(1, 2, 3)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(new TopicAndPartition(topicPartition) -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(new TopicAndPartition(topicPartition) -> newReplicas))
     assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {