You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2017/09/03 06:21:05 UTC

[3/3] kafka git commit: KAFKA-5694; Add AlterReplicaDirRequest and DescribeReplicaDirRequest (KIP-113 part-1)

KAFKA-5694; Add AlterReplicaDirRequest and DescribeReplicaDirRequest (KIP-113 part-1)

Author: Dong Lin <li...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>, Jiangjie Qin <be...@gmail.com>, Colin P. Mccabe <cm...@confluent.io>

Closes #3621 from lindong28/KAFKA-5694


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/adefc8ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/adefc8ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/adefc8ea

Branch: refs/heads/trunk
Commit: adefc8ea076354e07839f0319fee1fba52343b91
Parents: b2a328d
Author: Dong Lin <li...@gmail.com>
Authored: Sat Sep 2 23:20:13 2017 -0700
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Sat Sep 2 23:20:13 2017 -0700

----------------------------------------------------------------------
 bin/kafka-log-dirs.sh                           |  17 ++
 .../kafka/clients/admin/AbstractOptions.java    |  46 ++++
 .../apache/kafka/clients/admin/AdminClient.java |  77 +++++-
 .../kafka/clients/admin/AdminClientConfig.java  |  10 +-
 .../clients/admin/AlterConfigsOptions.java      |  20 +-
 .../clients/admin/AlterReplicaDirOptions.java   |  29 ++
 .../clients/admin/AlterReplicaDirResult.java    |  57 ++++
 .../kafka/clients/admin/CreateAclsOptions.java  |  20 +-
 .../clients/admin/CreateTopicsOptions.java      |  21 +-
 .../kafka/clients/admin/DeleteAclsOptions.java  |  20 +-
 .../clients/admin/DeleteTopicsOptions.java      |  20 +-
 .../clients/admin/DescribeAclsOptions.java      |  20 +-
 .../clients/admin/DescribeClusterOptions.java   |  20 +-
 .../clients/admin/DescribeConfigsOptions.java   |  19 +-
 .../clients/admin/DescribeLogDirsOptions.java   |  33 +++
 .../clients/admin/DescribeLogDirsResult.java    |  70 +++++
 .../admin/DescribeReplicaLogDirOptions.java     |  31 +++
 .../admin/DescribeReplicaLogDirResult.java      | 132 +++++++++
 .../clients/admin/DescribeTopicsOptions.java    |  20 +-
 .../kafka/clients/admin/KafkaAdminClient.java   | 195 ++++++++++++-
 .../kafka/clients/admin/ListTopicsOptions.java  |  21 +-
 .../kafka/common/TopicPartitionReplica.java     |  91 +++++++
 .../common/errors/KafkaStorageException.java    |   2 +-
 .../common/errors/LogDirNotFoundException.java  |  37 +++
 .../apache/kafka/common/protocol/ApiKeys.java   |   4 +-
 .../apache/kafka/common/protocol/Errors.java    |   8 +
 .../apache/kafka/common/protocol/Protocol.java  |  56 +++-
 .../kafka/common/requests/AbstractRequest.java  |   4 +
 .../kafka/common/requests/AbstractResponse.java |   4 +
 .../common/requests/AlterReplicaDirRequest.java | 148 ++++++++++
 .../requests/AlterReplicaDirResponse.java       | 114 ++++++++
 .../common/requests/DescribeLogDirsRequest.java | 145 ++++++++++
 .../requests/DescribeLogDirsResponse.java       | 184 +++++++++++++
 .../main/scala/kafka/admin/LogDirsCommand.scala | 114 ++++++++
 .../kafka/admin/ReassignPartitionsCommand.scala | 272 ++++++++++++++++---
 core/src/main/scala/kafka/log/LogManager.scala  |  35 ++-
 .../src/main/scala/kafka/server/KafkaApis.scala |  34 ++-
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 .../scala/kafka/server/ReplicaManager.scala     |  89 +++++-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   8 +-
 .../kafka/api/AdminClientIntegrationTest.scala  |  81 +++++-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  22 +-
 .../other/kafka/ReplicationQuotasTestRig.scala  |   2 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |  10 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |   2 +-
 .../admin/ReassignPartitionsClusterTest.scala   | 139 ++++++++--
 .../admin/ReassignPartitionsCommandTest.scala   |  20 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |   2 +-
 .../server/AlterReplicaDirRequestTest.scala     |  83 ++++++
 .../unit/kafka/server/BaseRequestTest.scala     |   4 +-
 .../server/DescribeLogDirsRequestTest.scala     |  64 +++++
 .../unit/kafka/server/RequestQuotaTest.scala    |   9 +
 .../test/scala/unit/kafka/utils/TestUtils.scala |  30 +-
 53 files changed, 2399 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/bin/kafka-log-dirs.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-log-dirs.sh b/bin/kafka-log-dirs.sh
