You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Zhanxiang (Patrick) Huang (JIRA)" <ji...@apache.org> on 2018/09/28 07:05:00 UTC

[jira] [Created] (KAFKA-7452) Deleting snapshot files after check-pointing log recovery offsets can slow down replication when truncation happens

Zhanxiang (Patrick) Huang created KAFKA-7452:
------------------------------------------------

             Summary: Deleting snapshot files after check-pointing log recovery offsets can slow down replication when truncation happens
                 Key: KAFKA-7452
                 URL: https://issues.apache.org/jira/browse/KAFKA-7452
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.0.0, 1.1.1, 1.1.0, 1.0.1, 1.0.0
            Reporter: Zhanxiang (Patrick) Huang


After KAFKA-5829, Kafka will try to iterate through all the partition dirs to delete useless snapshot files in "checkpointLogRecoveryOffsetsInDir". Currently, "checkpointLogRecoveryOffsetsInDir" is used in the following places:
 # Truncation
 # Log dir deletion and movement
 # Background thread checkpointing recovery offsets

In 2.0 deployment on a cluster with 10k partitions per broker, we found out that deleting useless snapshot files in the critical path of log truncation can significantly slow down followers to catch up with leader during rolling bounce (~2x slower than 0.11). The reason is that we basically do a "ls -R" for the whole data directory only to potentially delete the snapshot files in one partition directory because the way we identify snapshot files is to list the directories and check the filename suffix.

In our case, "listSnapshotFiles" takes ~1ms per partition directory so it takes ~1ms * 10k = ~10s to just delete snapshot files in one partition after the truncation, which delays future fetches in the fetcher thread.

Here are the related code snippets:

 LogManager.scala

 
{code:java}
private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
  for {
    partitionToLog <- logsByDir.get(dir.getAbsolutePath)
    checkpoint <- recoveryPointCheckpoints.get(dir)
  } {
    try {
      checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
      allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
    } catch {
      case e: IOException =>
        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
          s"file in directory $dir", e)
    }
  }
}
{code}
 

 ProducerStateChangeManager.scala

 
{code:java}
private[log] def listSnapshotFiles(dir: File): Seq[File] = {
  if (dir.exists && dir.isDirectory) {
    Option(dir.listFiles).map { files =>
      files.filter(f => f.isFile && isSnapshotFile(f)).toSeq
    }.getOrElse(Seq.empty)
  } else Seq.empty
}


private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => true) {
  listSnapshotFiles(dir).filter(file => predicate(offsetFromFile(file))).foreach { file =>
    Files.deleteIfExists(file.toPath)
  }
}

{code}
 

There are a few things that can be optimized here:
 # We can have an in-memory cache for the snapshot files metadata (filename) in ProducerStateManager to avoid calling dir.listFiles in "deleteSnapshotFiles", "latestSnapshotFile" and "oldestSnapshotFile".
 # After truncation, we can only try to delete snapshot files for the truncated partitions (in replica fetcher thread, we truncate one partition at a time) instead of all partitions. Or maybe we don't even need to delete snapshot files in the critical path of truncation because the background log-recovery-offset-checkpoint-thread will do it periodically. This can also apply on log deletion/movement.
 # If we want to further optimize the actual snapshot file deletion, we can make it async. But I am not sure whether it is needed after we have 1) and 2).

Also, we notice that there is no way to disable transaction/exactly-once support in the broker-side given that it will bring in some extra overhead even though we have no clients using this feature. Not sure whether this is a common use case, but it is useful if we can have a switch to avoid the extra performance overhead.

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)