You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2019/06/04 21:15:26 UTC

[kafka] branch 2.3 updated: KAFKA-4893; Fix deletion and moving of topics with long names

This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new ebc2c0d  KAFKA-4893; Fix deletion and moving of topics with long names
ebc2c0d is described below

commit ebc2c0d0dedfa1fce706a9035b4d066c4e28ac3b
Author: Colin P. Mccabe <cm...@confluent.io>
AuthorDate: Tue Jun 4 14:14:32 2019 -0700

    KAFKA-4893; Fix deletion and moving of topics with long names
    
    Author: Colin P. Mccabe <cm...@confluent.io>
    
    Reviewers: Gwen Shapira, David Arthur, James Cheng, Vahid Hashemian
    
    Closes #6869 from cmccabe/KAFKA-4893
    
    (cherry picked from commit e6563aab722b35c4984b77e9eee42a1904cd1ea6)
    Signed-off-by: Gwen Shapira <cs...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala               | 15 ++++++++++-----
 core/src/main/scala/kafka/server/ReplicaManager.scala |  8 +++++++-
 .../kafka/api/AdminClientIntegrationTest.scala        | 18 ++++++++++++++++++
 .../test/scala/unit/kafka/log/LogManagerTest.scala    |  9 ++++++++-
 core/src/test/scala/unit/kafka/log/LogTest.scala      | 19 ++++++++++++++++++-
 5 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 56b2969..9ab6fda 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -2180,8 +2180,8 @@ object Log {
   /** a directory that is used for future partition */
   val FutureDirSuffix = "-future"
 
-  private val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
-  private val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
+  private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
+  private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
 
   val UnknownLogStartOffset = -1L
 
@@ -2227,11 +2227,16 @@ object Log {
     new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix + suffix)
 
   /**
-   * Return a directory name to rename the log directory to for async deletion. The name will be in the following
-   * format: topic-partition.uniqueId-delete where topic, partition and uniqueId are variables.
+   * Return a directory name to rename the log directory to for async deletion.
+   * The name will be in the following format: "topic-partitionId.uniqueId-delete".
+   * If the topic name is too long, it will be truncated to prevent the total name
+   * from exceeding 255 characters.
    */
   def logDeleteDirName(topicPartition: TopicPartition): String = {
-    logDirNameWithSuffix(topicPartition, DeleteDirSuffix)
+    val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
+    val suffix = s"-${topicPartition.partition()}.${uniqueId}${DeleteDirSuffix}"
+    val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size)
+    s"${topicPartition.topic().substring(0, prefixLength)}${suffix}"
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2023a97..9b83090 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -580,6 +580,11 @@ class ReplicaManager(val config: KafkaConfig,
     replicaStateChangeLock synchronized {
       partitionDirs.map { case (topicPartition, destinationDir) =>
         try {
+          /* If the topic name is exceptionally long, we can't support altering the log directory.
+           * See KAFKA-4893 for details.
+           * TODO: fix this by implementing topic IDs. */
+          if (Log.logFutureDirName(topicPartition).size > 255)
+            throw new InvalidTopicException("The topic name is too long.")
           if (!logManager.isLogDirOnline(destinationDir))
             throw new KafkaStorageException(s"Log directory $destinationDir is offline")
 
@@ -620,7 +625,8 @@ class ReplicaManager(val config: KafkaConfig,
 
           (topicPartition, Errors.NONE)
         } catch {
-          case e@(_: LogDirNotFoundException |
+          case e@(_: InvalidTopicException |
+                  _: LogDirNotFoundException |
                   _: ReplicaNotAvailableException |
                   _: KafkaStorageException) =>
             (topicPartition, Errors.forException(e))
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 46cd318..40d2820 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -1585,6 +1585,24 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
       Some("Invalid config value for resource"))
   }
+
+  @Test
+  def testLongTopicNames(): Unit = {
+    val client = AdminClient.create(createConfig)
+    val longTopicName = String.join("", Collections.nCopies(249, "x"));
+    val invalidTopicName = String.join("", Collections.nCopies(250, "x"));
+    val newTopics2 = Seq(new NewTopic(invalidTopicName, 3, 3),
+                         new NewTopic(longTopicName, 3, 3))
+    val results = client.createTopics(newTopics2.asJava).values()
+    assertTrue(results.containsKey(longTopicName))
+    results.get(longTopicName).get()
+    assertTrue(results.containsKey(invalidTopicName))
+    assertFutureExceptionTypeEquals(results.get(invalidTopicName), classOf[InvalidTopicException])
+    assertFutureExceptionTypeEquals(client.alterReplicaLogDirs(
+      Map(new TopicPartitionReplica(longTopicName, 0, 0) -> servers(0).config.logDirs(0)).asJava).all(),
+      classOf[InvalidTopicException])
+    client.close()
+  }
 }
 
 object AdminClientIntegrationTest {
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b3ecd23..3df09e7 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -18,7 +18,7 @@
 package kafka.log
 
 import java.io._
-import java.util.Properties
+import java.util.{Collections, Properties}
 
 import kafka.server.FetchDataInfo
 import kafka.server.checkpoints.OffsetCheckpointFile
@@ -381,6 +381,13 @@ class LogManagerTest {
   }
 
   @Test
+  def testCreateAndDeleteOverlyLongTopic(): Unit = {
+    val invalidTopicName = String.join("", Collections.nCopies(253, "x"));
+    val log = logManager.getOrCreateLog(new TopicPartition(invalidTopicName, 0), logConfig)
+    logManager.asyncDelete(new TopicPartition(invalidTopicName, 0))
+  }
+
+  @Test
   def testCheckpointForOnlyAffectedLogs() {
     val tps = Seq(
       new TopicPartition("test-a", 0),
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 8635969..dc9a423 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -20,7 +20,8 @@ package kafka.log
 import java.io._
 import java.nio.ByteBuffer
 import java.nio.file.{Files, Paths}
-import java.util.{Optional, Properties}
+import java.util.regex.Pattern
+import java.util.{Collections, Optional, Properties}
 
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
@@ -75,6 +76,22 @@ class LogTest {
   }
 
   @Test
+  def testLogDeleteDirName(): Unit = {
+    val name1 = Log.logDeleteDirName(new TopicPartition("foo", 3))
+    assertTrue(name1.length <= 255)
+    assertTrue(Pattern.compile("foo-3\\.[0-9a-z]{32}-delete").matcher(name1).matches())
+    assertTrue(Log.DeleteDirPattern.matcher(name1).matches())
+    assertFalse(Log.FutureDirPattern.matcher(name1).matches())
+    val name2 = Log.logDeleteDirName(
+      new TopicPartition("n" + String.join("", Collections.nCopies(248, "o")), 5))
+    System.out.println("name2 = " + name2)
+    assertEquals(255, name2.length)
+    assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches())
+    assertTrue(Log.DeleteDirPattern.matcher(name2).matches())
+    assertFalse(Log.FutureDirPattern.matcher(name2).matches())
+  }
+
+  @Test
   def testOffsetFromFile() {
     val offset = 23423423L