You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/03 05:24:14 UTC
[1/2] kafka git commit: KAFKA-5995; Rename AlterReplicaDir to AlterReplicaDirs
Repository: kafka
Updated Branches:
refs/heads/trunk e110e1c1e -> a1a5e93be
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/test/scala/unit/kafka/server/AlterReplicaDirRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaDirRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaDirRequestTest.scala
deleted file mode 100644
index 6e22444..0000000
--- a/core/src/test/scala/unit/kafka/server/AlterReplicaDirRequestTest.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.network.SocketServer
-import kafka.utils._
-import java.io.File
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests._
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-class AlterReplicaDirRequestTest extends BaseRequestTest {
-
- override def numBrokers: Int = 1
- override def logDirCount: Int = 5
-
- val topic = "topic"
-
- @Test
- def testAlterReplicaDirRequestBeforeTopicCreation() {
- val partitionNum = 5
- val logDir = new File(servers.head.config.logDirs.head).getAbsolutePath
- val partitionDirs = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir).toMap
- val alterReplicaDirResponse = sendAlterReplicaDirRequest(partitionDirs)
-
- // The response should show error REPLICA_NOT_AVAILABLE for all partitions
- (0 until partitionNum).foreach { partition =>
- val tp = new TopicPartition(topic, partition)
- assertEquals(Errors.REPLICA_NOT_AVAILABLE, alterReplicaDirResponse.responses().get(tp))
- assertTrue(servers.head.logManager.getLog(tp).isEmpty)
- }
-
- TestUtils.createTopic(zkUtils, topic, partitionNum, 1, servers)
- (0 until partitionNum).foreach { partition =>
- assertEquals(logDir, servers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent)
- }
- }
-
- @Test
- def testAlterReplicaDirRequestErrorCode(): Unit = {
- val validDir = new File(servers.head.config.logDirs.head).getAbsolutePath
- val offlineDir = new File(servers.head.config.logDirs.tail.head).getAbsolutePath
- servers.head.logDirFailureChannel.maybeAddOfflineLogDir(offlineDir, "", new java.io.IOException())
- TestUtils.createTopic(zkUtils, topic, 3, 1, servers)
-
- val partitionDirs = mutable.Map.empty[TopicPartition, String]
- partitionDirs.put(new TopicPartition(topic, 0), "invalidDir")
- partitionDirs.put(new TopicPartition(topic, 1), validDir)
- partitionDirs.put(new TopicPartition(topic, 2), offlineDir)
-
- val alterReplicaDirResponse = sendAlterReplicaDirRequest(partitionDirs.toMap)
- assertEquals(Errors.LOG_DIR_NOT_FOUND, alterReplicaDirResponse.responses().get(new TopicPartition(topic, 0)))
- assertEquals(Errors.NONE, alterReplicaDirResponse.responses().get(new TopicPartition(topic, 1)))
- assertEquals(Errors.KAFKA_STORAGE_ERROR, alterReplicaDirResponse.responses().get(new TopicPartition(topic, 2)))
- }
-
- private def sendAlterReplicaDirRequest(partitionDirs: Map[TopicPartition, String], socketServer: SocketServer = controllerSocketServer): AlterReplicaDirResponse = {
- val request = new AlterReplicaDirRequest.Builder(partitionDirs.asJava).build()
- val response = connectAndSend(request, ApiKeys.ALTER_REPLICA_DIR, socketServer)
- AlterReplicaDirResponse.parse(response, request.version)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
new file mode 100644
index 0000000..02abad4
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.network.SocketServer
+import kafka.utils._
+import java.io.File
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests._
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
+
+ override def numBrokers: Int = 1
+ override def logDirCount: Int = 5
+
+ val topic = "topic"
+
+ @Test
+ def testAlterReplicaLogDirsRequestBeforeTopicCreation() {
+ val partitionNum = 5
+ val logDir = new File(servers.head.config.logDirs.head).getAbsolutePath
+ val partitionDirs = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir).toMap
+ val alterReplicaDirResponse = sendAlterReplicaLogDirsRequest(partitionDirs)
+
+ // The response should show error REPLICA_NOT_AVAILABLE for all partitions
+ (0 until partitionNum).foreach { partition =>
+ val tp = new TopicPartition(topic, partition)
+ assertEquals(Errors.REPLICA_NOT_AVAILABLE, alterReplicaDirResponse.responses().get(tp))
+ assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+ }
+
+ TestUtils.createTopic(zkUtils, topic, partitionNum, 1, servers)
+ (0 until partitionNum).foreach { partition =>
+ assertEquals(logDir, servers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent)
+ }
+ }
+
+ @Test
+ def testAlterReplicaLogDirsRequestErrorCode(): Unit = {
+ val validDir = new File(servers.head.config.logDirs.head).getAbsolutePath
+ val offlineDir = new File(servers.head.config.logDirs.tail.head).getAbsolutePath
+ servers.head.logDirFailureChannel.maybeAddOfflineLogDir(offlineDir, "", new java.io.IOException())
+ TestUtils.createTopic(zkUtils, topic, 3, 1, servers)
+
+ val partitionDirs = mutable.Map.empty[TopicPartition, String]
+ partitionDirs.put(new TopicPartition(topic, 0), "invalidDir")
+ partitionDirs.put(new TopicPartition(topic, 1), validDir)
+ partitionDirs.put(new TopicPartition(topic, 2), offlineDir)
+
+ val alterReplicaDirResponse = sendAlterReplicaLogDirsRequest(partitionDirs.toMap)
+ assertEquals(Errors.LOG_DIR_NOT_FOUND, alterReplicaDirResponse.responses().get(new TopicPartition(topic, 0)))
+ assertEquals(Errors.NONE, alterReplicaDirResponse.responses().get(new TopicPartition(topic, 1)))
+ assertEquals(Errors.KAFKA_STORAGE_ERROR, alterReplicaDirResponse.responses().get(new TopicPartition(topic, 2)))
+ }
+
+ private def sendAlterReplicaLogDirsRequest(partitionDirs: Map[TopicPartition, String], socketServer: SocketServer = controllerSocketServer): AlterReplicaLogDirsResponse = {
+ val request = new AlterReplicaLogDirsRequest.Builder(partitionDirs.asJava).build()
+ val response = connectAndSend(request, ApiKeys.ALTER_REPLICA_LOG_DIRS, socketServer)
+ AlterReplicaLogDirsResponse.parse(response, request.version)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index e15ea4b..d6e0ec0 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -294,8 +294,8 @@ class RequestQuotaTest extends BaseRequestTest {
new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
))), true)
- case ApiKeys.ALTER_REPLICA_DIR =>
- new AlterReplicaDirRequest.Builder(Collections.singletonMap(tp, logDir))
+ case ApiKeys.ALTER_REPLICA_LOG_DIRS =>
+ new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir))
case ApiKeys.DESCRIBE_LOG_DIRS =>
new DescribeLogDirsRequest.Builder(Collections.singleton(tp))
@@ -396,7 +396,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DELETE_ACLS => new DeleteAclsResponse(response).throttleTimeMs
case ApiKeys.DESCRIBE_CONFIGS => new DescribeConfigsResponse(response).throttleTimeMs
case ApiKeys.ALTER_CONFIGS => new AlterConfigsResponse(response).throttleTimeMs
- case ApiKeys.ALTER_REPLICA_DIR => new AlterReplicaDirResponse(response).throttleTimeMs
+ case ApiKeys.ALTER_REPLICA_LOG_DIRS => new AlterReplicaLogDirsResponse(response).throttleTimeMs
case ApiKeys.DESCRIBE_LOG_DIRS => new DescribeLogDirsResponse(response).throttleTimeMs
case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsResponse(response).throttleTimeMs
case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
[2/2] kafka git commit: KAFKA-5995; Rename AlterReplicaDir to AlterReplicaDirs
Posted by ju...@apache.org.
KAFKA-5995; Rename AlterReplicaDir to AlterReplicaDirs
Author: Dong Lin <li...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #3993 from lindong28/KAFKA-5995
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1a5e93b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1a5e93b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1a5e93b
Branch: refs/heads/trunk
Commit: a1a5e93beb16e38cf997554a7819a0b92c6661e5
Parents: e110e1c
Author: Dong Lin <li...@gmail.com>
Authored: Mon Oct 2 22:24:09 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Oct 2 22:24:09 2017 -0700
----------------------------------------------------------------------
.../apache/kafka/clients/admin/AdminClient.java | 24 +--
.../clients/admin/AlterReplicaDirOptions.java | 29 ----
.../clients/admin/AlterReplicaDirResult.java | 57 -------
.../admin/AlterReplicaLogDirsOptions.java | 29 ++++
.../admin/AlterReplicaLogDirsResult.java | 57 +++++++
.../admin/DescribeReplicaLogDirOptions.java | 31 ----
.../admin/DescribeReplicaLogDirResult.java | 132 ---------------
.../admin/DescribeReplicaLogDirsOptions.java | 31 ++++
.../admin/DescribeReplicaLogDirsResult.java | 132 +++++++++++++++
.../kafka/clients/admin/KafkaAdminClient.java | 26 +--
.../apache/kafka/common/protocol/ApiKeys.java | 8 +-
.../kafka/common/requests/AbstractRequest.java | 4 +-
.../kafka/common/requests/AbstractResponse.java | 4 +-
.../common/requests/AlterReplicaDirRequest.java | 165 -------------------
.../requests/AlterReplicaDirResponse.java | 135 ---------------
.../requests/AlterReplicaLogDirsRequest.java | 165 +++++++++++++++++++
.../requests/AlterReplicaLogDirsResponse.java | 135 +++++++++++++++
.../requests/DescribeLogDirsResponse.java | 2 +-
.../kafka/admin/ReassignPartitionsCommand.scala | 14 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 12 +-
.../scala/kafka/server/ReplicaManager.scala | 4 +-
.../kafka/api/AdminClientIntegrationTest.scala | 8 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 10 +-
.../admin/ReassignPartitionsClusterTest.scala | 6 +-
.../server/AlterReplicaDirRequestTest.scala | 83 ----------
.../server/AlterReplicaLogDirsRequestTest.scala | 83 ++++++++++
.../unit/kafka/server/RequestQuotaTest.scala | 6 +-
27 files changed, 696 insertions(+), 696 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 61d6db0..636317c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -373,16 +373,16 @@ public abstract class AdminClient implements AutoCloseable {
* before the replica has been created on the broker. It will support moving replicas that have already been created after
* KIP-113 is fully implemented.
*
- * This is a convenience method for #{@link AdminClient#alterReplicaDir(Map, AlterReplicaDirOptions)} with default options.
+ * This is a convenience method for #{@link AdminClient#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)} with default options.
* See the overload for more details.
*
* This operation is supported by brokers with version 1.0.0 or higher.
*
* @param replicaAssignment The replicas with their log directory absolute path
- * @return The AlterReplicaDirResult
+ * @return The AlterReplicaLogDirsResult
*/
- public AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment) {
- return alterReplicaDir(replicaAssignment, new AlterReplicaDirOptions());
+ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
+ return alterReplicaLogDirs(replicaAssignment, new AlterReplicaLogDirsOptions());
}
/**
@@ -396,9 +396,9 @@ public abstract class AdminClient implements AutoCloseable {
*
* @param replicaAssignment The replicas with their log directory absolute path
* @param options The options to use when changing replica dir
- * @return The AlterReplicaDirResult
+ * @return The AlterReplicaLogDirsResult
*/
- public abstract AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaDirOptions options);
+ public abstract AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaLogDirsOptions options);
/**
* Query the information of all log directories on the given set of brokers
@@ -429,16 +429,16 @@ public abstract class AdminClient implements AutoCloseable {
/**
* Query the replica log directory information for the specified replicas.
*
- * This is a convenience method for #{@link AdminClient#describeReplicaLogDir(Collection, DescribeReplicaLogDirOptions)}
+ * This is a convenience method for #{@link AdminClient#describeReplicaLogDirs(Collection, DescribeReplicaLogDirsOptions)}
* with default options. See the overload for more details.
*
* This operation is supported by brokers with version 1.0.0 or higher.
*
* @param replicas The replicas to query
- * @return The DescribeReplicaLogDirResult
+ * @return The DescribeReplicaLogDirsResult
*/
- public DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas) {
- return describeReplicaLogDir(replicas, new DescribeReplicaLogDirOptions());
+ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas) {
+ return describeReplicaLogDirs(replicas, new DescribeReplicaLogDirsOptions());
}
/**
@@ -448,9 +448,9 @@ public abstract class AdminClient implements AutoCloseable {
*
* @param replicas The replicas to query
* @param options The options to use when querying replica log dir info
- * @return The DescribeReplicaLogDirResult
+ * @return The DescribeReplicaLogDirsResult
*/
- public abstract DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirOptions options);
+ public abstract DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options);
/**
* Increase the number of partitions of the topics given as the keys of {@code newPartitions}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java
deleted file mode 100644
index 68d2ab6..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirOptions.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import java.util.Map;
-
-/**
- * Options for {@link AdminClient#alterReplicaDir(Map, AlterReplicaDirOptions)}.
- */
-@InterfaceStability.Evolving
-public class AlterReplicaDirOptions extends AbstractOptions<AlterReplicaDirOptions> {
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java
deleted file mode 100644
index 55bf85b..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaDirResult.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.TopicPartitionReplica;
-import org.apache.kafka.common.annotation.InterfaceStability;
-import java.util.Map;
-
-
-/**
- * The result of {@link AdminClient#alterReplicaDir(Map, AlterReplicaDirOptions)}.
- */
-@InterfaceStability.Evolving
-public class AlterReplicaDirResult {
- private final Map<TopicPartitionReplica, KafkaFuture<Void>> futures;
-
- AlterReplicaDirResult(Map<TopicPartitionReplica, KafkaFuture<Void>> futures) {
- this.futures = futures;
- }
-
- /**
- *
- * Return a map from replica to future which can be used to check the status of individual replica movement.
- *
- * Possible error code:
- *
- * LOG_DIR_NOT_FOUND (57)
- * KAFKA_STORAGE_ERROR (56)
- * REPLICA_NOT_AVAILABLE (9)
- * UNKNOWN (-1)
- */
- public Map<TopicPartitionReplica, KafkaFuture<Void>> values() {
- return futures;
- }
-
- /**
- * Return a future which succeeds if all the replica movement have succeeded
- */
- public KafkaFuture<Void> all() {
- return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsOptions.java
new file mode 100644
index 0000000..d6892ef
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Map;
+
+/**
+ * Options for {@link AdminClient#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)}.
+ */
+@InterfaceStability.Evolving
+public class AlterReplicaLogDirsOptions extends AbstractOptions<AlterReplicaLogDirsOptions> {
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsResult.java
new file mode 100644
index 0000000..a3da216
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterReplicaLogDirsResult.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Map;
+
+
+/**
+ * The result of {@link AdminClient#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)}.
+ */
+@InterfaceStability.Evolving
+public class AlterReplicaLogDirsResult {
+ private final Map<TopicPartitionReplica, KafkaFuture<Void>> futures;
+
+ AlterReplicaLogDirsResult(Map<TopicPartitionReplica, KafkaFuture<Void>> futures) {
+ this.futures = futures;
+ }
+
+ /**
+ *
+ * Return a map from replica to future which can be used to check the status of individual replica movement.
+ *
+ * Possible error code:
+ *
+ * LOG_DIR_NOT_FOUND (57)
+ * KAFKA_STORAGE_ERROR (56)
+ * REPLICA_NOT_AVAILABLE (9)
+ * UNKNOWN (-1)
+ */
+ public Map<TopicPartitionReplica, KafkaFuture<Void>> values() {
+ return futures;
+ }
+
+ /**
+ * Return a future which succeeds if all the replica movement have succeeded
+ */
+ public KafkaFuture<Void> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java
deleted file mode 100644
index 72d9643..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirOptions.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import java.util.Collection;
-
-/**
- * Options for {@link AdminClient#describeReplicaLogDir(Collection<org.apache.kafka.common.TopicPartitionReplica>)}.
- *
- * The API of this class is evolving, see {@link AdminClient} for details.
- */
-@InterfaceStability.Evolving
-public class DescribeReplicaLogDirOptions extends AbstractOptions<DescribeReplicaLogDirOptions> {
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java
deleted file mode 100644
index 6139cc7..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirResult.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.TopicPartitionReplica;
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Collection;
-import java.util.concurrent.ExecutionException;
-
-
-/**
- * The result of {@link AdminClient#describeReplicaLogDir(Collection)}.
- *
- * The API of this class is evolving, see {@link AdminClient} for details.
- */
-@InterfaceStability.Evolving
-public class DescribeReplicaLogDirResult {
- private final Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> futures;
-
- DescribeReplicaLogDirResult(Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> futures) {
- this.futures = futures;
- }
-
- /**
- * Return a map from replica to future which can be used to check the log directory information of individual replicas
- */
- public Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> values() {
- return futures;
- }
-
- /**
- * Return a future which succeeds if log directory information of all replicas are available
- */
- public KafkaFuture<Map<TopicPartitionReplica, ReplicaLogDirInfo>> all() {
- return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
- thenApply(new KafkaFuture.Function<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
- @Override
- public Map<TopicPartitionReplica, ReplicaLogDirInfo> apply(Void v) {
- Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfos = new HashMap<>();
- for (Map.Entry<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> entry : futures.entrySet()) {
- try {
- replicaLogDirInfos.put(entry.getKey(), entry.getValue().get());
- } catch (InterruptedException | ExecutionException e) {
- // This should be unreachable, because allOf ensured that all the futures completed successfully.
- throw new RuntimeException(e);
- }
- }
- return replicaLogDirInfos;
- }
- });
- }
-
- static public class ReplicaLogDirInfo {
- // The current log directory of the replica of this partition on the given broker.
- // Null if no replica is not found for this partition on the given broker.
- private final String currentReplicaLogDir;
- // Defined as max(HW of partition - LEO of the replica, 0).
- private final long currentReplicaOffsetLag;
- // The future log directory of the replica of this partition on the given broker.
- // Null if the replica of this partition is not being moved to another log directory on the given broker.
- private final String futureReplicaLogDir;
- // The LEO of the replica - LEO of the future log of this replica in the destination log directory.
- // -1 if either there is not replica for this partition or the replica of this partition is not being moved to another log directory on the given broker.
- private final long futureReplicaOffsetLag;
-
- ReplicaLogDirInfo() {
- this(null, DescribeLogDirsResponse.INVALID_OFFSET_LAG, null, DescribeLogDirsResponse.INVALID_OFFSET_LAG);
- }
-
- ReplicaLogDirInfo(String currentReplicaLogDir,
- long currentReplicaOffsetLag,
- String futureReplicaLogDir,
- long futureReplicaOffsetLag) {
- this.currentReplicaLogDir = currentReplicaLogDir;
- this.currentReplicaOffsetLag = currentReplicaOffsetLag;
- this.futureReplicaLogDir = futureReplicaLogDir;
- this.futureReplicaOffsetLag = futureReplicaOffsetLag;
- }
-
- public String getCurrentReplicaLogDir() {
- return currentReplicaLogDir;
- }
-
- public long getCurrentReplicaOffsetLag() {
- return currentReplicaOffsetLag;
- }
-
- public String getFutureReplicaLogDir() {
- return futureReplicaLogDir;
- }
-
- public long getFutureReplicaOffsetLag() {
- return futureReplicaOffsetLag;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- if (futureReplicaLogDir != null) {
- builder.append("(currentReplicaLogDir=")
- .append(currentReplicaLogDir)
- .append(", futureReplicaLogDir=")
- .append(futureReplicaLogDir)
- .append(", futureReplicaOffsetLag=")
- .append(futureReplicaOffsetLag)
- .append(")");
- } else {
- builder.append("ReplicaLogDirInfo(currentReplicaLogDir=").append(currentReplicaLogDir).append(")");
- }
- return builder.toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsOptions.java
new file mode 100644
index 0000000..943795c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Collection;
+
+/**
+ * Options for {@link AdminClient#describeReplicaLogDirs(Collection)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeReplicaLogDirsOptions extends AbstractOptions<DescribeReplicaLogDirsOptions> {
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
new file mode 100644
index 0000000..401b4aa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+
+
+/**
+ * The result of {@link AdminClient#describeReplicaLogDirs(Collection)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeReplicaLogDirsResult {
+ private final Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> futures;
+
+ DescribeReplicaLogDirsResult(Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> futures) {
+ this.futures = futures;
+ }
+
+ /**
+ * Return a map from replica to future which can be used to check the log directory information of individual replicas
+ */
+ public Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> values() {
+ return futures;
+ }
+
+ /**
+ * Return a future which succeeds if log directory information of all replicas is available
+ */
+ public KafkaFuture<Map<TopicPartitionReplica, ReplicaLogDirInfo>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+ thenApply(new KafkaFuture.Function<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
+ @Override
+ public Map<TopicPartitionReplica, ReplicaLogDirInfo> apply(Void v) {
+ Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfos = new HashMap<>();
+ for (Map.Entry<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> entry : futures.entrySet()) {
+ try {
+ replicaLogDirInfos.put(entry.getKey(), entry.getValue().get());
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, because allOf ensured that all the futures completed successfully.
+ throw new RuntimeException(e);
+ }
+ }
+ return replicaLogDirInfos;
+ }
+ });
+ }
+
+ static public class ReplicaLogDirInfo {
+ // The current log directory of the replica of this partition on the given broker.
+ // Null if no replica is found for this partition on the given broker.
+ private final String currentReplicaLogDir;
+ // Defined as max(HW of partition - LEO of the replica, 0).
+ private final long currentReplicaOffsetLag;
+ // The future log directory of the replica of this partition on the given broker.
+ // Null if the replica of this partition is not being moved to another log directory on the given broker.
+ private final String futureReplicaLogDir;
+ // The LEO of the replica - LEO of the future log of this replica in the destination log directory.
+ // -1 if either there is no replica for this partition or the replica of this partition is not being moved to another log directory on the given broker.
+ private final long futureReplicaOffsetLag;
+
+ ReplicaLogDirInfo() {
+ this(null, DescribeLogDirsResponse.INVALID_OFFSET_LAG, null, DescribeLogDirsResponse.INVALID_OFFSET_LAG);
+ }
+
+ ReplicaLogDirInfo(String currentReplicaLogDir,
+ long currentReplicaOffsetLag,
+ String futureReplicaLogDir,
+ long futureReplicaOffsetLag) {
+ this.currentReplicaLogDir = currentReplicaLogDir;
+ this.currentReplicaOffsetLag = currentReplicaOffsetLag;
+ this.futureReplicaLogDir = futureReplicaLogDir;
+ this.futureReplicaOffsetLag = futureReplicaOffsetLag;
+ }
+
+ public String getCurrentReplicaLogDir() {
+ return currentReplicaLogDir;
+ }
+
+ public long getCurrentReplicaOffsetLag() {
+ return currentReplicaOffsetLag;
+ }
+
+ public String getFutureReplicaLogDir() {
+ return futureReplicaLogDir;
+ }
+
+ public long getFutureReplicaOffsetLag() {
+ return futureReplicaOffsetLag;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ if (futureReplicaLogDir != null) {
+ builder.append("(currentReplicaLogDir=")
+ .append(currentReplicaLogDir)
+ .append(", futureReplicaLogDir=")
+ .append(futureReplicaLogDir)
+ .append(", futureReplicaOffsetLag=")
+ .append(futureReplicaOffsetLag)
+ .append(")");
+ } else {
+ builder.append("ReplicaLogDirInfo(currentReplicaLogDir=").append(currentReplicaLogDir).append(")");
+ }
+ return builder.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 5f37b8e..1a66371 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
-import org.apache.kafka.clients.admin.DescribeReplicaLogDirResult.ReplicaLogDirInfo;
+import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
@@ -64,8 +64,8 @@ import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AlterConfigsRequest;
import org.apache.kafka.common.requests.AlterConfigsResponse;
-import org.apache.kafka.common.requests.AlterReplicaDirRequest;
-import org.apache.kafka.common.requests.AlterReplicaDirResponse;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.ApiError;
@@ -1653,7 +1653,7 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
- public AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaDirOptions options) {
+ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options) {
final Map<TopicPartitionReplica, KafkaFutureImpl<Void>> futures = new HashMap<>(replicaAssignment.size());
for (TopicPartitionReplica replica : replicaAssignment.keySet()) {
@@ -1677,17 +1677,17 @@ public class KafkaAdminClient extends AdminClient {
final int brokerId = entry.getKey();
final Map<TopicPartition, String> assignment = entry.getValue();
- runnable.call(new Call("alterReplicaDir", calcDeadlineMs(now, options.timeoutMs()),
+ runnable.call(new Call("alterReplicaLogDirs", calcDeadlineMs(now, options.timeoutMs()),
new ConstantNodeIdProvider(brokerId)) {
@Override
public AbstractRequest.Builder createRequest(int timeoutMs) {
- return new AlterReplicaDirRequest.Builder(assignment);
+ return new AlterReplicaLogDirsRequest.Builder(assignment);
}
@Override
public void handleResponse(AbstractResponse abstractResponse) {
- AlterReplicaDirResponse response = (AlterReplicaDirResponse) abstractResponse;
+ AlterReplicaLogDirsResponse response = (AlterReplicaLogDirsResponse) abstractResponse;
for (Map.Entry<TopicPartition, Errors> responseEntry: response.responses().entrySet()) {
TopicPartition tp = responseEntry.getKey();
Errors error = responseEntry.getValue();
@@ -1710,7 +1710,7 @@ public class KafkaAdminClient extends AdminClient {
}, now);
}
- return new AlterReplicaDirResult(new HashMap<TopicPartitionReplica, KafkaFuture<Void>>(futures));
+ return new AlterReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<Void>>(futures));
}
@Override
@@ -1754,11 +1754,11 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
- public DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirOptions options) {
- final Map<TopicPartitionReplica, KafkaFutureImpl<DescribeReplicaLogDirResult.ReplicaLogDirInfo>> futures = new HashMap<>(replicas.size());
+ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options) {
+ final Map<TopicPartitionReplica, KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> futures = new HashMap<>(replicas.size());
for (TopicPartitionReplica replica : replicas) {
- futures.put(replica, new KafkaFutureImpl<DescribeReplicaLogDirResult.ReplicaLogDirInfo>());
+ futures.put(replica, new KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>());
}
Map<Integer, Set<TopicPartition>> partitionsByBroker = new HashMap<>();
@@ -1777,7 +1777,7 @@ public class KafkaAdminClient extends AdminClient {
for (TopicPartition topicPartition: topicPartitions)
replicaDirInfoByPartition.put(topicPartition, new ReplicaLogDirInfo());
- runnable.call(new Call("describeReplicaLogDir", calcDeadlineMs(now, options.timeoutMs()),
+ runnable.call(new Call("describeReplicaLogDirs", calcDeadlineMs(now, options.timeoutMs()),
new ConstantNodeIdProvider(brokerId)) {
@Override
@@ -1834,7 +1834,7 @@ public class KafkaAdminClient extends AdminClient {
}, now);
}
- return new DescribeReplicaLogDirResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures));
+ return new DescribeReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures));
}
public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index d094134..cf1bff5 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -27,8 +27,8 @@ import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.AlterConfigsRequest;
import org.apache.kafka.common.requests.AlterConfigsResponse;
-import org.apache.kafka.common.requests.AlterReplicaDirRequest;
-import org.apache.kafka.common.requests.AlterReplicaDirResponse;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.ControlledShutdownRequest;
@@ -164,8 +164,8 @@ public enum ApiKeys {
DescribeConfigsResponse.schemaVersions()),
ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequest.schemaVersions(),
AlterConfigsResponse.schemaVersions()),
- ALTER_REPLICA_DIR(34, "AlterReplicaDir", AlterReplicaDirRequest.schemaVersions(),
- AlterReplicaDirResponse.schemaVersions()),
+ ALTER_REPLICA_LOG_DIRS(34, "AlterReplicaLogDirs", AlterReplicaLogDirsRequest.schemaVersions(),
+ AlterReplicaLogDirsResponse.schemaVersions()),
DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequest.schemaVersions(),
DescribeLogDirsResponse.schemaVersions()),
SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequest.schemaVersions(),
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index e6dd6da..5a1c4f4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -206,8 +206,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return new DescribeConfigsRequest(struct, apiVersion);
case ALTER_CONFIGS:
return new AlterConfigsRequest(struct, apiVersion);
- case ALTER_REPLICA_DIR:
- return new AlterReplicaDirRequest(struct, apiVersion);
+ case ALTER_REPLICA_LOG_DIRS:
+ return new AlterReplicaLogDirsRequest(struct, apiVersion);
case DESCRIBE_LOG_DIRS:
return new DescribeLogDirsRequest(struct, apiVersion);
case SASL_AUTHENTICATE:
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 12fe3c8..6294af4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -138,8 +138,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new DescribeConfigsResponse(struct);
case ALTER_CONFIGS:
return new AlterConfigsResponse(struct);
- case ALTER_REPLICA_DIR:
- return new AlterReplicaDirResponse(struct);
+ case ALTER_REPLICA_LOG_DIRS:
+ return new AlterReplicaLogDirsResponse(struct);
case DESCRIBE_LOG_DIRS:
return new DescribeLogDirsResponse(struct);
case SASL_AUTHENTICATE:
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
deleted file mode 100644
index 7e58fd6..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirRequest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
-
-public class AlterReplicaDirRequest extends AbstractRequest {
-
- // request level key names
- private static final String LOG_DIRS_KEY_NAME = "log_dirs";
-
- // log dir level key names
- private static final String LOG_DIR_KEY_NAME = "log_dir";
- private static final String TOPICS_KEY_NAME = "topics";
-
- // topic level key names
- private static final String PARTITIONS_KEY_NAME = "partitions";
-
- private static final Schema ALTER_REPLICA_DIR_REQUEST_V0 = new Schema(
- new Field("log_dirs", new ArrayOf(new Schema(
- new Field("log_dir", STRING, "The absolute log directory path."),
- new Field("topics", new ArrayOf(new Schema(
- TOPIC_NAME,
- new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic."))))))));
-
- public static Schema[] schemaVersions() {
- return new Schema[]{ALTER_REPLICA_DIR_REQUEST_V0};
- }
-
- private final Map<TopicPartition, String> partitionDirs;
-
- public static class Builder extends AbstractRequest.Builder<AlterReplicaDirRequest> {
- private final Map<TopicPartition, String> partitionDirs;
-
- public Builder(Map<TopicPartition, String> partitionDirs) {
- super(ApiKeys.ALTER_REPLICA_DIR);
- this.partitionDirs = partitionDirs;
- }
-
- @Override
- public AlterReplicaDirRequest build(short version) {
- return new AlterReplicaDirRequest(partitionDirs, version);
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("(type=AlterReplicaDirRequest")
- .append(", partitionDirs=")
- .append(partitionDirs)
- .append(")");
- return builder.toString();
- }
- }
-
- public AlterReplicaDirRequest(Struct struct, short version) {
- super(version);
- partitionDirs = new HashMap<>();
- for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) {
- Struct logDirStruct = (Struct) logDirStructObj;
- String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
- for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) {
- Struct topicStruct = (Struct) topicStructObj;
- String topic = topicStruct.get(TOPIC_NAME);
- for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
- int partition = (Integer) partitionObj;
- partitionDirs.put(new TopicPartition(topic, partition), logDir);
- }
- }
- }
- }
-
- public AlterReplicaDirRequest(Map<TopicPartition, String> partitionDirs, short version) {
- super(version);
- this.partitionDirs = partitionDirs;
- }
-
- @Override
- protected Struct toStruct() {
- Map<String, List<TopicPartition>> dirPartitions = new HashMap<>();
- for (Map.Entry<TopicPartition, String> entry: partitionDirs.entrySet()) {
- if (!dirPartitions.containsKey(entry.getValue()))
- dirPartitions.put(entry.getValue(), new ArrayList<TopicPartition>());
- dirPartitions.get(entry.getValue()).add(entry.getKey());
- }
-
- Struct struct = new Struct(ApiKeys.ALTER_REPLICA_DIR.requestSchema(version()));
- List<Struct> logDirStructArray = new ArrayList<>();
- for (Map.Entry<String, List<TopicPartition>> logDirEntry: dirPartitions.entrySet()) {
- Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME);
- logDirStruct.set(LOG_DIR_KEY_NAME, logDirEntry.getKey());
-
- List<Struct> topicStructArray = new ArrayList<>();
- for (Map.Entry<String, List<Integer>> topicEntry: CollectionUtils.groupDataByTopic(logDirEntry.getValue()).entrySet()) {
- Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
- topicStruct.set(TOPIC_NAME, topicEntry.getKey());
- topicStruct.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
- topicStructArray.add(topicStruct);
- }
- logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
- logDirStructArray.add(logDirStruct);
- }
- struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray());
- return struct;
- }
-
- @Override
- public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- Map<TopicPartition, Errors> responseMap = new HashMap<>();
-
- for (Map.Entry<TopicPartition, String> entry : partitionDirs.entrySet()) {
- responseMap.put(entry.getKey(), Errors.forException(e));
- }
-
- short versionId = version();
- switch (versionId) {
- case 0:
- return new AlterReplicaDirResponse(throttleTimeMs, responseMap);
- default:
- throw new IllegalArgumentException(
- String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId,
- this.getClass().getSimpleName(), ApiKeys.ALTER_REPLICA_DIR.latestVersion()));
- }
- }
-
- public Map<TopicPartition, String> partitionDirs() {
- return partitionDirs;
- }
-
- public static AlterReplicaDirRequest parse(ByteBuffer buffer, short version) {
- return new AlterReplicaDirRequest(ApiKeys.ALTER_REPLICA_DIR.parseRequest(version, buffer), version);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
deleted file mode 100644
index b875104..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-
-
-public class AlterReplicaDirResponse extends AbstractResponse {
-
- // request level key names
- private static final String TOPICS_KEY_NAME = "topics";
-
- // topic level key names
- private static final String PARTITIONS_KEY_NAME = "partitions";
-
- private static final Schema ALTER_REPLICA_DIR_RESPONSE_V0 = new Schema(
- THROTTLE_TIME_MS,
- new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
- TOPIC_NAME,
- new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema(
- PARTITION_ID,
- ERROR_CODE)))))));
-
- public static Schema[] schemaVersions() {
- return new Schema[]{ALTER_REPLICA_DIR_RESPONSE_V0};
- }
-
- /**
- * Possible error code:
- *
- * LOG_DIR_NOT_FOUND (57)
- * KAFKA_STORAGE_ERROR (56)
- * REPLICA_NOT_AVAILABLE (9)
- * UNKNOWN (-1)
- */
- private final Map<TopicPartition, Errors> responses;
- private final int throttleTimeMs;
-
- public AlterReplicaDirResponse(Struct struct) {
- throttleTimeMs = struct.get(THROTTLE_TIME_MS);
- responses = new HashMap<>();
- for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
- Struct topicStruct = (Struct) topicStructObj;
- String topic = topicStruct.get(TOPIC_NAME);
- for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionStruct = (Struct) partitionStructObj;
- int partition = partitionStruct.get(PARTITION_ID);
- Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE));
- responses.put(new TopicPartition(topic, partition), error);
- }
- }
- }
-
- /**
- * Constructor for version 0.
- */
- public AlterReplicaDirResponse(int throttleTimeMs, Map<TopicPartition, Errors> responses) {
- this.throttleTimeMs = throttleTimeMs;
- this.responses = responses;
- }
-
- @Override
- protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version));
- struct.set(THROTTLE_TIME_MS, throttleTimeMs);
- Map<String, Map<Integer, Errors>> responsesByTopic = CollectionUtils.groupDataByTopic(responses);
- List<Struct> topicStructArray = new ArrayList<>();
- for (Map.Entry<String, Map<Integer, Errors>> responsesByTopicEntry : responsesByTopic.entrySet()) {
- Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
- topicStruct.set(TOPIC_NAME, responsesByTopicEntry.getKey());
- List<Struct> partitionStructArray = new ArrayList<>();
- for (Map.Entry<Integer, Errors> responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) {
- Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
- Errors response = responsesByPartitionEntry.getValue();
- partitionStruct.set(PARTITION_ID, responsesByPartitionEntry.getKey());
- partitionStruct.set(ERROR_CODE, response.code());
- partitionStructArray.add(partitionStruct);
- }
- topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
- topicStructArray.add(topicStruct);
- }
- struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
- return struct;
- }
-
- public int throttleTimeMs() {
- return throttleTimeMs;
- }
-
- public Map<TopicPartition, Errors> responses() {
- return this.responses;
- }
-
- @Override
- public Map<Errors, Integer> errorCounts() {
- return errorCounts(responses);
- }
-
- public static AlterReplicaDirResponse parse(ByteBuffer buffer, short version) {
- return new AlterReplicaDirResponse(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version).read(buffer));
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
new file mode 100644
index 0000000..ba21759
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
+public class AlterReplicaLogDirsRequest extends AbstractRequest {
+
+ // request level key names
+ private static final String LOG_DIRS_KEY_NAME = "log_dirs";
+
+ // log dir level key names
+ private static final String LOG_DIR_KEY_NAME = "log_dir";
+ private static final String TOPICS_KEY_NAME = "topics";
+
+ // topic level key names
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V0 = new Schema(
+ new Field("log_dirs", new ArrayOf(new Schema(
+ new Field("log_dir", STRING, "The absolute log directory path."),
+ new Field("topics", new ArrayOf(new Schema(
+ TOPIC_NAME,
+ new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic."))))))));
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{ALTER_REPLICA_LOG_DIRS_REQUEST_V0};
+ }
+
+ private final Map<TopicPartition, String> partitionDirs;
+
+ public static class Builder extends AbstractRequest.Builder<AlterReplicaLogDirsRequest> {
+ private final Map<TopicPartition, String> partitionDirs;
+
+ public Builder(Map<TopicPartition, String> partitionDirs) {
+ super(ApiKeys.ALTER_REPLICA_LOG_DIRS);
+ this.partitionDirs = partitionDirs;
+ }
+
+ @Override
+ public AlterReplicaLogDirsRequest build(short version) {
+ return new AlterReplicaLogDirsRequest(partitionDirs, version);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("(type=AlterReplicaLogDirsRequest")
+ .append(", partitionDirs=")
+ .append(partitionDirs)
+ .append(")");
+ return builder.toString();
+ }
+ }
+
+ public AlterReplicaLogDirsRequest(Struct struct, short version) {
+ super(version);
+ partitionDirs = new HashMap<>();
+ for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) {
+ Struct logDirStruct = (Struct) logDirStructObj;
+ String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
+ for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) {
+ Struct topicStruct = (Struct) topicStructObj;
+ String topic = topicStruct.get(TOPIC_NAME);
+ for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+ int partition = (Integer) partitionObj;
+ partitionDirs.put(new TopicPartition(topic, partition), logDir);
+ }
+ }
+ }
+ }
+
+ public AlterReplicaLogDirsRequest(Map<TopicPartition, String> partitionDirs, short version) {
+ super(version);
+ this.partitionDirs = partitionDirs;
+ }
+
+ @Override
+ protected Struct toStruct() {
+ Map<String, List<TopicPartition>> dirPartitions = new HashMap<>();
+ for (Map.Entry<TopicPartition, String> entry: partitionDirs.entrySet()) {
+ if (!dirPartitions.containsKey(entry.getValue()))
+ dirPartitions.put(entry.getValue(), new ArrayList<TopicPartition>());
+ dirPartitions.get(entry.getValue()).add(entry.getKey());
+ }
+
+ Struct struct = new Struct(ApiKeys.ALTER_REPLICA_LOG_DIRS.requestSchema(version()));
+ List<Struct> logDirStructArray = new ArrayList<>();
+ for (Map.Entry<String, List<TopicPartition>> logDirEntry: dirPartitions.entrySet()) {
+ Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME);
+ logDirStruct.set(LOG_DIR_KEY_NAME, logDirEntry.getKey());
+
+ List<Struct> topicStructArray = new ArrayList<>();
+ for (Map.Entry<String, List<Integer>> topicEntry: CollectionUtils.groupDataByTopic(logDirEntry.getValue()).entrySet()) {
+ Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
+ topicStruct.set(TOPIC_NAME, topicEntry.getKey());
+ topicStruct.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
+ topicStructArray.add(topicStruct);
+ }
+ logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+ logDirStructArray.add(logDirStruct);
+ }
+ struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray());
+ return struct;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ Map<TopicPartition, Errors> responseMap = new HashMap<>();
+
+ for (Map.Entry<TopicPartition, String> entry : partitionDirs.entrySet()) {
+ responseMap.put(entry.getKey(), Errors.forException(e));
+ }
+
+ short versionId = version();
+ switch (versionId) {
+ case 0:
+ return new AlterReplicaLogDirsResponse(throttleTimeMs, responseMap);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId,
+ this.getClass().getSimpleName(), ApiKeys.ALTER_REPLICA_LOG_DIRS.latestVersion()));
+ }
+ }
+
+ public Map<TopicPartition, String> partitionDirs() {
+ return partitionDirs;
+ }
+
+ public static AlterReplicaLogDirsRequest parse(ByteBuffer buffer, short version) {
+ return new AlterReplicaLogDirsRequest(ApiKeys.ALTER_REPLICA_LOG_DIRS.parseRequest(version, buffer), version);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
new file mode 100644
index 0000000..f8d1546
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+
+
+public class AlterReplicaLogDirsResponse extends AbstractResponse {
+
+ // response level key names
+ private static final String TOPICS_KEY_NAME = "topics";
+
+ // topic level key names
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ private static final Schema ALTER_REPLICA_LOG_DIRS_RESPONSE_V0 = new Schema(
+ THROTTLE_TIME_MS,
+ new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
+ TOPIC_NAME,
+ new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema(
+ PARTITION_ID,
+ ERROR_CODE)))))));
+
+ public static Schema[] schemaVersions() {
+ return new Schema[]{ALTER_REPLICA_LOG_DIRS_RESPONSE_V0};
+ }
+
+ /**
+ * Possible error codes:
+ *
+ * LOG_DIR_NOT_FOUND (57)
+ * KAFKA_STORAGE_ERROR (56)
+ * REPLICA_NOT_AVAILABLE (9)
+ * UNKNOWN (-1)
+ */
+ private final Map<TopicPartition, Errors> responses;
+ private final int throttleTimeMs;
+
+ public AlterReplicaLogDirsResponse(Struct struct) {
+ throttleTimeMs = struct.get(THROTTLE_TIME_MS);
+ responses = new HashMap<>();
+ for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
+ Struct topicStruct = (Struct) topicStructObj;
+ String topic = topicStruct.get(TOPIC_NAME);
+ for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionStruct = (Struct) partitionStructObj;
+ int partition = partitionStruct.get(PARTITION_ID);
+ Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE));
+ responses.put(new TopicPartition(topic, partition), error);
+ }
+ }
+ }
+
+ /**
+ * Constructor for version 0.
+ */
+ public AlterReplicaLogDirsResponse(int throttleTimeMs, Map<TopicPartition, Errors> responses) {
+ this.throttleTimeMs = throttleTimeMs;
+ this.responses = responses;
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.ALTER_REPLICA_LOG_DIRS.responseSchema(version));
+ struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+ Map<String, Map<Integer, Errors>> responsesByTopic = CollectionUtils.groupDataByTopic(responses);
+ List<Struct> topicStructArray = new ArrayList<>();
+ for (Map.Entry<String, Map<Integer, Errors>> responsesByTopicEntry : responsesByTopic.entrySet()) {
+ Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
+ topicStruct.set(TOPIC_NAME, responsesByTopicEntry.getKey());
+ List<Struct> partitionStructArray = new ArrayList<>();
+ for (Map.Entry<Integer, Errors> responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) {
+ Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
+ Errors response = responsesByPartitionEntry.getValue();
+ partitionStruct.set(PARTITION_ID, responsesByPartitionEntry.getKey());
+ partitionStruct.set(ERROR_CODE, response.code());
+ partitionStructArray.add(partitionStruct);
+ }
+ topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
+ topicStructArray.add(topicStruct);
+ }
+ struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+ return struct;
+ }
+
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
+ public Map<TopicPartition, Errors> responses() {
+ return this.responses;
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(responses);
+ }
+
+ public static AlterReplicaLogDirsResponse parse(ByteBuffer buffer, short version) {
+ return new AlterReplicaLogDirsResponse(ApiKeys.ALTER_REPLICA_LOG_DIRS.responseSchema(version).read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index 6613dfe..a242240 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -74,7 +74,7 @@ public class DescribeLogDirsResponse extends AbstractResponse {
"(if it is the current log for the partition) or current replica's LEO " +
"(if it is the future log for the partition)"),
new Field(IS_FUTURE_KEY_NAME, BOOLEAN, "True if this log is created by " +
- "AlterReplicaDirRequest and will replace the current log of the replica " +
+ "AlterReplicaLogDirsRequest and will replace the current log of the replica " +
"in the future.")))))))))));
public static Schema[] schemaVersions() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 5dbcfcf..af81697 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -30,10 +30,10 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.TopicPartitionReplica
import org.apache.kafka.common.errors.{LogDirNotFoundException, ReplicaNotAvailableException}
-import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaDirOptions, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, DescribeReplicaLogDirsResult, AdminClient => JAdminClient}
import LogConfig._
import joptsimple.OptionParser
-import org.apache.kafka.clients.admin.DescribeReplicaLogDirResult.ReplicaLogDirInfo
+import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
object ReassignPartitionsCommand extends Logging {
@@ -320,7 +320,7 @@ object ReassignPartitionsCommand extends Logging {
if (replicaAssignment.nonEmpty) {
val adminClient = adminClientOpt.getOrElse(
throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory"))
- adminClient.describeReplicaLogDir(replicaAssignment.keySet.asJava).all().get().asScala
+ adminClient.describeReplicaLogDirs(replicaAssignment.keySet.asJava).all().get().asScala
} else {
Map.empty[TopicPartitionReplica, ReplicaLogDirInfo]
}
@@ -551,14 +551,14 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils,
if (validPartitions.isEmpty) false
else {
if (proposedReplicaAssignment.nonEmpty) {
- // Send AlterReplicaDirRequest to allow broker to create replica in the right log dir later if the replica
+ // Send AlterReplicaLogDirsRequest to allow broker to create replica in the right log dir later if the replica
// has not been created it. This allows us to rebalance load across log directories in the cluster even if
// we can not move replicas between log directories on the same broker. We will be able to move replicas
// between log directories on the same broker after KIP-113 is implemented.
val adminClient = adminClientOpt.getOrElse(
throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory"))
- val alterReplicaDirResult = adminClient.alterReplicaDir(
- proposedReplicaAssignment.asJava, new AlterReplicaDirOptions().timeoutMs(timeoutMs.toInt))
+ val alterReplicaDirResult = adminClient.alterReplicaLogDirs(
+ proposedReplicaAssignment.asJava, new AlterReplicaLogDirsOptions().timeoutMs(timeoutMs.toInt))
alterReplicaDirResult.values().asScala.foreach { case (replica, future) => {
try {
/*
@@ -568,7 +568,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils,
* for this replica.
*
* After KIP-113 is fully implemented, we will not need to verify that the broker returns this ReplicaNotAvailableException
- * in this step. And after the reassignment znode is created, we will need to re-send AlterReplicaDirRequest to broker
+ * in this step. And after the reassignment znode is created, we will need to re-send AlterReplicaLogDirsRequest to broker
* if broker returns ReplicaNotAvailableException for any replica in the request.
*/
future.get()
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6e02164..13959b8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -130,7 +130,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
- case ApiKeys.ALTER_REPLICA_DIR => handleAlterReplicaDirRequest(request)
+ case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
@@ -1966,15 +1966,15 @@ class KafkaApis(val requestChannel: RequestChannel,
new DescribeConfigsResponse(requestThrottleMs, (authorizedConfigs ++ unauthorizedConfigs).asJava))
}
- def handleAlterReplicaDirRequest(request: RequestChannel.Request): Unit = {
- val alterReplicaDirRequest = request.body[AlterReplicaDirRequest]
+ def handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit = {
+ val alterReplicaDirsRequest = request.body[AlterReplicaLogDirsRequest]
val responseMap = {
if (authorize(request.session, Alter, Resource.ClusterResource))
- replicaManager.alterReplicaDir(alterReplicaDirRequest.partitionDirs.asScala)
+ replicaManager.alterReplicaLogDirs(alterReplicaDirsRequest.partitionDirs.asScala)
else
- alterReplicaDirRequest.partitionDirs.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+ alterReplicaDirsRequest.partitionDirs.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
}
- sendResponseMaybeThrottle(request, requestThrottleMs => new AlterReplicaDirResponse(requestThrottleMs, responseMap.asJava))
+ sendResponseMaybeThrottle(request, requestThrottleMs => new AlterReplicaLogDirsResponse(requestThrottleMs, responseMap.asJava))
}
def handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b5a93b0..a361e16 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -569,14 +569,14 @@ class ReplicaManager(val config: KafkaConfig,
* that are already created to the user-specified log directory after KIP-113 is fully implemented
*
*/
- def alterReplicaDir(partitionDirs: Map[TopicPartition, String]): Map[TopicPartition, Errors] = {
+ def alterReplicaLogDirs(partitionDirs: Map[TopicPartition, String]): Map[TopicPartition, Errors] = {
partitionDirs.map { case (topicPartition, destinationDir) =>
try {
if (!logManager.isLogDirOnline(destinationDir))
throw new KafkaStorageException(s"Log directory $destinationDir is offline")
// If the log for this partition has not been created yet:
- // 1) Respond with ReplicaNotAvailableException for this partition in the AlterReplicaDirResponse
+ // 1) Respond with ReplicaNotAvailableException for this partition in the AlterReplicaLogDirsResponse
// 2) Record the destination log directory in the memory so that the partition will be created in this log directory
// when broker receives LeaderAndIsrRequest for this partition later.
getReplica(topicPartition) match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index da818d6..e916efa 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -245,13 +245,13 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
}
@Test
- def testDescribeReplicaLogDir(): Unit = {
+ def testDescribeReplicaLogDirs(): Unit = {
client = AdminClient.create(createConfig())
val topic = "topic"
val leaderByPartition = TestUtils.createTopic(zkUtils, topic, 10, 1, servers, new Properties())
val replicas = leaderByPartition.map { case (partition, brokerId) => new TopicPartitionReplica(topic, partition, brokerId) }.toSeq
- val replicaDirInfos = client.describeReplicaLogDir(replicas.asJavaCollection).all.get
+ val replicaDirInfos = client.describeReplicaLogDirs(replicas.asJavaCollection).all.get
replicaDirInfos.asScala.foreach { case (topicPartitionReplica, replicaDirInfo) =>
val server = servers.find(_.config.brokerId == topicPartitionReplica.brokerId()).get
val tp = new TopicPartition(topicPartitionReplica.topic(), topicPartitionReplica.partition())
@@ -262,7 +262,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
}
@Test
- def testAlterReplicaLogDirBeforeTopicCreation(): Unit = {
+ def testAlterReplicaLogDirsBeforeTopicCreation(): Unit = {
val adminClient = AdminClient.create(createConfig())
val topic = "topic"
val tp = new TopicPartition(topic, 0)
@@ -272,7 +272,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir
}.toMap
- adminClient.alterReplicaDir(replicaAssignment.asJava, new AlterReplicaDirOptions()).values().asScala.values.foreach { future =>
+ adminClient.alterReplicaLogDirs(replicaAssignment.asJava, new AlterReplicaLogDirsOptions()).values().asScala.values.foreach { future =>
try {
future.get()
fail("Future should fail with ReplicaNotAvailableException")
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 013315a..18a73b9 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -142,7 +142,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.CREATE_ACLS -> classOf[CreateAclsResponse],
ApiKeys.DELETE_ACLS -> classOf[DeleteAclsResponse],
ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse],
- ApiKeys.ALTER_REPLICA_DIR -> classOf[AlterReplicaDirResponse],
+ ApiKeys.ALTER_REPLICA_LOG_DIRS -> classOf[AlterReplicaLogDirsResponse],
ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse]
)
@@ -180,7 +180,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => resp.aclCreationResponses.asScala.head.error.error),
ApiKeys.DESCRIBE_ACLS -> ((resp: DescribeAclsResponse) => resp.error.error),
ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => resp.responses.asScala.head.error.error),
- ApiKeys.ALTER_REPLICA_DIR -> ((resp: AlterReplicaDirResponse) => resp.responses.get(tp)),
+ ApiKeys.ALTER_REPLICA_LOG_DIRS -> ((resp: AlterReplicaLogDirsResponse) => resp.responses.get(tp)),
ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) =>
if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED)
)
@@ -217,7 +217,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.CREATE_ACLS -> clusterAlterAcl,
ApiKeys.DESCRIBE_ACLS -> clusterDescribeAcl,
ApiKeys.DELETE_ACLS -> clusterAlterAcl,
- ApiKeys.ALTER_REPLICA_DIR -> clusterAlterAcl,
+ ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl,
ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl
)
@@ -366,7 +366,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
new ResourceFilter(AdminResourceType.TOPIC, null),
new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
- private def alterReplicaDirRequest = new AlterReplicaDirRequest.Builder(Collections.singletonMap(tp, logDir)).build()
+ private def alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build()
private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(Collections.singleton(tp)).build()
@@ -399,7 +399,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.CREATE_ACLS -> createAclsRequest,
ApiKeys.DELETE_ACLS -> deleteAclsRequest,
ApiKeys.DESCRIBE_ACLS -> describeAclsRequest,
- ApiKeys.ALTER_REPLICA_DIR -> alterReplicaDirRequest,
+ ApiKeys.ALTER_REPLICA_LOG_DIRS -> alterReplicaLogDirsRequest,
ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest
)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1a5e93b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index ce16971..7002e84 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -97,7 +97,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
assertEquals(Seq(101), zkUtils.getPartitionAssignmentForTopics(Seq(topicName)).get(topicName).get(partition))
// The replica should be in the expected log directory on broker 101
val replica = new TopicPartitionReplica(topicName, 0, 101)
- assertEquals(expectedLogDir, adminClient.describeReplicaLogDir(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir)
+ assertEquals(expectedLogDir, adminClient.describeReplicaLogDirs(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir)
}
@Test
@@ -128,7 +128,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
assertEquals(Seq(100, 101, 102), actual.values.flatten.toSeq.distinct.sorted)
// The replica should be in the expected log directory on broker 102
- assertEquals(expectedLogDir, adminClient.describeReplicaLogDir(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir)
+ assertEquals(expectedLogDir, adminClient.describeReplicaLogDirs(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir)
}
@Test
@@ -199,7 +199,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
assertEquals(Seq(100, 102), actual("topic2")(2))//changed
// The replicas should be in the expected log directories
- val replicaDirs = adminClient.describeReplicaLogDir(List(replica1, replica2).asJavaCollection).all().get()
+ val replicaDirs = adminClient.describeReplicaLogDirs(List(replica1, replica2).asJavaCollection).all().get()
assertEquals(proposedReplicaAssignment(replica1), replicaDirs.get(replica1).getCurrentReplicaLogDir)
assertEquals(proposedReplicaAssignment(replica2), replicaDirs.get(replica2).getCurrentReplicaLogDir)
}