new file mode 100755
index 0000000..dc16edc
--- /dev/null
+++ b/bin/kafka-log-dirs.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.LogDirsCommand "$@"

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
new file mode 100644
index 0000000..5b13dea
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+
+/**
+ * This class implements the common APIs that are shared by Options classes for various AdminClient commands.
+ */
+public abstract class AbstractOptions<T extends AbstractOptions> {
+
+    private Integer timeoutMs = null;
+
+    /**
+     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
+     * AdminClient should be used.
+     */
+    public T timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return (T) this;
+    }
+
+    /**
+     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
+     * AdminClient should be used.
+     */
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index dd2aad6..3f4a07c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
@@ -335,9 +336,6 @@ public abstract class AdminClient implements AutoCloseable {
     /**
      * Update the configuration for the specified resources with the default options.
      *
-     * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
-     * a particular resource are updated atomically.
-     *
      * This operation is supported by brokers with version 0.11.0.0 or higher.
      *
      * @param configs         The resources with their configs (topic is the only resource type with configs that can
@@ -346,4 +344,77 @@ public abstract class AdminClient implements AutoCloseable {
      * @return                The AlterConfigsResult
      */
     public abstract AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
+
+    /**
+     * Change the log directory for the specified replicas. This API is currently only useful if it is used
+     * before the replica has been created on the broker. Once KIP-113 is fully implemented, it will also
+     * support moving replicas that have already been created.
+     *
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param replicaAssignment  The replicas with their log directory absolute path
+     * @return                   The AlterReplicaDirResult
+     */
+    public AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment) {
+        return alterReplicaDir(replicaAssignment, new AlterReplicaDirOptions());
+    }
+
+    /**
+     * Change the log directory for the specified replicas. This API is currently only useful if it is used
+     * before the replica has been created on the broker. Once KIP-113 is fully implemented, it will also
+     * support moving replicas that have already been created.
+     *
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param replicaAssignment  The replicas with their log directory absolute path
+     * @param options            The options to use when changing replica dir
+     * @return                   The AlterReplicaDirResult
+     */
+    public abstract AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaDirOptions options);
+
+    /**
+     * Query the information of all log directories on the given set of brokers
+     *
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param brokers     A list of brokers
+     * @return            The DescribeLogDirsResult
+     */
+    public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers) {
+        return describeLogDirs(brokers, new DescribeLogDirsOptions());
+    }
+
+    /**
+     * Query the information of all log directories on the given set of brokers
+     *
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param brokers     A list of brokers
+     * @param options     The options to use when querying log dir info
+     * @return            The DescribeLogDirsResult
+     */
+    public abstract DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options);
+
+    /**
+     * Query the replica log directory information for the specified replicas.
+     *
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param replicas      The replicas to query
+     * @return              The DescribeReplicaLogDirResult
+     */
+    public DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas) {
+        return describeReplicaLogDir(replicas, new DescribeReplicaLogDirOptions());
+    }
+
+    /**
+     * Query the replica log directory information for the specified replicas.
+     *
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param replicas      The replicas to query
+     * @param options       The options to use when querying replica log dir info
+     * @return              The DescribeReplicaLogDirResult
+     */
+    public abstract DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirOptions options);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index ed51e67..f0a117c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -154,11 +154,11 @@ public class AdminClientConfig extends AbstractConfig {
                                 .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
                                 .define(METRICS_RECORDING_LEVEL_CONFIG,
-                                    Type.STRING,
-                                    Sensor.RecordingLevel.INFO.toString(),
-                                    in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
-                                    Importance.LOW,
-                                    METRICS_RECORDING_LEVEL_DOC)
+                                        Type.STRING,
+                                        Sensor.RecordingLevel.INFO.toString(),
+                                        in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
+                                        Importance.LOW,
+                                        METRICS_RECORDING_LEVEL_DOC)
                                 // security support
                                 .define(SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
index c5665c0..7c84e05 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
@@ -27,29 +27,11 @@ import java.util.Map;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class AlterConfigsOptions {
+public class AlterConfigsOptions extends AbstractOptions<AlterConfigsOptions> {
 
-    private Integer timeoutMs = null;
     private boolean validateOnly = false;
 
     /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public AlterConfigsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
-
-    /**
      * Return true if the request should be validated without altering the configs.
      */
     public boolean shouldValidateOnly() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java
new file mode 100644
index 0000000..68d2ab6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Map;
+
+/**
+ * Options for {@link AdminClient#alterReplicaDir(Map, AlterReplicaDirOptions)}.
+ */
+@InterfaceStability.Evolving
+public class AlterReplicaDirOptions extends AbstractOptions<AlterReplicaDirOptions> {
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java
new file mode 100644
index 0000000..55bf85b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Map;
+
+
+/**
+ * The result of {@link AdminClient#alterReplicaDir(Map, AlterReplicaDirOptions)}.
+ */
+@InterfaceStability.Evolving
+public class AlterReplicaDirResult {
+    private final Map<TopicPartitionReplica, KafkaFuture<Void>> futures;
+
+    AlterReplicaDirResult(Map<TopicPartitionReplica, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     *
+     * Return a map from replica to future which can be used to check the status of individual replica movement.
+     *
+     * Possible error codes:
+     *
+     * LOG_DIR_NOT_FOUND (57)
+     * KAFKA_STORAGE_ERROR (56)
+     * REPLICA_NOT_AVAILABLE (9)
+     * UNKNOWN (-1)
+     */
+    public Map<TopicPartitionReplica, KafkaFuture<Void>> values() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds if all the replica movements have succeeded.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
index 008c678..410f079 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
@@ -27,24 +27,6 @@ import java.util.Collection;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class CreateAclsOptions {
-    private Integer timeoutMs = null;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public CreateAclsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+public class CreateAclsOptions extends AbstractOptions<CreateAclsOptions> {
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
index cb23a8d..7d4bd9e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
@@ -27,26 +27,9 @@ import java.util.Collection;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class CreateTopicsOptions {
-    private Integer timeoutMs = null;
-    private boolean validateOnly = false;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public CreateTopicsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
+public class CreateTopicsOptions extends AbstractOptions<CreateTopicsOptions> {
 
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+    private boolean validateOnly = false;
 
     /**
      * Set to true if the request should be validated without creating the topic.

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
index e06e775..ca57978 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
@@ -27,24 +27,6 @@ import java.util.Collection;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class DeleteAclsOptions {
-    private Integer timeoutMs = null;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public DeleteAclsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+public class DeleteAclsOptions extends AbstractOptions<DeleteAclsOptions> {
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
index ffe4ed7..d7c5af3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
@@ -27,24 +27,6 @@ import java.util.Collection;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class DeleteTopicsOptions {
-    private Integer timeoutMs = null;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public DeleteTopicsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+public class DeleteTopicsOptions extends AbstractOptions<DeleteTopicsOptions> {
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
index 55ae2e4..097cd19 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
@@ -26,24 +26,6 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class DescribeAclsOptions {
-    private Integer timeoutMs = null;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public DescribeAclsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+public class DescribeAclsOptions extends AbstractOptions<DescribeAclsOptions> {
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
index 0c3ea51..cb5652b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
@@ -25,24 +25,6 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class DescribeClusterOptions {
-    private Integer timeoutMs = null;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public DescribeClusterOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptions> {
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
index cc7d9cc..bb37e6b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
@@ -27,23 +27,6 @@ import java.util.Collection;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class DescribeConfigsOptions {
-    private Integer timeoutMs = null;
+public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptions> {
 
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public DescribeConfigsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsOptions.java
new file mode 100644
index 0000000..48711bf
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsOptions.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+
+/**
+ * Options for {@link AdminClient#describeLogDirs(Collection)}
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeLogDirsOptions extends AbstractOptions<DescribeLogDirsOptions> {
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
new file mode 100644
index 0000000..907b48d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.HashMap;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
+
+
+/**
+ * The result of the {@link AdminClient#describeLogDirs(Collection)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeLogDirsResult {
+    private final Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> futures;
+
+    DescribeLogDirsResult(Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from brokerId to future which can be used to check the information of partitions on each individual broker
+     */
+    public Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> values() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the brokers have responded without error
+     */
+    public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+            thenApply(new KafkaFuture.Function<Void, Map<Integer, Map<String, LogDirInfo>>>() {
+                @Override
+                public Map<Integer, Map<String, LogDirInfo>> apply(Void v) {
+                    Map<Integer, Map<String, LogDirInfo>> descriptions = new HashMap<>(futures.size());
+                    for (Map.Entry<Integer, KafkaFuture<Map<String, LogDirInfo>>> entry : futures.entrySet()) {
+                        try {
+                            descriptions.put(entry.getKey(), entry.getValue().get());
+                        } catch (InterruptedException | ExecutionException e) {
+                            // This should be unreachable, because allOf ensured that all the futures completed successfully.
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return descriptions;
+                }
+            });
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java
new file mode 100644
index 0000000..72d9643
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Collection;
+
+/**
+ * Options for {@link AdminClient#describeReplicaLogDir(Collection)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeReplicaLogDirOptions extends AbstractOptions<DescribeReplicaLogDirOptions> {
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java
new file mode 100644
index 0000000..6139cc7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+
+
+/**
+ * The result of {@link AdminClient#describeReplicaLogDir(Collection)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeReplicaLogDirResult {
+    private final Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> futures;
+
+    DescribeReplicaLogDirResult(Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from replica to future which can be used to check the log directory information of individual replicas
+     */
+    public Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> values() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds if log directory information of all replicas is available
+     */
+    public KafkaFuture<Map<TopicPartitionReplica, ReplicaLogDirInfo>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+            thenApply(new KafkaFuture.Function<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
+                @Override
+                public Map<TopicPartitionReplica, ReplicaLogDirInfo> apply(Void v) {
+                    Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfos = new HashMap<>();
+                    for (Map.Entry<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> entry : futures.entrySet()) {
+                        try {
+                            replicaLogDirInfos.put(entry.getKey(), entry.getValue().get());
+                        } catch (InterruptedException | ExecutionException e) {
+                            // This should be unreachable, because allOf ensured that all the futures completed successfully.
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return replicaLogDirInfos;
+                }
+            });
+    }
+
+    static public class ReplicaLogDirInfo {
+        // The current log directory of the replica of this partition on the given broker.
+        // Null if no replica is found for this partition on the given broker.
+        private final String currentReplicaLogDir;
+        // Defined as max(HW of partition - LEO of the replica, 0).
+        private final long currentReplicaOffsetLag;
+        // The future log directory of the replica of this partition on the given broker.
+        // Null if the replica of this partition is not being moved to another log directory on the given broker.
+        private final String futureReplicaLogDir;
+        // The LEO of the replica - LEO of the future log of this replica in the destination log directory.
+        // -1 if either there is no replica for this partition or the replica of this partition is not being moved to another log directory on the given broker.
+        private final long futureReplicaOffsetLag;
+
+        ReplicaLogDirInfo() {
+            this(null, DescribeLogDirsResponse.INVALID_OFFSET_LAG, null, DescribeLogDirsResponse.INVALID_OFFSET_LAG);
+        }
+
+        ReplicaLogDirInfo(String currentReplicaLogDir,
+                          long currentReplicaOffsetLag,
+                          String futureReplicaLogDir,
+                          long futureReplicaOffsetLag) {
+            this.currentReplicaLogDir = currentReplicaLogDir;
+            this.currentReplicaOffsetLag = currentReplicaOffsetLag;
+            this.futureReplicaLogDir = futureReplicaLogDir;
+            this.futureReplicaOffsetLag = futureReplicaOffsetLag;
+        }
+
+        public String getCurrentReplicaLogDir() {
+            return currentReplicaLogDir;
+        }
+
+        public long getCurrentReplicaOffsetLag() {
+            return currentReplicaOffsetLag;
+        }
+
+        public String getFutureReplicaLogDir() {
+            return futureReplicaLogDir;
+        }
+
+        public long getFutureReplicaOffsetLag() {
+            return futureReplicaOffsetLag;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            if (futureReplicaLogDir != null) {
+                builder.append("(currentReplicaLogDir=")
+                    .append(currentReplicaLogDir)
+                    .append(", futureReplicaLogDir=")
+                    .append(futureReplicaLogDir)
+                    .append(", futureReplicaOffsetLag=")
+                    .append(futureReplicaOffsetLag)
+                    .append(")");
+            } else {
+                builder.append("ReplicaLogDirInfo(currentReplicaLogDir=").append(currentReplicaLogDir).append(")");
+            }
+            return builder.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
index f81569e..64ead48 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
@@ -27,24 +27,6 @@ import java.util.Collection;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class DescribeTopicsOptions {
-    private Integer timeoutMs = null;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public DescribeTopicsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions> {
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index dca9f16..49ac93e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.Set;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
@@ -26,12 +27,15 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
+import org.apache.kafka.clients.admin.DescribeReplicaLogDirResult.ReplicaLogDirInfo;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
@@ -58,6 +62,8 @@ import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.AlterConfigsRequest;
 import org.apache.kafka.common.requests.AlterConfigsResponse;
+import org.apache.kafka.common.requests.AlterReplicaDirRequest;
+import org.apache.kafka.common.requests.AlterReplicaDirResponse;
 import org.apache.kafka.common.requests.CreateAclsRequest;
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse;
@@ -75,6 +81,8 @@ import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.DescribeConfigsRequest;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.DescribeLogDirsRequest;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.Resource;
@@ -395,7 +403,7 @@ public class KafkaAdminClient extends AdminClient {
             thread.join();
 
             AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
-            
+
             log.debug("Kafka admin client closed.");
         } catch (InterruptedException e) {
             log.debug("Interrupted while joining I/O thread", e);
@@ -1595,4 +1603,189 @@ public class KafkaAdminClient extends AdminClient {
         }, now);
         return new AlterConfigsResult(new HashMap<ConfigResource, KafkaFuture<Void>>(futures));
     }
+
+    @Override
+    public AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaDirOptions options) {
+        final Map<TopicPartitionReplica, KafkaFutureImpl<Void>> futures = new HashMap<>(replicaAssignment.size());
+
+        for (TopicPartitionReplica replica : replicaAssignment.keySet()) {
+            futures.put(replica, new KafkaFutureImpl<Void>());
+        }
+
+        Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>();
+
+        for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) {
+            TopicPartitionReplica replica = entry.getKey();
+            String logDir = entry.getValue();
+            int brokerId = replica.brokerId();
+            TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition());
+            if (!replicaAssignmentByBroker.containsKey(brokerId))
+                replicaAssignmentByBroker.put(brokerId, new HashMap<TopicPartition, String>());
+            replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir);
+        }
+
+        final long now = time.milliseconds();
+        for (Map.Entry<Integer, Map<TopicPartition, String>> entry: replicaAssignmentByBroker.entrySet()) {
+            final int brokerId = entry.getKey();
+            final Map<TopicPartition, String> assignment = entry.getValue();
+
+            runnable.call(new Call("alterReplicaDir", calcDeadlineMs(now, options.timeoutMs()),
+                new ConstantNodeIdProvider(brokerId)) {
+
+                @Override
+                public AbstractRequest.Builder createRequest(int timeoutMs) {
+                    return new AlterReplicaDirRequest.Builder(assignment);
+                }
+
+                @Override
+                public void handleResponse(AbstractResponse abstractResponse) {
+                    AlterReplicaDirResponse response = (AlterReplicaDirResponse) abstractResponse;
+                    for (Map.Entry<TopicPartition, Errors> responseEntry: response.responses().entrySet()) {
+                        TopicPartition tp = responseEntry.getKey();
+                        Errors error = responseEntry.getValue();
+                        TopicPartitionReplica replica = new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId);
+                        KafkaFutureImpl<Void> future = futures.get(replica);
+                        if (future == null) {
+                            handleFailure(new IllegalArgumentException(
+                                "The partition " + tp + " in the response from broker " + brokerId + " is not in the request"));
+                        } else if (error == Errors.NONE) {
+                            future.complete(null);
+                        } else {
+                            future.completeExceptionally(error.exception());
+                        }
+                    }
+                }
+                @Override
+                void handleFailure(Throwable throwable) {
+                    completeAllExceptionally(futures.values(), throwable);
+                }
+            }, now);
+        }
+
+        return new AlterReplicaDirResult(new HashMap<TopicPartitionReplica, KafkaFuture<Void>>(futures));
+    }
+
+    @Override
+    public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options) {
+        final Map<Integer, KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>> futures = new HashMap<>(brokers.size());
+
+        for (Integer brokerId: brokers) {
+            futures.put(brokerId, new KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>());
+        }
+
+        final long now = time.milliseconds();
+        for (final Integer brokerId: brokers) {
+            runnable.call(new Call("describeLogDirs", calcDeadlineMs(now, options.timeoutMs()),
+                new ConstantNodeIdProvider(brokerId)) {
+
+                @Override
+                public AbstractRequest.Builder createRequest(int timeoutMs) {
+                    // No partition set is given (null), so query all partitions in all log directories
+                    return new DescribeLogDirsRequest.Builder(null);
+                }
+
+                @Override
+                public void handleResponse(AbstractResponse abstractResponse) {
+                    DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
+                    KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>> future = futures.get(brokerId);
+                    if (response.logDirInfos().size() > 0) {
+                        future.complete(response.logDirInfos());
+                    } else {
+                        // response.logDirInfos() will be empty if and only if the user is not authorized to describe cluster resource.
+                        future.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
+                    }
+                }
+                @Override
+                void handleFailure(Throwable throwable) {
+                    completeAllExceptionally(futures.values(), throwable);
+                }
+            }, now);
+        }
+
+        return new DescribeLogDirsResult(new HashMap<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>>(futures));
+    }
+
+    @Override
+    public DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirOptions options) {
+        final Map<TopicPartitionReplica, KafkaFutureImpl<DescribeReplicaLogDirResult.ReplicaLogDirInfo>> futures = new HashMap<>(replicas.size());
+
+        for (TopicPartitionReplica replica : replicas) {
+            futures.put(replica, new KafkaFutureImpl<DescribeReplicaLogDirResult.ReplicaLogDirInfo>());
+        }
+
+        Map<Integer, Set<TopicPartition>> partitionsByBroker = new HashMap<>();
+
+        for (TopicPartitionReplica replica: replicas) {
+            if (!partitionsByBroker.containsKey(replica.brokerId()))
+                partitionsByBroker.put(replica.brokerId(), new HashSet<TopicPartition>());
+            partitionsByBroker.get(replica.brokerId()).add(new TopicPartition(replica.topic(), replica.partition()));
+        }
+
+        final long now = time.milliseconds();
+        for (Map.Entry<Integer, Set<TopicPartition>> entry: partitionsByBroker.entrySet()) {
+            final int brokerId = entry.getKey();
+            final Set<TopicPartition> topicPartitions = entry.getValue();
+            final Map<TopicPartition, ReplicaLogDirInfo> replicaDirInfoByPartition = new HashMap<>();
+            for (TopicPartition topicPartition: topicPartitions)
+                replicaDirInfoByPartition.put(topicPartition, new ReplicaLogDirInfo());
+
+            runnable.call(new Call("describeReplicaLogDir", calcDeadlineMs(now, options.timeoutMs()),
+                new ConstantNodeIdProvider(brokerId)) {
+
+                @Override
+                public AbstractRequest.Builder createRequest(int timeoutMs) {
+                    // Query selected partitions in all log directories
+                    return new DescribeLogDirsRequest.Builder(topicPartitions);
+                }
+
+                @Override
+                public void handleResponse(AbstractResponse abstractResponse) {
+                    DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
+                    for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> responseEntry: response.logDirInfos().entrySet()) {
+                        String logDir = responseEntry.getKey();
+                        DescribeLogDirsResponse.LogDirInfo logDirInfo = responseEntry.getValue();
+
+                        // No replica info will be provided if the log directory is offline
+                        if (logDirInfo.error == Errors.KAFKA_STORAGE_ERROR)
+                            continue;
+                        else if (logDirInfo.error != Errors.NONE)
+                            handleFailure(new IllegalArgumentException(
+                                "The error " + logDirInfo.error + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));
+
+                        for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoEntry: logDirInfo.replicaInfos.entrySet()) {
+                            TopicPartition tp = replicaInfoEntry.getKey();
+                            DescribeLogDirsResponse.ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
+                            ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp);
+                            if (replicaLogDirInfo == null) {
+                                handleFailure(new IllegalArgumentException(
+                                    "The partition " + tp + " in the response from broker " + brokerId + " is not in the request"));
+                            } else if (replicaInfo.isFuture) {
+                                replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
+                                                                                        replicaLogDirInfo.getCurrentReplicaOffsetLag(),
+                                                                                        logDir,
+                                                                                        replicaInfo.offsetLag));
+                            } else {
+                                replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir,
+                                                                                        replicaInfo.offsetLag,
+                                                                                        replicaLogDirInfo.getFutureReplicaLogDir(),
+                                                                                        replicaLogDirInfo.getFutureReplicaOffsetLag()));
+                            }
+                        }
+                    }
+
+                    for (Map.Entry<TopicPartition, ReplicaLogDirInfo> entry: replicaDirInfoByPartition.entrySet()) {
+                        TopicPartition tp = entry.getKey();
+                        KafkaFutureImpl<ReplicaLogDirInfo> future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
+                        future.complete(entry.getValue());
+                    }
+                }
+                @Override
+                void handleFailure(Throwable throwable) {
+                    completeAllExceptionally(futures.values(), throwable);
+                }
+            }, now);
+        }
+
+        return new DescribeReplicaLogDirResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
index 81d834f..f656ff4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
@@ -25,26 +25,9 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class ListTopicsOptions {
-    private Integer timeoutMs = null;
-    private boolean listInternal = false;
-
-    /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public ListTopicsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
+public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> {
 
-    /**
-     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     */
-    public Integer timeoutMs() {
-        return timeoutMs;
-    }
+    private boolean listInternal = false;
 
     /**
      * Set whether we should list internal topics.

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java b/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java
new file mode 100644
index 0000000..2a10439
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.io.Serializable;
+
+
+/**
+ * The topic name, partition number and the brokerId of the replica
+ */
+public final class TopicPartitionReplica implements Serializable {
+
+    private int hash = 0; // cached hashCode() result; 0 means "not computed yet"
+    private final int brokerId;
+    private final int partition;
+    private final String topic;
+
+    public TopicPartitionReplica(String topic, int partition, int brokerId) { // stores the arguments as given; topic is not null-checked here
+        this.topic = topic;
+        this.partition = partition;
+        this.brokerId = brokerId;
+    }
+
+    public String topic() { // topic name passed to the constructor (may be null; equals/hashCode tolerate that)
+        return topic;
+    }
+
+    public int partition() { // partition number passed to the constructor
+        return partition;
+    }
+
+    public int brokerId() { // broker id passed to the constructor
+        return brokerId;
+    }
+
+    @Override
+    public int hashCode() { // combines topic, partition and brokerId; result is cached in 'hash'
+        if (hash != 0) { // non-zero means a previous call already computed and cached the value
+            return hash;
+        }
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((topic == null) ? 0 : topic.hashCode()); // a null topic contributes 0
+        result = prime * result + partition;
+        result = prime * result + brokerId;
+        this.hash = result; // if the result happens to be 0 it is simply recomputed on the next call
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // equal iff same exact class and same topic, partition and brokerId
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass()) // exact class match required before the cast below
+            return false;
+        TopicPartitionReplica other = (TopicPartitionReplica) obj;
+        if (partition != other.partition)
+            return false;
+        if (brokerId != other.brokerId)
+            return false;
+        if (topic == null) { // null-safe topic comparison: two null topics compare equal
+            if (other.topic != null) {
+                return false;
+            }
+        } else if (!topic.equals(other.topic)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() { // formatted as "<topic>-<partition>-<brokerId>"
+        return String.format("%s-%d-%d", topic, partition, brokerId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java b/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java
index 00c7cee..c45afb0 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java
@@ -23,7 +23,7 @@ package org.apache.kafka.common.errors;
  * Here are the guidelines on how to handle KafkaStorageException and IOException:
  *
  * 1) If the server has not finished loading logs, IOException does not need to be converted to KafkaStorageException
- * 2) After the server has finished loading logs, IOException should be caught and trigger LogDirFailureChannel.maybeAddLogFailureEvent
+ * 2) After the server has finished loading logs, IOException should be caught and trigger LogDirFailureChannel.maybeAddOfflineLogDir()
  *    Then the IOException should either be swallowed and logged, or be converted and re-thrown as KafkaStorageException
  * 3) It is preferred for IOException to be caught in Log rather than in ReplicaManager or LogSegment.
  *

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/errors/LogDirNotFoundException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/LogDirNotFoundException.java b/clients/src/main/java/org/apache/kafka/common/errors/LogDirNotFoundException.java
new file mode 100644
index 0000000..0a4ae16
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/LogDirNotFoundException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown when a request is made for a log directory that is not present on the broker
+ */
+public class LogDirNotFoundException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public LogDirNotFoundException(String message) { // message only; forwarded unchanged to ApiException
+        super(message);
+    }
+
+    public LogDirNotFoundException(String message, Throwable cause) { // message plus underlying cause; forwarded to ApiException
+        super(message, cause);
+    }
+
+    public LogDirNotFoundException(Throwable cause) { // cause only; forwarded to ApiException
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 721a610..5ac02fa 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -68,7 +68,9 @@ public enum ApiKeys {
     CREATE_ACLS(30, "CreateAcls"),
     DELETE_ACLS(31, "DeleteAcls"),
     DESCRIBE_CONFIGS(32, "DescribeConfigs"),
-    ALTER_CONFIGS(33, "AlterConfigs");
+    ALTER_CONFIGS(33, "AlterConfigs"),
+    ALTER_REPLICA_DIR(34, "AlterReplicaDir"),
+    DESCRIBE_LOG_DIRS(35, "DescribeLogDirs");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 19acfd6..9decef2 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.ControllerMovedException;
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.LogDirNotFoundException;
 import org.apache.kafka.common.errors.DuplicateSequenceNumberException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
@@ -508,6 +509,13 @@ public enum Errors {
             public ApiException build(String message) {
                 return new KafkaStorageException(message);
             }
+    }),
+    LOG_DIR_NOT_FOUND(57, "The user-specified log directory is not found in the broker config.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new LogDirNotFoundException(message);
+            }
     });
 
     private interface ApiExceptionBuilder {

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 9f6ae3d..10b1823 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1830,6 +1830,56 @@ public class Protocol {
     public static final Schema[] DELETE_ACLS_REQUEST = {DELETE_ACLS_REQUEST_V0};
     public static final Schema[] DELETE_ACLS_RESPONSE = {DELETE_ACLS_RESPONSE_V0};
 
+    public static final Schema ALTER_REPLICA_DIR_REQUEST_V0 = new Schema( // v0 request layout: log_dirs -> topics -> partition ids
+        new Field("log_dirs", new ArrayOf(new Schema(
+                new Field("log_dir", STRING, "The absolute log directory path."),
+                new Field("topics", new ArrayOf(new Schema(
+                    new Field("topic", STRING, "The name of the topic."),
+                    new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic.")
+                )))
+        ))));
+
+    public static final Schema ALTER_REPLICA_DIR_RESPONSE_V0 = new Schema( // v0 response layout: throttle time, then topics -> (partition, error_code)
+        newThrottleTimeField(),
+        new Field("topics", new ArrayOf(new Schema(
+                new Field("topic", STRING, "The name of the topic."),
+                new Field("partitions", new ArrayOf(new Schema(
+                    new Field("partition", INT32, "The id of the partition."),
+                    new Field("error_code", INT16, "The error code for the partition.")
+                )))
+        ))));
+
+    public static final Schema[] ALTER_REPLICA_DIR_REQUEST = {ALTER_REPLICA_DIR_REQUEST_V0}; // array index is the schema version
+    public static final Schema[] ALTER_REPLICA_DIR_RESPONSE = {ALTER_REPLICA_DIR_RESPONSE_V0}; // array index is the schema version
+
+    public static final Schema DESCRIBE_LOG_DIRS_REQUEST_V0 = new Schema( // v0 request layout: optional topics -> partition ids
+        new Field("topics", ArrayOf.nullable(new Schema( // nullable array; NOTE(review): presumably null means "all partitions" - confirm against DescribeLogDirsRequest
+            new Field("topic", STRING, "The name of the topic."),
+            new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic.")
+        )))
+    );
+
+    public static final Schema DESCRIBE_LOG_DIRS_RESPONSE_V0 = new Schema( // v0 response layout: throttle time, then log_dirs -> topics -> per-partition stats
+        newThrottleTimeField(),
+        new Field("log_dirs",
+            new ArrayOf(new Schema(
+                new Field("error_code", INT16, "The error code for the log directory."), // error is reported per log directory, not per partition
+                new Field("log_dir", STRING, "The absolute log directory path."),
+                new Field("topics", new ArrayOf(new Schema(
+                    new Field("topic", STRING, "The name of the topic."),
+                    new Field("partitions", new ArrayOf(new Schema(
+                        new Field("partition", INT32, "The id of the partition."),
+                        new Field("size", INT64, "The size of the log segments of the partition in bytes."),
+                        new Field("offset_lag", INT64,
+                            "The lag of the log's LEO w.r.t. partition's HW (if it is the current log for the partition) or current replica's LEO (if it is the future log for the partition)"),
+                        new Field("is_future", BOOLEAN, "True if this log is created by AlterReplicaDirRequest and will replace the current log of the replica in the future.")
+                    )))
+                )))
+            ))));
+
+    public static final Schema[] DESCRIBE_LOG_DIRS_REQUEST = {DESCRIBE_LOG_DIRS_REQUEST_V0}; // array index is the schema version
+    public static final Schema[] DESCRIBE_LOG_DIRS_RESPONSE = {DESCRIBE_LOG_DIRS_RESPONSE_V0}; // array index is the schema version
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -1875,6 +1925,8 @@ public class Protocol {
         REQUESTS[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_REQUEST;
         REQUESTS[ApiKeys.DESCRIBE_CONFIGS.id] = DESCRIBE_CONFIGS_REQUEST;
         REQUESTS[ApiKeys.ALTER_CONFIGS.id] = ALTER_CONFIGS_REQUEST;
+        REQUESTS[ApiKeys.ALTER_REPLICA_DIR.id] = ALTER_REPLICA_DIR_REQUEST;
+        REQUESTS[ApiKeys.DESCRIBE_LOG_DIRS.id] = DESCRIBE_LOG_DIRS_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1910,6 +1962,8 @@ public class Protocol {
         RESPONSES[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_RESPONSE;
         RESPONSES[ApiKeys.DESCRIBE_CONFIGS.id] = DESCRIBE_CONFIGS_RESPONSE;
         RESPONSES[ApiKeys.ALTER_CONFIGS.id] = ALTER_CONFIGS_RESPONSE;
+        RESPONSES[ApiKeys.ALTER_REPLICA_DIR.id] = ALTER_REPLICA_DIR_RESPONSE;
+        RESPONSES[ApiKeys.DESCRIBE_LOG_DIRS.id] = DESCRIBE_LOG_DIRS_RESPONSE;
 
         /* set the minimum and maximum version of each api */
         for (ApiKeys api : ApiKeys.values()) {
@@ -1988,7 +2042,7 @@ public class Protocol {
     public static boolean requiresDelayedDeallocation(int apiKey) {
         return DELAYED_DEALLOCATION_REQUESTS.contains(ApiKeys.forId(apiKey));
     }
-    
+
     public static Schema requestHeaderSchema(short apiKey, short version) {
         if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id && version == 0)
             // This will be removed once we remove support for v0 of ControlledShutdownRequest, which

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 00de8c1..f819371 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -177,6 +177,10 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return new DescribeConfigsRequest(struct, apiVersion);
             case ALTER_CONFIGS:
                 return new AlterConfigsRequest(struct, apiVersion);
+            case ALTER_REPLICA_DIR:
+                return new AlterReplicaDirRequest(struct, apiVersion);
+            case DESCRIBE_LOG_DIRS:
+                return new DescribeLogDirsRequest(struct, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 5f1f615..f9ff6e8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -110,6 +110,10 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new DescribeConfigsResponse(struct);
             case ALTER_CONFIGS:
                 return new AlterConfigsResponse(struct);
+            case ALTER_REPLICA_DIR:
+                return new AlterReplicaDirResponse(struct);
+            case DESCRIBE_LOG_DIRS:
+                return new DescribeLogDirsResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/adefc8ea/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
new file mode 100644
index 0000000..2c2401b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class AlterReplicaDirRequest extends AbstractRequest {
+
+    // request level key names
+    private static final String LOG_DIRS_KEY_NAME = "log_dirs";
+
+    // log dir level key names
+    private static final String LOG_DIR_KEY_NAME = "log_dir";
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    private final Map<TopicPartition, String> partitionDirs; // partition -> log directory path carried by this request
+
+    public static class Builder extends AbstractRequest.Builder<AlterReplicaDirRequest> { // builds the request for a given protocol version
+        private final Map<TopicPartition, String> partitionDirs; // kept by reference; passed unchanged to the built request
+
+        public Builder(Map<TopicPartition, String> partitionDirs) { // registers the builder under ApiKeys.ALTER_REPLICA_DIR
+            super(ApiKeys.ALTER_REPLICA_DIR);
+            this.partitionDirs = partitionDirs;
+        }
+
+        @Override
+        public AlterReplicaDirRequest build(short version) { // creates a request sharing this builder's map
+            return new AlterReplicaDirRequest(partitionDirs, version);
+        }
+
+        @Override
+        public String toString() { // renders as "(type=AlterReplicaDirRequest, partitionDirs=<map>)"
+            StringBuilder builder = new StringBuilder();
+            builder.append("(type=AlterReplicaDirRequest")
+                .append(", partitionDirs=")
+                .append(partitionDirs)
+                .append(")");
+            return builder.toString();
+        }
+    }
+
+    public AlterReplicaDirRequest(Struct struct, short version) { // deserializing constructor: flattens log_dirs -> topics -> partitions into the map
+        super(version);
+        partitionDirs = new HashMap<>();
+        for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) {
+            Struct logDirStruct = (Struct) logDirStructObj;
+            String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
+            for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) {
+                Struct topicStruct = (Struct) topicStructObj;
+                String topic = topicStruct.getString(TOPIC_KEY_NAME);
+                for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                    int partition = (Integer) partitionObj;
+                    partitionDirs.put(new TopicPartition(topic, partition), logDir); // a partition listed under several log dirs keeps the last one seen
+                }
+            }
+        }
+    }
+
+    public AlterReplicaDirRequest(Map<TopicPartition, String> partitionDirs, short version) { // keeps the caller's map by reference (no defensive copy)
+        super(version);
+        this.partitionDirs = partitionDirs;
+    }
+
+    @Override
+    protected Struct toStruct() { // serializes the map as log_dirs -> topics -> partitions
+        Map<String, List<TopicPartition>> dirPartitions = new HashMap<>(); // inverted view: log dir -> partitions assigned to it
+        for (Map.Entry<TopicPartition, String> entry: partitionDirs.entrySet()) {
+            if (!dirPartitions.containsKey(entry.getValue()))
+                dirPartitions.put(entry.getValue(), new ArrayList<TopicPartition>());
+            dirPartitions.get(entry.getValue()).add(entry.getKey());
+        }
+
+        Struct struct = new Struct(ApiKeys.ALTER_REPLICA_DIR.requestSchema(version()));
+        List<Struct> logDirStructArray = new ArrayList<>();
+        for (Map.Entry<String, List<TopicPartition>> logDirEntry: dirPartitions.entrySet()) { // one log_dirs element per distinct directory
+            Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME);
+            logDirStruct.set(LOG_DIR_KEY_NAME, logDirEntry.getKey());
+
+            List<Struct> topicStructArray = new ArrayList<>();
+            for (Map.Entry<String, List<Integer>> topicEntry: CollectionUtils.groupDataByTopic(logDirEntry.getValue()).entrySet()) { // regroup this directory's partitions by topic name
+                Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
+                topicStruct.set(TOPIC_KEY_NAME, topicEntry.getKey());
+                topicStruct.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
+                topicStructArray.add(topicStruct);
+            }
+            logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+            logDirStructArray.add(logDirStruct);
+        }
+        struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray());
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { // response reporting the same error for every partition in this request
+        Map<TopicPartition, Errors> responseMap = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, String> entry : partitionDirs.entrySet()) {
+            responseMap.put(entry.getKey(), Errors.forException(e)); // requested log dir is ignored; only the partition key is used
+        }
+
+        short versionId = version();
+        switch (versionId) {
+            case 0: // the only version handled here
+                return new AlterReplicaDirResponse(throttleTimeMs, responseMap);
+            default: // any other version is rejected with IllegalArgumentException
+                throw new IllegalArgumentException(
+                    String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId,
+                        this.getClass().getSimpleName(), ApiKeys.ALTER_REPLICA_DIR.latestVersion()));
+        }
+    }
+
+    public Map<TopicPartition, String> partitionDirs() { // returns the internal map itself, not a copy
+        return partitionDirs;
+    }
+
+    public static AlterReplicaDirRequest parse(ByteBuffer buffer, short version) { // parses the buffer with the ALTER_REPLICA_DIR request schema for 'version'
+        return new AlterReplicaDirRequest(ApiKeys.ALTER_REPLICA_DIR.parseRequest(version, buffer), version);
+    }
+}