You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/06/11 17:57:17 UTC
[kafka] branch 2.1 updated: KAFKA-4893; Fix deletion and moving of topics with long names
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new e832dd0 KAFKA-4893; Fix deletion and moving of topics with long names
e832dd0 is described below
commit e832dd013e92455365603bbcdeebb17602ce67df
Author: Colin P. Mccabe <cm...@confluent.io>
AuthorDate: Tue Jun 4 14:14:32 2019 -0700
KAFKA-4893; Fix deletion and moving of topics with long names
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Gwen Shapira, David Arthur, James Cheng, Vahid Hashemian
Closes #6869 from cmccabe/KAFKA-4893
(cherry picked from commit e6563aab722b35c4984b77e9eee42a1904cd1ea6)
(cherry picked from commit bda2a02ce7db26a126c99cc477ef64c9e7133b70)
---
core/src/main/scala/kafka/log/Log.scala | 15 ++++++++++-----
core/src/main/scala/kafka/server/ReplicaManager.scala | 8 +++++++-
.../kafka/api/AdminClientIntegrationTest.scala | 17 +++++++++++++++++
.../test/scala/unit/kafka/log/LogManagerTest.scala | 9 ++++++++-
core/src/test/scala/unit/kafka/log/LogTest.scala | 19 ++++++++++++++++++-
5 files changed, 60 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 55ce9b4..33cb48f 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -2160,8 +2160,8 @@ object Log {
/** a directory that is used for future partition */
val FutureDirSuffix = "-future"
- private val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
- private val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
+ private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
+ private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
val UnknownLogStartOffset = -1L
@@ -2207,11 +2207,16 @@ object Log {
new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix + suffix)
/**
- * Return a directory name to rename the log directory to for async deletion. The name will be in the following
- * format: topic-partition.uniqueId-delete where topic, partition and uniqueId are variables.
+ * Return a directory name to rename the log directory to for async deletion.
+ * The name will be in the following format: "topic-partitionId.uniqueId-delete".
+ * If the topic name is too long, it will be truncated to prevent the total name
+ * from exceeding 255 characters.
*/
def logDeleteDirName(topicPartition: TopicPartition): String = {
- logDirNameWithSuffix(topicPartition, DeleteDirSuffix)
+ val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
+ val suffix = s"-${topicPartition.partition()}.${uniqueId}${DeleteDirSuffix}"
+ val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size)
+ s"${topicPartition.topic().substring(0, prefixLength)}${suffix}"
}
/**
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f52bfe6..02366dd 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -563,6 +563,11 @@ class ReplicaManager(val config: KafkaConfig,
replicaStateChangeLock synchronized {
partitionDirs.map { case (topicPartition, destinationDir) =>
try {
+ /* If the topic name is exceptionally long, we can't support altering the log directory.
+ * See KAFKA-4893 for details.
+ * TODO: fix this by implementing topic IDs. */
+ if (Log.logFutureDirName(topicPartition).size > 255)
+ throw new InvalidTopicException("The topic name is too long.")
if (!logManager.isLogDirOnline(destinationDir))
throw new KafkaStorageException(s"Log directory $destinationDir is offline")
@@ -603,7 +608,8 @@ class ReplicaManager(val config: KafkaConfig,
(topicPartition, Errors.NONE)
} catch {
- case e@(_: LogDirNotFoundException |
+ case e@(_: InvalidTopicException |
+ _: LogDirNotFoundException |
_: ReplicaNotAvailableException |
_: KafkaStorageException) =>
(topicPartition, Errors.forException(e))
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 58de351..0716824 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -1211,6 +1211,23 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
}
}
+ @Test
+ def testLongTopicNames(): Unit = {
+ val client = AdminClient.create(createConfig)
+ val longTopicName = String.join("", Collections.nCopies(249, "x"));
+ val invalidTopicName = String.join("", Collections.nCopies(250, "x"));
+ val newTopics2 = Seq(new NewTopic(invalidTopicName, 3, 3),
+ new NewTopic(longTopicName, 3, 3))
+ val results = client.createTopics(newTopics2.asJava).values()
+ assertTrue(results.containsKey(longTopicName))
+ results.get(longTopicName).get()
+ assertTrue(results.containsKey(invalidTopicName))
+ assertFutureExceptionTypeEquals(results.get(invalidTopicName), classOf[InvalidTopicException])
+ assertFutureExceptionTypeEquals(client.alterReplicaLogDirs(
+ Map(new TopicPartitionReplica(longTopicName, 0, 0) -> servers(0).config.logDirs(0)).asJava).all(),
+ classOf[InvalidTopicException])
+ client.close()
+ }
}
object AdminClientIntegrationTest {
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 2fbb875..804d632 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -18,7 +18,7 @@
package kafka.log
import java.io._
-import java.util.Properties
+import java.util.{Collections, Properties}
import kafka.server.FetchDataInfo
import kafka.server.checkpoints.OffsetCheckpointFile
@@ -375,6 +375,13 @@ class LogManagerTest {
assertFalse("Logs not deleted", logManager.hasLogsToBeDeleted)
}
+ @Test
+ def testCreateAndDeleteOverlyLongTopic(): Unit = {
+ val invalidTopicName = String.join("", Collections.nCopies(253, "x"));
+ val log = logManager.getOrCreateLog(new TopicPartition(invalidTopicName, 0), logConfig)
+ logManager.asyncDelete(new TopicPartition(invalidTopicName, 0))
+ }
+
private def readLog(log: Log, offset: Long, maxLength: Int = 1024): FetchDataInfo = {
log.read(offset, maxLength, maxOffset = None, minOneMessage = true, includeAbortedTxns = false)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index f88b0be..fc306ee 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -20,7 +20,8 @@ package kafka.log
import java.io._
import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}
-import java.util.Properties
+import java.util.regex.Pattern
+import java.util.{Collections, Properties}
import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
@@ -73,6 +74,22 @@ class LogTest {
}
@Test
+ def testLogDeleteDirName(): Unit = {
+ val name1 = Log.logDeleteDirName(new TopicPartition("foo", 3))
+ assertTrue(name1.length <= 255)
+ assertTrue(Pattern.compile("foo-3\\.[0-9a-z]{32}-delete").matcher(name1).matches())
+ assertTrue(Log.DeleteDirPattern.matcher(name1).matches())
+ assertFalse(Log.FutureDirPattern.matcher(name1).matches())
+ val name2 = Log.logDeleteDirName(
+ new TopicPartition("n" + String.join("", Collections.nCopies(248, "o")), 5))
+ System.out.println("name2 = " + name2)
+ assertEquals(255, name2.length)
+ assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches())
+ assertTrue(Log.DeleteDirPattern.matcher(name2).matches())
+ assertFalse(Log.FutureDirPattern.matcher(name2).matches())
+ }
+
+ @Test
def testOffsetFromFile() {
val offset = 23423423L