You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2019/06/04 21:15:15 UTC
[kafka] branch trunk updated: KAFKA-4893;
Fix deletion and moving of topics with long names
This is an automated email from the ASF dual-hosted git repository.
gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e6563aa KAFKA-4893; Fix deletion and moving of topics with long names
e6563aa is described below
commit e6563aab722b35c4984b77e9eee42a1904cd1ea6
Author: Colin P. Mccabe <cm...@confluent.io>
AuthorDate: Tue Jun 4 14:14:32 2019 -0700
KAFKA-4893; Fix deletion and moving of topics with long names
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Gwen Shapira, David Arthur, James Cheng, Vahid Hashemian
Closes #6869 from cmccabe/KAFKA-4893
---
core/src/main/scala/kafka/log/Log.scala | 15 ++++++++++-----
core/src/main/scala/kafka/server/ReplicaManager.scala | 8 +++++++-
.../kafka/api/AdminClientIntegrationTest.scala | 18 ++++++++++++++++++
.../test/scala/unit/kafka/log/LogManagerTest.scala | 9 ++++++++-
core/src/test/scala/unit/kafka/log/LogTest.scala | 19 ++++++++++++++++++-
5 files changed, 61 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 56b2969..9ab6fda 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -2180,8 +2180,8 @@ object Log {
/** a directory that is used for future partition */
val FutureDirSuffix = "-future"
- private val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
- private val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
+ private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
+ private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
val UnknownLogStartOffset = -1L
@@ -2227,11 +2227,16 @@ object Log {
new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix + suffix)
/**
- * Return a directory name to rename the log directory to for async deletion. The name will be in the following
- * format: topic-partition.uniqueId-delete where topic, partition and uniqueId are variables.
+ * Return a directory name to rename the log directory to for async deletion.
+ * The name will be in the following format: "topic-partitionId.uniqueId-delete".
+ * If the topic name is too long, it will be truncated to prevent the total name
+ * from exceeding 255 characters.
*/
def logDeleteDirName(topicPartition: TopicPartition): String = {
- logDirNameWithSuffix(topicPartition, DeleteDirSuffix)
+ val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
+ val suffix = s"-${topicPartition.partition()}.${uniqueId}${DeleteDirSuffix}"
+ val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size)
+ s"${topicPartition.topic().substring(0, prefixLength)}${suffix}"
}
/**
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8cfa247..e9ab738 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -578,6 +578,11 @@ class ReplicaManager(val config: KafkaConfig,
replicaStateChangeLock synchronized {
partitionDirs.map { case (topicPartition, destinationDir) =>
try {
+ /* If the topic name is exceptionally long, we can't support altering the log directory.
+ * See KAFKA-4893 for details.
+ * TODO: fix this by implementing topic IDs. */
+ if (Log.logFutureDirName(topicPartition).size > 255)
+ throw new InvalidTopicException("The topic name is too long.")
if (!logManager.isLogDirOnline(destinationDir))
throw new KafkaStorageException(s"Log directory $destinationDir is offline")
@@ -621,7 +626,8 @@ class ReplicaManager(val config: KafkaConfig,
(topicPartition, Errors.NONE)
} catch {
- case e@(_: LogDirNotFoundException |
+ case e@(_: InvalidTopicException |
+ _: LogDirNotFoundException |
_: ReplicaNotAvailableException |
_: KafkaStorageException) =>
(topicPartition, Errors.forException(e))
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 4145137..88f10ff 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -1793,6 +1793,24 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
Some("Invalid config value for resource"))
}
+
+ @Test
+ def testLongTopicNames(): Unit = {
+ val client = AdminClient.create(createConfig)
+ val longTopicName = String.join("", Collections.nCopies(249, "x"));
+ val invalidTopicName = String.join("", Collections.nCopies(250, "x"));
+ val newTopics2 = Seq(new NewTopic(invalidTopicName, 3, 3),
+ new NewTopic(longTopicName, 3, 3))
+ val results = client.createTopics(newTopics2.asJava).values()
+ assertTrue(results.containsKey(longTopicName))
+ results.get(longTopicName).get()
+ assertTrue(results.containsKey(invalidTopicName))
+ assertFutureExceptionTypeEquals(results.get(invalidTopicName), classOf[InvalidTopicException])
+ assertFutureExceptionTypeEquals(client.alterReplicaLogDirs(
+ Map(new TopicPartitionReplica(longTopicName, 0, 0) -> servers(0).config.logDirs(0)).asJava).all(),
+ classOf[InvalidTopicException])
+ client.close()
+ }
}
object AdminClientIntegrationTest {
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b3ecd23..3df09e7 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -18,7 +18,7 @@
package kafka.log
import java.io._
-import java.util.Properties
+import java.util.{Collections, Properties}
import kafka.server.FetchDataInfo
import kafka.server.checkpoints.OffsetCheckpointFile
@@ -381,6 +381,13 @@ class LogManagerTest {
}
@Test
+ def testCreateAndDeleteOverlyLongTopic(): Unit = {
+ val invalidTopicName = String.join("", Collections.nCopies(253, "x"));
+ val log = logManager.getOrCreateLog(new TopicPartition(invalidTopicName, 0), logConfig)
+ logManager.asyncDelete(new TopicPartition(invalidTopicName, 0))
+ }
+
+ @Test
def testCheckpointForOnlyAffectedLogs() {
val tps = Seq(
new TopicPartition("test-a", 0),
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 8635969..dc9a423 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -20,7 +20,8 @@ package kafka.log
import java.io._
import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}
-import java.util.{Optional, Properties}
+import java.util.regex.Pattern
+import java.util.{Collections, Optional, Properties}
import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
@@ -75,6 +76,22 @@ class LogTest {
}
@Test
+ def testLogDeleteDirName(): Unit = {
+ val name1 = Log.logDeleteDirName(new TopicPartition("foo", 3))
+ assertTrue(name1.length <= 255)
+ assertTrue(Pattern.compile("foo-3\\.[0-9a-z]{32}-delete").matcher(name1).matches())
+ assertTrue(Log.DeleteDirPattern.matcher(name1).matches())
+ assertFalse(Log.FutureDirPattern.matcher(name1).matches())
+ val name2 = Log.logDeleteDirName(
+ new TopicPartition("n" + String.join("", Collections.nCopies(248, "o")), 5))
+ System.out.println("name2 = " + name2)
+ assertEquals(255, name2.length)
+ assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches())
+ assertTrue(Log.DeleteDirPattern.matcher(name2).matches())
+ assertFalse(Log.FutureDirPattern.matcher(name2).matches())
+ }
+
+ @Test
def testOffsetFromFile() {
val offset = 23423423L