You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "divijvaidya (via GitHub)" <gi...@apache.org> on 2023/06/30 16:25:13 UTC

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

divijvaidya commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1247912475


##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteTopicPartitionDirectory.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.util.Arrays.asList;
+import static java.util.Objects.requireNonNull;
+import static java.util.regex.Pattern.compile;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getUuid;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.deleteFilesOnly;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.deleteQuietly;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Represents a topic-partition directory in the local tiered storage under which filesets for
+ * log segments are stored.
+ *
+ *
+ * <code>
+ * / storage-directory / uuidBase64-0-topic / tvHCaSDsQZWsjr5rbtCjxA-segment
+ *                     .                   .  tvHCaSDsQZWsjr5rbtCjxA-offset_index
+ *                     .                   .  tvHCaSDsQZWsjr5rbtCjxA-time_index
+ *                     .
+ *                     / 5fEBmixCR5-dMntYSLIr1g-3-topic / BFyXlC8ySMm-Uzxw5lZSMg-segment
+ *                                                      . BFyXlC8ySMm-Uzxw5lZSMg-offset_index
+ *                                                      . BFyXlC8ySMm-Uzxw5lZSMg-time_index
+ * </code>
+ */
+public final class RemoteTopicPartitionDirectory {
+    private static final Logger LOGGER = getLogger(RemoteTopicPartitionDirectory.class);
+    private static final String UUID_LEGAL_CHARS = "[a-zA-Z0-9_-]{22}";

Review Comment:
   `_` (underscore) is not a valid UUID char as per https://www.ietf.org/rfc/rfc4122.txt



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local file system to store
+ * offloaded log segments and associated data.
+ * <p>
+ * Due to the consistency semantic of POSIX-compliant file systems, this remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the copy to the storage succeeded.
+ * </p>
+ * <p>
+ * In order to guarantee isolation, independence, reproducibility and consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via the storage ID provided to
+ * the constructor, should be limited to a test or well-defined self-contained use-case.
+ * </p>
+ * <p>
+ * The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka.
+ * <p>
+ * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+ * follows the structure UuidBase64-FileType.
+ * <p>
+ * Given the root directory of the storage, segments and associated files are organized as represented below.
+ * </p>
+ * <code>
+ * / storage-directory  / LWgrMmVrT0a__7a4SasuPA-0-topic / bCqX9U--S-6U8XUM9II25Q-segment
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-offset_index
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-time_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-segment
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-offset_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-time_index
+ * .
+ * / LWgrMmVrT0a__7a4SasuPA-1-topic / o8CQPT86QQmbFmi3xRmiHA-segment
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-offset_index
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-time_index
+ * .
+ * / DRagLm_PS9Wl8fz1X43zVg-3-btopic / jvj3vhliTGeU90sIosmp_g-segment
+ * .                                 . jvj3vhliTGeU90sIosmp_g-offset_index
+ * .                                 . jvj3vhliTGeU90sIosmp_g-time_index
+ * </code>
+ */
+public final class LocalTieredStorage implements RemoteStorageManager {
+
+    public static final String STORAGE_CONFIG_PREFIX = "remote.log.storage.local.";
+
+    /**
+     * The root directory of this storage.
+     */
+    public static final String STORAGE_DIR_PROP = "dir";
+
+    /**
+     * Delete all files and directories from this storage on close, substantially removing it
+     * entirely from the file system.
+     */
+    public static final String DELETE_ON_CLOSE_PROP = "delete.on.close";
+
+    /**
+     * The implementation of the transfer of the data of the canonical segment and index files to
+     * this storage. The only reason the "transferer" abstraction exists is to be able to simulate
+     * file copy errors and exercise the associated failure modes.
+     */
+    public static final String TRANSFERER_CLASS_PROP = "transferer";
+
+    /**
+     * Whether the deleteLogSegment() implemented by this storage should actually delete data or behave
+     * as a no-operation. This allows to simulate non-strongly consistent storage systems which do not
+     * guarantee visibility of a successful delete for subsequent read or list operations.
+     */
+    public static final String ENABLE_DELETE_API_PROP = "delete.enable";
+
+    /**
+     * The ID of the broker which owns this instance of {@link LocalTieredStorage}.
+     */
+    public static final String BROKER_ID = "broker.id";

Review Comment:
   Use `KafkaConfig.BrokerIdProp`?



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local file system to store
+ * offloaded log segments and associated data.
+ * <p>
+ * Due to the consistency semantic of POSIX-compliant file systems, this remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the copy to the storage succeeded.
+ * </p>
+ * <p>
+ * In order to guarantee isolation, independence, reproducibility and consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via the storage ID provided to
+ * the constructor, should be limited to a test or well-defined self-contained use-case.
+ * </p>
+ * <p>
+ * The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka.
+ * <p>
+ * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+ * follows the structure UuidBase64-FileType.
+ * <p>
+ * Given the root directory of the storage, segments and associated files are organized as represented below.
+ * </p>
+ * <code>
+ * / storage-directory  / LWgrMmVrT0a__7a4SasuPA-0-topic / bCqX9U--S-6U8XUM9II25Q-segment
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-offset_index
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-time_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-segment
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-offset_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-time_index
+ * .
+ * / LWgrMmVrT0a__7a4SasuPA-1-topic / o8CQPT86QQmbFmi3xRmiHA-segment
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-offset_index
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-time_index
+ * .
+ * / DRagLm_PS9Wl8fz1X43zVg-3-btopic / jvj3vhliTGeU90sIosmp_g-segment
+ * .                                 . jvj3vhliTGeU90sIosmp_g-offset_index
+ * .                                 . jvj3vhliTGeU90sIosmp_g-time_index
+ * </code>
+ */
+public final class LocalTieredStorage implements RemoteStorageManager {
+
+    public static final String STORAGE_CONFIG_PREFIX = "remote.log.storage.local.";
+
+    /**
+     * The root directory of this storage.
+     */
+    public static final String STORAGE_DIR_PROP = "dir";
+
+    /**
+     * Delete all files and directories from this storage on close, substantially removing it
+     * entirely from the file system.
+     */
+    public static final String DELETE_ON_CLOSE_PROP = "delete.on.close";
+
+    /**
+     * The implementation of the transfer of the data of the canonical segment and index files to
+     * this storage. The only reason the "transferer" abstraction exists is to be able to simulate
+     * file copy errors and exercise the associated failure modes.
+     */
+    public static final String TRANSFERER_CLASS_PROP = "transferer";
+
+    /**
+     * Whether the deleteLogSegment() implemented by this storage should actually delete data or behave
+     * as a no-operation. This allows to simulate non-strongly consistent storage systems which do not
+     * guarantee visibility of a successful delete for subsequent read or list operations.
+     */
+    public static final String ENABLE_DELETE_API_PROP = "delete.enable";
+
+    /**
+     * The ID of the broker which owns this instance of {@link LocalTieredStorage}.
+     */
+    public static final String BROKER_ID = "broker.id";
+
+    private static final String ROOT_STORAGES_DIR_NAME = "kafka-tiered-storage";
+
+    private volatile File storageDirectory;
+    private volatile boolean deleteOnClose = false;
+    private volatile boolean deleteEnabled = true;
+    private volatile Transferer transferer = new Transferer() { // default; configure() replaces it when TRANSFERER_CLASS_PROP is set
+        @Override
+        public void transfer(File from, File to) throws IOException {
+            if (from.exists()) { // a missing source file is skipped silently and no target is created
+                Files.copy(from.toPath(), to.toPath()); // no copy options are passed, so an already existing target makes this throw
+            }
+        }
+
+        @Override
+        public void transfer(ByteBuffer from, File to) throws IOException {
+            if (from != null && from.hasRemaining()) { // null or fully-consumed buffers produce no target file
+                try (FileOutputStream fileOutputStream = new FileOutputStream(to, false); // append=false: write from the start of the file
+                     FileChannel channel = fileOutputStream.getChannel()) {
+                    channel.write(from); // NOTE(review): single write() call - confirm it always drains the whole buffer here
+                }
+            }
+        }
+    };
+
+    private volatile int brokerId = -1;
+
+    private volatile Logger logger = LoggerFactory.getLogger(LocalTieredStorage.class);
+
+    /**
+     * Counter used to establish an explicit chronological ordering of the events generated by the
+     * local tiered storage which this instance gives access to. It starts at -1.
+     */
+    // TODO: Make this timestamp depend only on the assigned broker, not on the class instance.
+    private final AtomicInteger eventTimestamp = new AtomicInteger(-1);
+
+    /**
+     * Used to notify users of this storage of internal updates - new topic-partition recorded (upon directory
+     * creation) and segment file written (upon segment file write(2)).
+     */
+    private final LocalTieredStorageListeners storageListeners = new LocalTieredStorageListeners();
+
+    private final LocalTieredStorageHistory history = new LocalTieredStorageHistory();
+
+    public LocalTieredStorage() {
+        history.listenTo(this); // presumably subscribes the history to this storage's events - confirm in LocalTieredStorageHistory
+    }
+
+    /**
+     * Walks through this storage and notifies the traverser of every topic-partition, segment and record discovered.
+     * <p>
+     * - The order of traversal of the topic-partitions is not specified.
+     * - The order of traversal of the segments within a topic-partition is in ascending order
+     * of the modified timestamp of the segment file.
+     * - The order of traversal of records within a segment corresponds to the insertion
+     * order of these records in the original segment from which the segment in this storage
+     * was transferred.
+     * <p>
+     * This method is NOT an atomic operation w.r.t. the local tiered storage. This storage may change while
+     * it is being traversed and while topic-partitions, segments and records are communicated to the traverser.
+     * There is no guarantee that updates to the storage which happen during traversal will be communicated to
+     * the traverser. In particular, in case of concurrent read and write/delete to a topic-partition, a segment
+     * or a record, the behaviour depends on the underlying file system.
+     * <p>
+     * Only the sub-directories found directly under the storage directory are visited; plain files at that
+     * level are ignored.
+     * <p>
+     * NOTE(review): the storage directory appears to be assigned only in configure(); calling this method
+     * before configure() would then fail with a NullPointerException - confirm.
+     *
+     * @param traverser User-specified object to which this storage communicates the topic-partitions,
+     *                  segments and records as they are discovered. Must not be null.
+     * @throws NullPointerException if the traverser is null.
+     */
+    public void traverse(final LocalTieredStorageTraverser traverser) {
+        Objects.requireNonNull(traverser);
+
+        final File[] files = storageDirectory.listFiles();
+        if (files == null) {
+            // listFiles() returns null when the path is not a directory or an I/O error occurs
+            // (an empty directory yields an empty array); there is nothing to traverse in that case.
+            return;
+        }
+
+        Arrays.stream(files)
+                .filter(File::isDirectory)
+                .forEach(dir ->
+                        openExistingTopicPartitionDirectory(dir.getName(), storageDirectory).traverse(traverser));
+    }
+
+    public void addListener(final LocalTieredStorageListener listener) {
+        this.storageListeners.add(listener); // handed straight to the listeners aggregate; no null check is done here
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        if (storageDirectory != null) {
+            throw new InvalidConfigurationException(format("This instance of local remote storage" +
+                    "is already configured. The existing storage directory is %s. Ensure the method " +
+                    "configure() is only called once.", storageDirectory.getAbsolutePath()));
+        }
+
+        final String storageDir = (String) configs.get(STORAGE_DIR_PROP);
+        final String shouldDeleteOnClose = (String) configs.get(DELETE_ON_CLOSE_PROP);
+        final String transfererClass = (String) configs.get(TRANSFERER_CLASS_PROP);
+        final String isDeleteEnabled = (String) configs.get(ENABLE_DELETE_API_PROP);
+        final Integer brokerIdInt = (Integer) configs.get(BROKER_ID);
+
+        if (brokerIdInt == null) {
+            throw new InvalidConfigurationException(
+                    "Broker ID is required to configure the LocalTieredStorage manager.");
+        }
+
+        brokerId = brokerIdInt;
+        logger = new LogContext(format("[LocalTieredStorage Id=%d] ", brokerId)).logger(this.getClass());
+
+        if (shouldDeleteOnClose != null) {
+            deleteOnClose = Boolean.parseBoolean(shouldDeleteOnClose);
+        }
+
+        if (isDeleteEnabled != null) {
+            deleteEnabled = Boolean.parseBoolean(isDeleteEnabled);
+        }
+
+        if (transfererClass != null) {
+            try {
+                transferer = (Transferer) getClass().getClassLoader().loadClass(transfererClass).newInstance();
+
+            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | ClassCastException e) {
+                throw new RuntimeException(format("Cannot create transferer from class '%s'", transfererClass), e);
+            }
+        }
+
+
+        if (storageDir == null) {
+            storageDirectory = TestUtils.tempDirectory(ROOT_STORAGES_DIR_NAME + "-");

Review Comment:
   This will be deleted on exit because TestUtils.tempDirectory registers `file.deleteOnExit()`. Is this the behaviour we want when we pass deleteOnClose=false?



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local file system to store
+ * offloaded log segments and associated data.
+ * <p>
+ * Due to the consistency semantic of POSIX-compliant file systems, this remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the copy to the storage succeeded.
+ * </p>
+ * <p>
+ * In order to guarantee isolation, independence, reproducibility and consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via the storage ID provided to
+ * the constructor, should be limited to a test or well-defined self-contained use-case.
+ * </p>
+ * <p>
+ * The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka.
+ * <p>
+ * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+ * follows the structure UuidBase64-FileType.
+ * <p>
+ * Given the root directory of the storage, segments and associated files are organized as represented below.
+ * </p>
+ * <code>
+ * / storage-directory  / LWgrMmVrT0a__7a4SasuPA-0-topic / bCqX9U--S-6U8XUM9II25Q-segment
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-offset_index
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-time_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-segment
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-offset_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-time_index
+ * .
+ * / LWgrMmVrT0a__7a4SasuPA-1-topic / o8CQPT86QQmbFmi3xRmiHA-segment
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-offset_index
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-time_index
+ * .
+ * / DRagLm_PS9Wl8fz1X43zVg-3-btopic / jvj3vhliTGeU90sIosmp_g-segment
+ * .                                 . jvj3vhliTGeU90sIosmp_g-offset_index
+ * .                                 . jvj3vhliTGeU90sIosmp_g-time_index
+ * </code>
+ */
+public final class LocalTieredStorage implements RemoteStorageManager {
+
+    public static final String STORAGE_CONFIG_PREFIX = "remote.log.storage.local.";
+
+    /**
+     * The root directory of this storage.
+     */
+    public static final String STORAGE_DIR_PROP = "dir";
+
+    /**
+     * Delete all files and directories from this storage on close, substantially removing it
+     * entirely from the file system.
+     */
+    public static final String DELETE_ON_CLOSE_PROP = "delete.on.close";
+
+    /**
+     * The implementation of the transfer of the data of the canonical segment and index files to
+     * this storage. The only reason the "transferer" abstraction exists is to be able to simulate
+     * file copy errors and exercise the associated failure modes.
+     */
+    public static final String TRANSFERER_CLASS_PROP = "transferer";
+
+    /**
+     * Whether the deleteLogSegment() implemented by this storage should actually delete data or behave
+     * as a no-operation. This allows to simulate non-strongly consistent storage systems which do not
+     * guarantee visibility of a successful delete for subsequent read or list operations.
+     */
+    public static final String ENABLE_DELETE_API_PROP = "delete.enable";
+
+    /**
+     * The ID of the broker which owns this instance of {@link LocalTieredStorage}.
+     */
+    public static final String BROKER_ID = "broker.id";
+
+    private static final String ROOT_STORAGES_DIR_NAME = "kafka-tiered-storage";
+
+    private volatile File storageDirectory;
+    private volatile boolean deleteOnClose = false;
+    private volatile boolean deleteEnabled = true;
+    /**
+     * Default transferer, used unless configure() installs another implementation via
+     * {@link #TRANSFERER_CLASS_PROP}. It performs plain copies on the local file system.
+     */
+    private volatile Transferer transferer = new Transferer() {
+        /**
+         * Copies {@code from} to {@code to}. A source file which does not exist is skipped silently.
+         */
+        @Override
+        public void transfer(File from, File to) throws IOException {
+            if (from.exists()) {
+                // No copy option is given, so Files.copy fails if the destination already exists.
+                Files.copy(from.toPath(), to.toPath());
+            }
+        }
+
+        /**
+         * Writes the remaining bytes of {@code from} to {@code to}. A null buffer, or a buffer with no
+         * remaining bytes, is skipped and the destination file is not touched.
+         */
+        @Override
+        public void transfer(ByteBuffer from, File to) throws IOException {
+            if (from != null && from.hasRemaining()) {
+                // append = false: any existing content of the destination file is overwritten.
+                try (FileOutputStream fileOutputStream = new FileOutputStream(to, false);
+                     FileChannel channel = fileOutputStream.getChannel()) {
+                    channel.write(from);
+                }
+            }
+        }
+    };
+
+    private volatile int brokerId = -1;
+
+    private volatile Logger logger = LoggerFactory.getLogger(LocalTieredStorage.class);
+
+    /**
+     * Used to make explicit the chronological ordering of the events generated by the local tiered storage
+     * which this instance gives access to.
+     */
+    // TODO: Make this timestamp only dependent on the assigned broker, not the class instance.
+    private final AtomicInteger eventTimestamp = new AtomicInteger(-1);
+
+    /**
+     * Used to notify users of this storage of internal updates - new topic-partition recorded (upon directory
+     * creation) and segment file written (upon segment file write(2)).
+     */
+    private final LocalTieredStorageListeners storageListeners = new LocalTieredStorageListeners();
+
+    private final LocalTieredStorageHistory history = new LocalTieredStorageHistory();
+
+    /**
+     * Creates an unconfigured storage. The storage directory is only assigned by
+     * {@link #configure(Map)}, which must be called before the storage is used.
+     */
+    public LocalTieredStorage() {
+        // Attach the history to this storage; presumably this registers it as a listener of the
+        // storage events (see LocalTieredStorageHistory#listenTo to confirm).
+        history.listenTo(this);
+    }
+
+    /**
+     * Walks through this storage and notifies the traverser of every topic-partition, segment and record
+     * discovered.
+     * <p>
+     * - The order of traversal of the topic-partitions is not specified.
+     * - The order of traversal of the segments within a topic-partition is in ascending order
+     * of the modified timestamp of the segment file.
+     * - The order of traversal of records within a segment corresponds to the insertion
+     * order of these records in the original segment from which the segment in this storage
+     * was transferred.
+     * <p>
+     * This method is NOT an atomic operation w.r.t the local tiered storage. This storage may change while
+     * it is being traversed and its topic-partitions, segments and records are communicated to the traverser.
+     * There is no guarantee that updates to the storage which happen during traversal will be communicated to
+     * the traverser. Especially, in case of concurrent read and write/delete to a topic-partition, a segment
+     * or a record, the behaviour depends on the underlying file system.
+     * <p>
+     * Only the direct sub-directories of the storage directory are visited; regular files found at the
+     * root of the storage are ignored.
+     *
+     * @param traverser User-specified object to which this storage communicates the topic-partitions,
+     *                  segments and records as they are discovered.
+     * @throws NullPointerException if {@code traverser} is null.
+     */
+    public void traverse(final LocalTieredStorageTraverser traverser) {
+        Objects.requireNonNull(traverser);
+
+        final File[] files = storageDirectory.listFiles();
+        if (files == null) {
+            // listFiles() returns null when the path is not a directory or an I/O error occurs
+            // (an empty directory yields an empty array, not null). Nothing to traverse then.
+            return;
+        }
+
+        Arrays.stream(files)
+                .filter(File::isDirectory)
+                .forEach(dir ->
+                        openExistingTopicPartitionDirectory(dir.getName(), storageDirectory).traverse(traverser));
+    }
+
+    /**
+     * Adds the given listener to the listeners held by this storage.
+     *
+     * @param listener The listener to add.
+     */
+    public void addListener(final LocalTieredStorageListener listener) {
+        this.storageListeners.add(listener);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        if (storageDirectory != null) {
+            throw new InvalidConfigurationException(format("This instance of local remote storage" +
+                    "is already configured. The existing storage directory is %s. Ensure the method " +
+                    "configure() is only called once.", storageDirectory.getAbsolutePath()));
+        }
+
+        final String storageDir = (String) configs.get(STORAGE_DIR_PROP);
+        final String shouldDeleteOnClose = (String) configs.get(DELETE_ON_CLOSE_PROP);
+        final String transfererClass = (String) configs.get(TRANSFERER_CLASS_PROP);
+        final String isDeleteEnabled = (String) configs.get(ENABLE_DELETE_API_PROP);
+        final Integer brokerIdInt = (Integer) configs.get(BROKER_ID);
+
+        if (brokerIdInt == null) {
+            throw new InvalidConfigurationException(
+                    "Broker ID is required to configure the LocalTieredStorage manager.");
+        }
+
+        brokerId = brokerIdInt;
+        logger = new LogContext(format("[LocalTieredStorage Id=%d] ", brokerId)).logger(this.getClass());
+
+        if (shouldDeleteOnClose != null) {
+            deleteOnClose = Boolean.parseBoolean(shouldDeleteOnClose);
+        }
+
+        if (isDeleteEnabled != null) {
+            deleteEnabled = Boolean.parseBoolean(isDeleteEnabled);
+        }
+
+        if (transfererClass != null) {
+            try {
+                transferer = (Transferer) getClass().getClassLoader().loadClass(transfererClass).newInstance();
+
+            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | ClassCastException e) {
+                throw new RuntimeException(format("Cannot create transferer from class '%s'", transfererClass), e);
+            }
+        }
+
+
+        if (storageDir == null) {
+            storageDirectory = TestUtils.tempDirectory(ROOT_STORAGES_DIR_NAME + "-");
+
+            logger.debug("No storage directory specified, created temporary directory: {}",
+                    storageDirectory.getAbsolutePath());
+
+        } else {
+            storageDirectory = new File(storageDir + "/" + ROOT_STORAGES_DIR_NAME);
+            // NOTE: Provide the relative storage directory path to view the files in the same directory when running tests.
+            // storageDirectory = new File(new File("."), ROOT_STORAGES_DIR_NAME + "/" + storageDir);
+            final boolean existed = storageDirectory.exists();
+
+            if (!existed) {
+                logger.info("Creating directory: [{}]", storageDirectory.getAbsolutePath());
+                storageDirectory.mkdirs();

Review Comment:
   Please use `Files.createDirectories`. The (relatively) new `Files` API introduced in JDK 7 has better error-handling semantics: it throws an `IOException` describing the failure, whereas `File.mkdirs()` only returns `false`.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static java.lang.String.format;
+import static java.util.Arrays.stream;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+import static java.util.regex.Pattern.compile;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getFileType;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Represents the set of files offloaded to the local tiered storage for a single log segment.
+ * A {@link RemoteLogSegmentFileset} corresponds to the leaves of the file system structure of
+ * the local tiered storage:
+ *
+ * <code>
+ * / storage-directory / uuidBase64-partition-topic / oAtiIQ95REujbuzNd_lkLQ-segment
+ *                                                  . oAtiIQ95REujbuzNd_lkLQ-offset_index
+ *                                                  . oAtiIQ95REujbuzNd_lkLQ-time_index
+ * </code>
+ */
+public final class RemoteLogSegmentFileset {
+
+    /**
+     * The format of a file which belongs to the fileset, i.e. a file which is assigned to a log segment in
+     * Kafka's log directory.
+     *
+     * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+     * follows the structure UUID-FileType.
+     */
+    private static final String UUID_LEGAL_CHARS = "[a-zA-Z0-9_-]{22}";
+    private static final Pattern FILENAME_FORMAT = compile("(" + UUID_LEGAL_CHARS + ")-([a-z_]+)");
+    private static final int GROUP_UUID = 1;
+    private static final int GROUP_FILE_TYPE = 2;
+
+    /**
+     * Characterises the type of a file in the local tiered storage copied from Apache Kafka's standard storage.
+     * Each type carries a flag telling whether it is optional.
+     */
+    public enum RemoteLogSegmentFileType {
+        SEGMENT(false),
+        OFFSET_INDEX(false),
+        TIME_INDEX(false),
+        TRANSACTION_INDEX(true),
+        LEADER_EPOCH_CHECKPOINT(false),
+        PRODUCER_SNAPSHOT(true);
+
+        private final boolean optional;
+
+        RemoteLogSegmentFileType(boolean optional) {
+            this.optional = optional;
+        }
+
+        /**
+         * Provides the name of the file of this type for the given UUID in the local tiered storage,
+         * e.g. uuid-segment. The type suffix is the lower-cased name of the enum constant.
+         */
+        public String toFilename(final Uuid uuid) {
+            return format("%s-%s", uuid.toString(), name().toLowerCase(Locale.ROOT));
+        }
+
+        /**
+         * Returns the nature of the data stored in the file with the provided name.
+         *
+         * @throws IllegalArgumentException if the name does not follow the fileset filename format or
+         *                                  its suffix is not a known file type.
+         */
+        public static RemoteLogSegmentFileType getFileType(final String filename) {
+            try {
+                return RemoteLogSegmentFileType.valueOf(substr(filename, GROUP_FILE_TYPE).toUpperCase(Locale.ROOT));
+
+            } catch (final RuntimeException e) {
+                // Both a filename not matching the format and an unknown type suffix end up here and
+                // are reported with the same message.
+                throw new IllegalArgumentException(format("Not a remote log segment file: %s", filename), e);
+            }
+        }
+
+        /**
+         * Extracts the UUID from the filename. This UUID is that of the remote log segment id which uniquely
+         * identifies the log segment which the file's data belongs to (not necessarily segment data, but also
+         * indexes or other associated files).
+         *
+         * @throws IllegalArgumentException if the name does not follow the fileset filename format.
+         */
+        public static Uuid getUuid(final String filename) {
+            return Uuid.fromString(substr(filename, GROUP_UUID));
+        }
+
+        /**
+         * Matches the whole filename against the fileset filename format and returns the requested
+         * capture group (the UUID or the file type).
+         *
+         * @throws IllegalArgumentException if the name does not follow the fileset filename format.
+         */
+        static String substr(final String filename, final int group) {
+            final Matcher m = FILENAME_FORMAT.matcher(filename);
+            if (!m.matches()) {
+                throw new IllegalArgumentException(format("Not a remote log segment file: %s", filename));
+            }
+            return m.group(group);
+        }
+
+        /**
+         * Returns the optional flag this type was declared with. Presumably a fileset may legitimately
+         * lack a file of an optional type - confirm against the callers.
+         */
+        public boolean isOptional() {
+            return optional;
+        }
+    }
+
+    private static final Logger LOGGER = getLogger(RemoteLogSegmentFileset.class);
+
+    private final RemoteTopicPartitionDirectory partitionDirectory;
+    private final RemoteLogSegmentId remoteLogSegmentId;
+    private final Map<RemoteLogSegmentFileType, File> files;
+
+    /**
+     * Creates a new fileset located under the given storage directory for the provided remote log segment id.
+     * The topic-partition directory is created if it does not exist yet. However, the files corresponding to
+     * the offloaded log segment are not created on the file system until the transfer happens.
+     *
+     * @param storageDir The root directory of the local tiered storage.
+     * @param id Remote log segment id assigned to a log segment in Kafka.
+     * @return A new fileset instance.
+     */
+    public static RemoteLogSegmentFileset openFileset(final File storageDir, final RemoteLogSegmentId id) {
+
+        final RemoteTopicPartitionDirectory tpDir = openTopicPartitionDirectory(id.topicIdPartition(), storageDir);
+        final File partitionDirectory = tpDir.getDirectory();
+        final Uuid uuid = id.id();
+
+        // One File handle per file type, optional or not; only paths are built here, nothing is written.
+        final Map<RemoteLogSegmentFileType, File> files = stream(RemoteLogSegmentFileType.values())
+                .collect(toMap(identity(), type -> new File(partitionDirectory, type.toFilename(uuid))));
+
+        return new RemoteLogSegmentFileset(tpDir, id, files);
+    }
+
+    /**
+     * Creates a fileset instance for the physical set of files located under the given topic-partition directory.
+     * The fileset MUST exist on the file system with the given uuid.
+     *
+     * @param tpDirectory The topic-partition directory which this fileset's segment belongs to.
+     * @param uuid The expected UUID of the fileset.
+     * @return A new fileset instance.
+     */
+    public static RemoteLogSegmentFileset openExistingFileset(final RemoteTopicPartitionDirectory tpDirectory,
+                                                              final Uuid uuid) {
+        final Map<RemoteLogSegmentFileType, File> files =
+                stream(tpDirectory.getDirectory().listFiles())

Review Comment:
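   Please use the newer API `Files.list()`. Unlike `File.listFiles()`, which returns `null` on failure, it throws an `IOException`; remember to close the returned stream.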



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local file system to store
+ * offloaded log segments and associated data.
+ * <p>
+ * Due to the consistency semantic of POSIX-compliant file systems, this remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the copy to the storage succeeded.
+ * </p>
+ * <p>
+ * In order to guarantee isolation, independence, reproducibility and consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via the storage ID provided to
+ * the constructor, should be limited to a test or well-defined self-contained use-case.
+ * </p>
+ * <p>
+ * The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka.
+ * <p>
+ * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+ * follows the structure UuidBase64-FileType.
+ * <p>
+ * Given the root directory of the storage, segments and associated files are organized as represented below.
+ * </p>
+ * <code>
+ * / storage-directory  / LWgrMmVrT0a__7a4SasuPA-0-topic / bCqX9U--S-6U8XUM9II25Q-segment
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-offset_index
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-time_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-segment
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-offset_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-time_index
+ * .
+ * / LWgrMmVrT0a__7a4SasuPA-1-topic / o8CQPT86QQmbFmi3xRmiHA-segment
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-offset_index
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-time_index
+ * .
+ * / DRagLm_PS9Wl8fz1X43zVg-3-btopic / jvj3vhliTGeU90sIosmp_g-segment
+ * .                                 . jvj3vhliTGeU90sIosmp_g-offset_index
+ * .                                 . jvj3vhliTGeU90sIosmp_g-time_index
+ * </code>
+ */
+public final class LocalTieredStorage implements RemoteStorageManager {
+
+    public static final String STORAGE_CONFIG_PREFIX = "remote.log.storage.local.";
+
+    /**
+     * The root directory of this storage.
+     */
+    public static final String STORAGE_DIR_PROP = "dir";
+
+    /**
+     * Delete all files and directories from this storage on close, substantially removing it
+     * entirely from the file system.
+     */
+    public static final String DELETE_ON_CLOSE_PROP = "delete.on.close";
+
+    /**
+     * The implementation of the transfer of the data of the canonical segment and index files to
+     * this storage. The only reason the "transferer" abstraction exists is to be able to simulate
+     * file copy errors and exercise the associated failure modes.
+     */
+    public static final String TRANSFERER_CLASS_PROP = "transferer";
+
+    /**
+     * Whether the deleteLogSegment() implemented by this storage should actually delete data or behave
+     * as a no-operation. This allows to simulate non-strongly consistent storage systems which do not
+     * guarantee visibility of a successful delete for subsequent read or list operations.
+     */
+    public static final String ENABLE_DELETE_API_PROP = "delete.enable";
+
+    /**
+     * The ID of the broker which owns this instance of {@link LocalTieredStorage}.
+     */
+    public static final String BROKER_ID = "broker.id";
+
+    private static final String ROOT_STORAGES_DIR_NAME = "kafka-tiered-storage";
+
+    private volatile File storageDirectory;
+    private volatile boolean deleteOnClose = false;
+    private volatile boolean deleteEnabled = true;
+    /**
+     * Default transferer, used unless configure() installs another implementation via
+     * {@link #TRANSFERER_CLASS_PROP}. It performs plain copies on the local file system.
+     */
+    private volatile Transferer transferer = new Transferer() {
+        /**
+         * Copies {@code from} to {@code to}. A source file which does not exist is skipped silently.
+         */
+        @Override
+        public void transfer(File from, File to) throws IOException {
+            if (from.exists()) {
+                // No copy option is given, so Files.copy fails if the destination already exists.
+                Files.copy(from.toPath(), to.toPath());
+            }
+        }
+
+        /**
+         * Writes the remaining bytes of {@code from} to {@code to}. A null buffer, or a buffer with no
+         * remaining bytes, is skipped and the destination file is not touched.
+         */
+        @Override
+        public void transfer(ByteBuffer from, File to) throws IOException {
+            if (from != null && from.hasRemaining()) {
+                // append = false: any existing content of the destination file is overwritten.
+                try (FileOutputStream fileOutputStream = new FileOutputStream(to, false);
+                     FileChannel channel = fileOutputStream.getChannel()) {
+                    channel.write(from);
+                }
+            }
+        }
+    };
+
+    private volatile int brokerId = -1;
+
+    private volatile Logger logger = LoggerFactory.getLogger(LocalTieredStorage.class);
+
+    /**
+     * Used to make explicit the chronological ordering of the events generated by the local tiered storage
+     * which this instance gives access to.
+     */
+    // TODO: Make this timestamp only dependent on the assigned broker, not the class instance.
+    private final AtomicInteger eventTimestamp = new AtomicInteger(-1);
+
+    /**
+     * Used to notify users of this storage of internal updates - new topic-partition recorded (upon directory
+     * creation) and segment file written (upon segment file write(2)).
+     */
+    private final LocalTieredStorageListeners storageListeners = new LocalTieredStorageListeners();
+
+    private final LocalTieredStorageHistory history = new LocalTieredStorageHistory();
+
+    public LocalTieredStorage() {
+        history.listenTo(this);
+    }
+
+    /**
+     * Walks through this storage and notifies the traverser of every topic-partition, segment and record discovered.
+     * <p>
+     * - The order of traversal of the topic-partition is not specified.
+     * - The order of traversal of the segments within a topic-partition is in ascending order
+     * of the modified timestamp of the segment file.
+     * - The order of traversal of records within a segment corresponds to the insertion
+     * order of these records in the original segment from which the segment in this storage
+     * was transferred.
+     * <p>
+     * This method is NOT an atomic operation w.r.t. the local tiered storage. This storage may change while
+     * it is being traversed and topic-partitions, segments and records are communicated to the traverser.
+     * There is no guarantee that updates to the storage which happen during traversal will be communicated
+     * to the traverser.
+     * Especially, in case of concurrent read and write/delete to a topic-partition, a segment or a record,
+     * the behaviour depends on the underlying file system.
+     *
+     * @param traverser User-specified object to which this storage communicates the topic-partitions,
+     *                  segments and records as they are discovered.
+     */
+    public void traverse(final LocalTieredStorageTraverser traverser) {
+        Objects.requireNonNull(traverser);
+
+        final File[] files = storageDirectory.listFiles();
+        if (files == null) {
+            // listFiles() returns null if the path is not a directory or an I/O error occurs.
+            return;
+        }
+
+        Arrays.stream(files)
+                .filter(File::isDirectory)
+                .forEach(dir ->
+                        openExistingTopicPartitionDirectory(dir.getName(), storageDirectory).traverse(traverser));
+    }
+
+    public void addListener(final LocalTieredStorageListener listener) {
+        this.storageListeners.add(listener);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        if (storageDirectory != null) {
+            throw new InvalidConfigurationException(format("This instance of local remote storage" +
+                    "is already configured. The existing storage directory is %s. Ensure the method " +
+                    "configure() is only called once.", storageDirectory.getAbsolutePath()));
+        }
+
+        final String storageDir = (String) configs.get(STORAGE_DIR_PROP);
+        final String shouldDeleteOnClose = (String) configs.get(DELETE_ON_CLOSE_PROP);
+        final String transfererClass = (String) configs.get(TRANSFERER_CLASS_PROP);
+        final String isDeleteEnabled = (String) configs.get(ENABLE_DELETE_API_PROP);
+        final Integer brokerIdInt = (Integer) configs.get(BROKER_ID);
+
+        if (brokerIdInt == null) {
+            throw new InvalidConfigurationException(
+                    "Broker ID is required to configure the LocalTieredStorage manager.");
+        }
+
+        brokerId = brokerIdInt;
+        logger = new LogContext(format("[LocalTieredStorage Id=%d] ", brokerId)).logger(this.getClass());
+
+        if (shouldDeleteOnClose != null) {
+            deleteOnClose = Boolean.parseBoolean(shouldDeleteOnClose);
+        }
+
+        if (isDeleteEnabled != null) {
+            deleteEnabled = Boolean.parseBoolean(isDeleteEnabled);
+        }
+
+        if (transfererClass != null) {
+            try {
+                transferer = (Transferer) getClass().getClassLoader().loadClass(transfererClass).newInstance();
+
+            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | ClassCastException e) {
+                throw new RuntimeException(format("Cannot create transferer from class '%s'", transfererClass), e);
+            }
+        }
+
+
+        if (storageDir == null) {
+            storageDirectory = TestUtils.tempDirectory(ROOT_STORAGES_DIR_NAME + "-");
+
+            logger.debug("No storage directory specified, created temporary directory: {}",
+                    storageDirectory.getAbsolutePath());
+
+        } else {
+            storageDirectory = new File(storageDir + "/" + ROOT_STORAGES_DIR_NAME);
+            // NOTE: Provide the relative storage directory path to view the files in the same directory when running tests.
+            // storageDirectory = new File(new File("."), ROOT_STORAGES_DIR_NAME + "/" + storageDir);
+            final boolean existed = storageDirectory.exists();

Review Comment:
   Please use `Files.exists()`, which was introduced in JDK 7 as part of the NIO.2 APIs. It has better error-handling semantics.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/Transferer.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface Transferer {

Review Comment:
   A Javadoc comment describing the purpose of this interface would be very useful here.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java:
##########
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.text.NumberFormat;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.nio.ByteBuffer.wrap;
+import static java.util.Arrays.asList;
+import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot.takeSnapshot;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.LEADER_EPOCH;
+import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.OFFSET;
+import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.TIMESTAMP;
+import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.TRANSACTION;
+
+public final class LocalTieredStorageTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(LocalTieredStorageTest.class);
+
+    private final LocalLogSegments localLogSegments = new LocalLogSegments();
+    private final TopicPartition topicPartition = new TopicPartition("my-topic", 1);
+    private final TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), topicPartition);
+
+    private LocalTieredStorage tieredStorage;
+    private Verifier remoteStorageVerifier;
+    private String storageDir;
+
+    private void init(Map<String, Object> extraConfig, String testName) {
+        tieredStorage = new LocalTieredStorage();
+        remoteStorageVerifier = new Verifier(tieredStorage, topicIdPartition);
+        storageDir = generateStorageId(testName);
+
+        Map<String, Object> config = new HashMap<>();
+        config.put(LocalTieredStorage.STORAGE_DIR_PROP, storageDir);
+        config.put(LocalTieredStorage.DELETE_ON_CLOSE_PROP, "true");
+        config.put(LocalTieredStorage.BROKER_ID, 1);
+        config.putAll(extraConfig);
+
+        tieredStorage.configure(config);
+    }
+
+    @BeforeEach
+    public void before(TestInfo testInfo) {
+        init(Collections.emptyMap(), testInfo.getDisplayName());
+    }
+
+    @AfterEach
+    public void after() throws IOException {
+        tieredStorage.clear();
+        localLogSegments.deleteAll();
+        Files.deleteIfExists(Paths.get(storageDir));
+    }
+
+    @Test
+    public void copyEmptyLogSegment() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+        final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
+        tieredStorage.copyLogSegmentData(metadata, segment);
+
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(id, segment);
+    }
+
+    @Test
+    public void copyDataFromLogSegment() throws RemoteStorageException {
+        final byte[] data = new byte[]{0, 1, 2};
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment(data);
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
+
+        remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(id, segment);
+    }
+
+    @Test
+    public void fetchLogSegment() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment(new byte[]{0, 1, 2});
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
+
+        remoteStorageVerifier.verifyFetchedLogSegment(id, 0, new byte[]{0, 1, 2});
+        // FIXME: Fetching at an arbitrary index does not work, as proper support for records needs to be added.
+    }
+
+    @Test
+    public void fetchOffsetIndex() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
+
+        remoteStorageVerifier.verifyFetchedOffsetIndex(id, LocalLogSegments.OFFSET_FILE_BYTES);
+    }
+
+    @Test
+    public void fetchTimeIndex() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
+
+        remoteStorageVerifier.verifyFetchedTimeIndex(id, LocalLogSegments.TIME_FILE_BYTES);
+    }
+
+    @Test
+    public void fetchTransactionIndex() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
+
+        remoteStorageVerifier.verifyFetchedTransactionIndex(id, LocalLogSegments.TXN_FILE_BYTES);
+    }
+
+    @Test
+    public void fetchLeaderEpochCheckpoint() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
+
+        remoteStorageVerifier.verifyLeaderEpochCheckpoint(id, LocalLogSegments.LEADER_EPOCH_CHECKPOINT_FILE_BYTES);
+    }
+
+    @Test
+    public void fetchProducerSnapshot() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
+
+        remoteStorageVerifier.verifyProducerSnapshot(id, LocalLogSegments.PRODUCER_SNAPSHOT_FILE_BYTES);
+    }
+
+    @Test
+    public void deleteLogSegment() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(id, segment);
+
+        tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
+        remoteStorageVerifier.verifyLogSegmentFilesAbsent(id, segment);
+    }
+
+    @Test
+    public void deletePartition() throws RemoteStorageException {
+        int segmentCount = 10;
+        List<RemoteLogSegmentId> segmentIds = new ArrayList<>();
+        for (int i = 0; i < segmentCount; i++) {
+            final RemoteLogSegmentId id = newRemoteLogSegmentId();
+            final LogSegmentData segment = localLogSegments.nextSegment();
+            tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
+            remoteStorageVerifier.verifyContainsLogSegmentFiles(id, segment);
+            segmentIds.add(id);
+        }
+        tieredStorage.deletePartition(topicIdPartition);
+        remoteStorageVerifier.assertFileDoesNotExist(remoteStorageVerifier.expectedPartitionPath());
+        for (RemoteLogSegmentId segmentId: segmentIds) {
+            remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentId, null);
+        }
+    }
+
+    @Test
+    public void deleteLogSegmentWithoutOptionalFiles() throws RemoteStorageException {
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+        segment.transactionIndex().get().toFile().delete();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(id, path -> {
+            String fileName = path.getFileName().toString();
+            if (!(fileName.contains("transaction_index") || fileName.contains("snapshot"))) {
+                remoteStorageVerifier.assertFileExists(path);
+            }
+        });
+
+        tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
+        remoteStorageVerifier.verifyLogSegmentFilesAbsent(id, segment);
+    }
+
+    @Test
+    public void segmentsAreNotDeletedIfDeleteApiIsDisabled(TestInfo testInfo) throws RemoteStorageException {
+        init(Collections.singletonMap(LocalTieredStorage.ENABLE_DELETE_API_PROP, "false"), testInfo.getDisplayName());
+
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(id, segment);
+
+        tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
+        remoteStorageVerifier.verifyContainsLogSegmentFiles(id, segment);
+    }
+
+    @Test
+    public void traverseSingleOffloadedRecord() throws RemoteStorageException {
+        final byte[] bytes = new byte[]{0, 1, 2};
+
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+        final LogSegmentData segment = localLogSegments.nextSegment(bytes);
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
+
+        tieredStorage.traverse(new LocalTieredStorageTraverser() {
+            @Override
+            public void visitTopicIdPartition(TopicIdPartition topicIdPartition) {
+                assertEquals(LocalTieredStorageTest.this.topicPartition, topicIdPartition.topicPartition());
+            }
+
+            @Override
+            public void visitSegment(RemoteLogSegmentFileset fileset) {
+                assertEquals(id, fileset.getRemoteLogSegmentId());
+
+                try {
+                    final FileRecords records = FileRecords.open(fileset.getFile(SEGMENT));
+                    final Iterator<Record> it = records.records().iterator();
+
+                    assertEquals(wrap(bytes), it.next().value());
+
+                } catch (IOException e) {
+                    throw new AssertionError(e);
+                }
+            }
+        });
+    }
+
+    @Test
+    public void traverseMultipleOffloadedRecordsInOneSegment() throws RemoteStorageException, IOException {
+        final byte[] record1 = new byte[]{0, 1, 2};
+        final byte[] record2 = new byte[]{3, 4, 5};
+        final RemoteLogSegmentId id = newRemoteLogSegmentId();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), localLogSegments.nextSegment(record1, record2));
+
+        final LocalTieredStorageSnapshot snapshot = takeSnapshot(tieredStorage);
+
+        assertEquals(asList(topicPartition), snapshot.getTopicPartitions());
+        assertEquals(asList(wrap(record1), wrap(record2)), extractRecordsValue(snapshot, id));
+    }
+
+    @Test
+    public void traverseMultipleOffloadedRecordsInTwoSegments() throws RemoteStorageException, IOException {
+        final byte[] record1a = new byte[]{0, 1, 2};
+        final byte[] record2a = new byte[]{3, 4, 5};
+        final byte[] record1b = new byte[]{6, 7, 8};
+        final byte[] record2b = new byte[]{9, 10, 11};
+
+        final RemoteLogSegmentId idA = newRemoteLogSegmentId();
+        final RemoteLogSegmentId idB = newRemoteLogSegmentId();
+
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(idA), localLogSegments.nextSegment(record1a, record2a));
+        tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(idB), localLogSegments.nextSegment(record1b, record2b));
+
+        final LocalTieredStorageSnapshot snapshot = takeSnapshot(tieredStorage);
+
+        final Map<RemoteLogSegmentId, List<ByteBuffer>> expected = new HashMap<>();
+        expected.put(idA, asList(wrap(record1a), wrap(record2a)));
+        expected.put(idB, asList(wrap(record1b), wrap(record2b)));
+
+        final Map<RemoteLogSegmentId, List<ByteBuffer>> actual = new HashMap<>();
+        actual.put(idA, extractRecordsValue(snapshot, idA));
+        actual.put(idB, extractRecordsValue(snapshot, idB));
+
+        assertEquals(asList(topicPartition), snapshot.getTopicPartitions());
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void fetchThrowsIfDataDoesNotExist() {
+        final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(newRemoteLogSegmentId());
+
+        assertThrows(RemoteResourceNotFoundException.class,
+            () -> tieredStorage.fetchLogSegment(metadata, 0, Integer.MAX_VALUE));
+        assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, OFFSET));
+        assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, TIMESTAMP));
+        assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, LEADER_EPOCH));
+
+        try {
+            assertArrayEquals(new byte[0], remoteStorageVerifier.readFully(tieredStorage.fetchIndex(metadata, TRANSACTION)));
+            assertArrayEquals(new byte[0], remoteStorageVerifier.readFully(tieredStorage.fetchIndex(metadata, PRODUCER_SNAPSHOT)));
+        } catch (Exception ex) {
+            fail("Shouldn't have thrown an exception when optional file doesn't exists in the remote store");
+        }
+    }
+
+    @Test
+    public void assertStartAndEndPositionConsistency() {
+        final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(newRemoteLogSegmentId());
+
+        assertThrows(IllegalArgumentException.class, () -> tieredStorage.fetchLogSegment(metadata, -1, Integer.MAX_VALUE));
+        assertThrows(IllegalArgumentException.class, () -> tieredStorage.fetchLogSegment(metadata, 1, -1));
+        assertThrows(IllegalArgumentException.class, () -> tieredStorage.fetchLogSegment(metadata, 2, 1));
+    }
+
+    private RemoteLogSegmentMetadata newRemoteLogSegmentMetadata(final RemoteLogSegmentId id) {
+        return new RemoteLogSegmentMetadata(id, 0, 0, -1L, -1, 1000L,
+                1024, Collections.singletonMap(0, 0L));
+    }
+
+    private RemoteLogSegmentId newRemoteLogSegmentId() {
+        return new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
+    }
+
+    private static List<ByteBuffer> extractRecordsValue(
+            final LocalTieredStorageSnapshot snapshot,
+            final RemoteLogSegmentId id) throws IOException {
+
+        final FileRecords records = FileRecords.open(snapshot.getFile(id, SEGMENT));
+        final List<ByteBuffer> buffers = new ArrayList<>();
+
+        for (Record record: records.records()) {
+            buffers.add(record.value());
+        }
+
+        return buffers;
+    }
+
+    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH:mm:ss");
+
+    private String generateStorageId(String testName) {
+        return format("%s-%s-%s",
+                getClass().getSimpleName(), testName, DATE_TIME_FORMATTER.format(LocalDateTime.now()));
+    }
+
+    public final class Verifier {
+        private final LocalTieredStorage remoteStorage;
+        private final TopicIdPartition topicIdPartition;
+
+        public Verifier(final LocalTieredStorage remoteStorage, final TopicIdPartition topicIdPartition) {
+            this.remoteStorage = requireNonNull(remoteStorage);
+            this.topicIdPartition = requireNonNull(topicIdPartition);
+        }
+
+        private List<Path> expectedPaths(final RemoteLogSegmentId id) {
+            final String rootPath = getStorageRootDirectory();
+            TopicPartition tp = topicIdPartition.topicPartition();
+            final String topicPartitionSubpath = format("%s-%d-%s", topicIdPartition.topicId(),
+                   tp.partition(), tp.topic());
+            final String uuid = id.id().toString();
+
+            return Arrays.asList(
+                    Paths.get(rootPath, topicPartitionSubpath, uuid + "-segment"),

Review Comment:
   These suffixes should be moved to constants.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static java.lang.String.format;
+import static java.util.Arrays.stream;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+import static java.util.regex.Pattern.compile;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getFileType;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Represents the set of files offloaded to the local tiered storage for a single log segment.
+ * A {@link RemoteLogSegmentFileset} corresponds to the leaves of the file system structure of
+ * the local tiered storage:
+ *
+ * <code>
+ * / storage-directory / uuidBase64-partition-topic / oAtiIQ95REujbuzNd_lkLQ-segment
+ *                                                  . oAtiIQ95REujbuzNd_lkLQ-offset_index
+ *                                                  . oAtiIQ95REujbuzNd_lkLQ-time_index
+ * </code>
+ */
+public final class RemoteLogSegmentFileset {
+
+    /**
+     * The format of a file which belongs to the fileset, i.e. a file which is assigned to a log segment in
+     * Kafka's log directory.
+     *
+     * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+     * follows the structure UUID-FileType.
+     */
+    private static final String UUID_LEGAL_CHARS = "[a-zA-Z0-9_-]{22}";
+    private static final Pattern FILENAME_FORMAT = compile("(" + UUID_LEGAL_CHARS + ")-([a-z_]+)");
+    private static final int GROUP_UUID = 1;
+    private static final int GROUP_FILE_TYPE = 2;
+
+    /**
+     * Characterises the type of a file in the local tiered storage copied from Apache Kafka's standard storage.
+     */
+    public enum RemoteLogSegmentFileType {
+        SEGMENT(false),
+        OFFSET_INDEX(false),
+        TIME_INDEX(false),
+        TRANSACTION_INDEX(true),
+        LEADER_EPOCH_CHECKPOINT(false),
+        PRODUCER_SNAPSHOT(true);

Review Comment:
   The current code expects the producer snapshot to be present (although this is incorrect, because segments uploaded prior to 2.8 won't have one); hence, this is a mandatory entry. The code fails at:
   https://github.com/apache/kafka/blob/43574beb972d47d696e0de077f453b36ce148026/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L592 with a NullPointerException if the producer snapshot is not available.
   
   



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteTopicPartitionDirectory.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.util.Arrays.asList;
+import static java.util.Objects.requireNonNull;
+import static java.util.regex.Pattern.compile;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getUuid;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.deleteFilesOnly;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.deleteQuietly;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Represents a topic-partition directory in the local tiered storage under which filesets for
+ * log segments are stored.
+ *
+ *
+ * <code>
+ * / storage-directory / uuidBase64-0-topic / tvHCaSDsQZWsjr5rbtCjxA-segment
+ *                     .                   .  tvHCaSDsQZWsjr5rbtCjxA-offset_index
+ *                     .                   .  tvHCaSDsQZWsjr5rbtCjxA-time_index
+ *                     .
+ *                     / 5fEBmixCR5-dMntYSLIr1g-3-topic / BFyXlC8ySMm-Uzxw5lZSMg-segment
+ *                                                      . BFyXlC8ySMm-Uzxw5lZSMg-offset_index

Review Comment:
   Can we use `UnifiedLog.IndexFileSuffix` here (and similarly for the offset and transaction indexes), which uses `.` instead of an underscore to name the files? This would bring consistency with how we name the files in the log and in RemoteIndexCache.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static java.lang.String.format;
+import static java.util.Arrays.stream;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+import static java.util.regex.Pattern.compile;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getFileType;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Represents the set of files offloaded to the local tiered storage for a single log segment.
+ * A {@link RemoteLogSegmentFileset} corresponds to the leaves of the file system structure of
+ * the local tiered storage:
+ *
+ * <code>
+ * / storage-directory / uuidBase64-partition-topic / oAtiIQ95REujbuzNd_lkLQ-segment
+ *                                                  . oAtiIQ95REujbuzNd_lkLQ-offset_index
+ *                                                  . oAtiIQ95REujbuzNd_lkLQ-time_index
+ * </code>
+ */
+public final class RemoteLogSegmentFileset {
+
+    /**
+     * The format of a file which belongs to the fileset, i.e. a file which is assigned to a log segment in
+     * Kafka's log directory.
+     *
+     * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+     * follows the structure UUID-FileType.
+     */
+    private static final String UUID_LEGAL_CHARS = "[a-zA-Z0-9_-]{22}";
+    private static final Pattern FILENAME_FORMAT = compile("(" + UUID_LEGAL_CHARS + ")-([a-z_]+)");
+    private static final int GROUP_UUID = 1;
+    private static final int GROUP_FILE_TYPE = 2;
+
+    /**
+     * Characterises the type of a file in the local tiered storage copied from Apache Kafka's standard storage.
+     */
+    public enum RemoteLogSegmentFileType {
+        SEGMENT(false),
+        OFFSET_INDEX(false),
+        TIME_INDEX(false),
+        TRANSACTION_INDEX(true),
+        LEADER_EPOCH_CHECKPOINT(false),
+        PRODUCER_SNAPSHOT(true);
+
+        /** True when a file of this type is optional. */
+        private final boolean optional;
+
+        RemoteLogSegmentFileType(boolean optional) {
+            this.optional = optional;
+        }
+
+        /**
+         * Builds the name of the file of this type for the given UUID in the local tiered storage: the UUID
+         * followed by a dash and the lower-cased name of this type, e.g. uuid-segment.
+         */
+        public String toFilename(final Uuid uuid) {
+            final String uuidPart = uuid.toString();
+            final String typePart = name().toLowerCase(Locale.ROOT);
+            return format("%s-%s", uuidPart, typePart);
+        }
+
+        /**
+         * Resolves the type of the file with the provided name.
+         *
+         * @throws IllegalArgumentException if the name does not follow the UUID-FileType structure, or if
+         *                                  its file type part does not correspond to any known type.
+         */
+        public static RemoteLogSegmentFileType getFileType(final String filename) {
+            try {
+                final String typeName = substr(filename, GROUP_FILE_TYPE).toUpperCase(Locale.ROOT);
+                return RemoteLogSegmentFileType.valueOf(typeName);
+
+            } catch (final RuntimeException cause) {
+                // Whatever went wrong (malformed name, unknown type), report it as an invalid file name.
+                throw new IllegalArgumentException(format("Not a remote log segment file: %s", filename), cause);
+            }
+        }
+
+        /**
+         * Extracts the UUID from the filename. This UUID is that of the remote log segment id, which uniquely
+         * identifies the log segment the file's data belongs to (segment data, but also indexes or other
+         * associated files).
+         */
+        public static Uuid getUuid(final String filename) {
+            final String uuidPart = substr(filename, GROUP_UUID);
+            return Uuid.fromString(uuidPart);
+        }
+
+        /**
+         * Returns the requested capture group of the filename once matched against the file name format.
+         *
+         * @throws IllegalArgumentException if the whole filename does not match the format.
+         */
+        static String substr(final String filename, final int group) {
+            final Matcher matcher = FILENAME_FORMAT.matcher(filename);
+            if (matcher.matches()) {
+                return matcher.group(group);
+            }
+            throw new IllegalArgumentException(format("Not a remote log segment file: %s", filename));
+        }
+
+        public boolean isOptional() {
+            return optional;
+        }
+    }
+
+    private static final Logger LOGGER = getLogger(RemoteLogSegmentFileset.class);
+
+    private final RemoteTopicPartitionDirectory partitionDirectory;
+    private final RemoteLogSegmentId remoteLogSegmentId;
+    private final Map<RemoteLogSegmentFileType, File> files;
+
+    /**
+     * Creates a new fileset located under the given storage directory for the provided remote log segment id.
+     * The topic-partition directory is created if it does not exist yet. However the files corresponding to
+     * the log segment offloaded are not created on the file system until transfer happens.
+     *
+     * @param storageDir The root directory of the local tiered storage.
+     * @param id Remote log segment id assigned to a log segment in Kafka.
+     * @return A new fileset instance.
+     */
+    public static RemoteLogSegmentFileset openFileset(final File storageDir, final RemoteLogSegmentId id) {
+        final RemoteTopicPartitionDirectory topicPartitionDir =
+                openTopicPartitionDirectory(id.topicIdPartition(), storageDir);
+        final File parent = topicPartitionDir.getDirectory();
+        final Uuid segmentUuid = id.id();
+
+        // One entry per file type; the files themselves are only referenced here, not created.
+        final Map<RemoteLogSegmentFileType, File> filesByType = stream(RemoteLogSegmentFileType.values())
+                .collect(toMap(identity(), fileType -> new File(parent, fileType.toFilename(segmentUuid))));
+
+        return new RemoteLogSegmentFileset(topicPartitionDir, id, filesByType);
+    }
+
+    /**
+     * Creates a fileset instance for the physical set of files located under the given topic-partition directory.
+     * The fileset MUST exist on the file system with the given uuid.
+     *
+     * @param tpDirectory The topic-partition directory which this fileset's segment belongs to.
+     * @param uuid The expected UUID of the fileset.
+     * @return A new fileset instance.
+     */
+    public static RemoteLogSegmentFileset openExistingFileset(final RemoteTopicPartitionDirectory tpDirectory,
+                                                              final Uuid uuid) {
+        final Map<RemoteLogSegmentFileType, File> files =
+                stream(tpDirectory.getDirectory().listFiles())

Review Comment:
   Also filter out directories here, so that only regular files are collected.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local file system to store
+ * offloaded log segments and associated data.
+ * <p>
+ * Due to the consistency semantic of POSIX-compliant file systems, this remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the copy to the storage succeeded.
+ * </p>
+ * <p>
+ * In order to guarantee isolation, independence, reproducibility and consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via the storage ID provided to
+ * the constructor, should be limited to a test or well-defined self-contained use-case.
+ * </p>
+ * <p>
+ * The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka.
+ * <p>
+ * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+ * follows the structure UuidBase64-FileType.
+ * <p>
+ * Given the root directory of the storage, segments and associated files are organized as represented below.
+ * </p>
+ * <code>
+ * / storage-directory  / LWgrMmVrT0a__7a4SasuPA-0-topic / bCqX9U--S-6U8XUM9II25Q-segment
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-offset_index
+ * .                                                     . bCqX9U--S-6U8XUM9II25Q-time_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-segment
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-offset_index
+ * .                                                     . h956soEzTzi9a-NOQ-DvKA-time_index
+ * .
+ * / LWgrMmVrT0a__7a4SasuPA-1-topic / o8CQPT86QQmbFmi3xRmiHA-segment
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-offset_index
+ * .                                . o8CQPT86QQmbFmi3xRmiHA-time_index
+ * .
+ * / DRagLm_PS9Wl8fz1X43zVg-3-btopic / jvj3vhliTGeU90sIosmp_g-segment
+ * .                                 . jvj3vhliTGeU90sIosmp_g-offset_index
+ * .                                 . jvj3vhliTGeU90sIosmp_g-time_index
+ * </code>
+ */
+public final class LocalTieredStorage implements RemoteStorageManager {
+
+    public static final String STORAGE_CONFIG_PREFIX = "remote.log.storage.local.";
+
+    /**
+     * The root directory of this storage.
+     */
+    public static final String STORAGE_DIR_PROP = "dir";
+
+    /**
+     * Delete all files and directories from this storage on close, substantially removing it
+     * entirely from the file system.
+     */
+    public static final String DELETE_ON_CLOSE_PROP = "delete.on.close";
+
+    /**
+     * The implementation of the transfer of the data of the canonical segment and index files to
+     * this storage. The only reason the "transferer" abstraction exists is to be able to simulate
+     * file copy errors and exercise the associated failure modes.
+     */
+    public static final String TRANSFERER_CLASS_PROP = "transferer";
+
+    /**
+     * Whether the deleteLogSegment() implemented by this storage should actually delete data or behave
+     * as a no-operation. This allows to simulate non-strongly consistent storage systems which do not
+     * guarantee visibility of a successful delete for subsequent read or list operations.
+     */
+    public static final String ENABLE_DELETE_API_PROP = "delete.enable";
+
+    /**
+     * The ID of the broker which owns this instance of {@link LocalTieredStorage}.
+     */
+    public static final String BROKER_ID = "broker.id";
+
+    private static final String ROOT_STORAGES_DIR_NAME = "kafka-tiered-storage";
+
+    private volatile File storageDirectory;
+    private volatile boolean deleteOnClose = false;
+    private volatile boolean deleteEnabled = true;
+    /**
+     * The transferer used unless another implementation is provided via the configuration: it copies
+     * files, or writes buffers, directly on the local file system.
+     */
+    private volatile Transferer transferer = new Transferer() {
+        /**
+         * Copies the source file to the destination. A source file which does not exist is skipped.
+         */
+        @Override
+        public void transfer(File from, File to) throws IOException {
+            if (from.exists()) {
+                Files.copy(from.toPath(), to.toPath());
+            }
+        }
+
+        /**
+         * Writes the remaining content of the buffer to the destination file, overwriting any previous
+         * content. Nothing is written, and no file is created, for a null or fully-consumed buffer.
+         */
+        @Override
+        public void transfer(ByteBuffer from, File to) throws IOException {
+            if (from != null && from.hasRemaining()) {
+                try (FileOutputStream fileOutputStream = new FileOutputStream(to, false);
+                     FileChannel channel = fileOutputStream.getChannel()) {
+                    // A single write() is allowed to consume only part of the buffer: loop until it is drained.
+                    while (from.hasRemaining()) {
+                        channel.write(from);
+                    }
+                }
+            }
+        }
+    };
+
+    private volatile int brokerId = -1;
+
+    private volatile Logger logger = LoggerFactory.getLogger(LocalTieredStorage.class);
+
+    /**
+     * Used to establish an explicit chronological ordering of the events generated by the local tiered
+     * storage which this instance gives access to.
+     */
+    // TODO: Make this timestamp dependent only on the assigned broker, not on the class instance.
+    private final AtomicInteger eventTimestamp = new AtomicInteger(-1);
+
+    /**
+     * Used to notify users of this storage of internal updates - new topic-partition recorded (upon directory
+     * creation) and segment file written (upon segment file write(2)).
+     */
+    private final LocalTieredStorageListeners storageListeners = new LocalTieredStorageListeners();
+
+    private final LocalTieredStorageHistory history = new LocalTieredStorageHistory();
+
+    /**
+     * Creates a storage and hands it over to its history via {@code history.listenTo}.
+     */
+    public LocalTieredStorage() {
+        this.history.listenTo(this);
+    }
+
+    /**
+     * Walks through this storage and notifies the traverser of every topic-partition, segment and record discovered.
+     * <p>
+     * - The order of traversal of the topic-partition is not specified.
+     * - The order of traversal of the segments within a topic-partition is in ascending order
+     * of the modified timestamp of the segment file.
+     * - The order of traversal of records within a segment corresponds to the insertion
+     * order of these records in the original segment from which the segment in this storage
+     * was transferred.
+     * <p>
+     * This method is NOT an atomic operation w.r.t the local tiered storage. This storage may change while
+     * it is being traversed and while topic-partitions, segments and records are communicated to the
+     * traverser. There is no guarantee that updates to the storage which happen during traversal will be
+     * communicated to the traverser.
+     * Especially, in case of concurrent read and write/delete to a topic-partition, a segment or a record,
+     * the behaviour depends on the underlying file system.
+     *
+     * @param traverser User-specified object to which this storage communicates the topic-partitions,
+     *                  segments and records as they are discovered.
+     */
+    public void traverse(final LocalTieredStorageTraverser traverser) {
+        Objects.requireNonNull(traverser);
+
+        final File[] entries = storageDirectory.listFiles();
+        if (entries == null) {
+            // The content of the storage directory could not be listed: there is nothing to traverse.
+            return;
+        }
+
+        for (final File entry : entries) {
+            if (!entry.isDirectory()) {
+                // Only directories are opened as topic-partition directories; other entries are skipped.
+                continue;
+            }
+            openExistingTopicPartitionDirectory(entry.getName(), storageDirectory).traverse(traverser);
+        }
+    }
+
+    /**
+     * Adds the given listener to the listeners of this storage.
+     *
+     * @param listener The listener to add.
+     */
+    public void addListener(final LocalTieredStorageListener listener) {
+        storageListeners.add(listener);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        if (storageDirectory != null) {
+            throw new InvalidConfigurationException(format("This instance of local remote storage" +
+                    "is already configured. The existing storage directory is %s. Ensure the method " +
+                    "configure() is only called once.", storageDirectory.getAbsolutePath()));
+        }
+
+        final String storageDir = (String) configs.get(STORAGE_DIR_PROP);
+        final String shouldDeleteOnClose = (String) configs.get(DELETE_ON_CLOSE_PROP);
+        final String transfererClass = (String) configs.get(TRANSFERER_CLASS_PROP);
+        final String isDeleteEnabled = (String) configs.get(ENABLE_DELETE_API_PROP);
+        final Integer brokerIdInt = (Integer) configs.get(BROKER_ID);
+
+        if (brokerIdInt == null) {
+            throw new InvalidConfigurationException(
+                    "Broker ID is required to configure the LocalTieredStorage manager.");
+        }
+
+        brokerId = brokerIdInt;
+        logger = new LogContext(format("[LocalTieredStorage Id=%d] ", brokerId)).logger(this.getClass());
+
+        if (shouldDeleteOnClose != null) {
+            deleteOnClose = Boolean.parseBoolean(shouldDeleteOnClose);
+        }
+
+        if (isDeleteEnabled != null) {
+            deleteEnabled = Boolean.parseBoolean(isDeleteEnabled);
+        }
+
+        if (transfererClass != null) {
+            try {
+                transferer = (Transferer) getClass().getClassLoader().loadClass(transfererClass).newInstance();
+
+            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | ClassCastException e) {
+                throw new RuntimeException(format("Cannot create transferer from class '%s'", transfererClass), e);
+            }
+        }
+
+
+        if (storageDir == null) {
+            storageDirectory = TestUtils.tempDirectory(ROOT_STORAGES_DIR_NAME + "-");
+
+            logger.debug("No storage directory specified, created temporary directory: {}",
+                    storageDirectory.getAbsolutePath());
+
+        } else {
+            storageDirectory = new File(storageDir + "/" + ROOT_STORAGES_DIR_NAME);

Review Comment:
   You don't need to hard-code the "/" separator, which is OS-dependent. The `File` API takes care of it for you through the `File(parent, child)` constructor: https://docs.oracle.com/javase/8/docs/api/java/io/File.html#File-java.io.File-java.lang.String-



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;

Review Comment:
   Is this the right package for these files? Asking because all the other tiered storage (TS) related files are in `kafka.log.remote`.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static java.lang.String.format;
+import static java.util.Arrays.stream;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+import static java.util.regex.Pattern.compile;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getFileType;
+import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Represents the set of files offloaded to the local tiered storage for a single log segment.
+ * A {@link RemoteLogSegmentFileset} corresponds to the leaves of the file system structure of
+ * the local tiered storage:
+ *
+ * <code>
+ * / storage-directory / uuidBase64-partition-topic / oAtiIQ95REujbuzNd_lkLQ-segment
+ *                                                  . oAtiIQ95REujbuzNd_lkLQ-offset_index
+ *                                                  . oAtiIQ95REujbuzNd_lkLQ-time_index
+ * </code>
+ */
+public final class RemoteLogSegmentFileset {
+
+    /**
+     * The format of a file which belongs to the fileset, i.e. a file which is assigned to a log segment in
+     * Kafka's log directory.
+     *
+     * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
+     * follows the structure UUID-FileType.
+     */
+    private static final String UUID_LEGAL_CHARS = "[a-zA-Z0-9_-]{22}";
+    private static final Pattern FILENAME_FORMAT = compile("(" + UUID_LEGAL_CHARS + ")-([a-z_]+)");
+    private static final int GROUP_UUID = 1;
+    private static final int GROUP_FILE_TYPE = 2;
+
+    /**
+     * Characterises the type of a file in the local tiered storage copied from Apache Kafka's standard storage.
+     */
+    public enum RemoteLogSegmentFileType {
+        SEGMENT(false),
+        OFFSET_INDEX(false),
+        TIME_INDEX(false),
+        TRANSACTION_INDEX(true),
+        LEADER_EPOCH_CHECKPOINT(false),
+        PRODUCER_SNAPSHOT(true);
+
+        /** True when a file of this type is optional. */
+        private final boolean optional;
+
+        RemoteLogSegmentFileType(boolean optional) {
+            this.optional = optional;
+        }
+
+        /**
+         * Builds the name of the file of this type for the given UUID in the local tiered storage: the UUID
+         * followed by a dash and the lower-cased name of this type, e.g. uuid-segment.
+         */
+        public String toFilename(final Uuid uuid) {
+            final String uuidPart = uuid.toString();
+            final String typePart = name().toLowerCase(Locale.ROOT);
+            return format("%s-%s", uuidPart, typePart);
+        }
+
+        /**
+         * Resolves the type of the file with the provided name.
+         *
+         * @throws IllegalArgumentException if the name does not follow the UUID-FileType structure, or if
+         *                                  its file type part does not correspond to any known type.
+         */
+        public static RemoteLogSegmentFileType getFileType(final String filename) {
+            try {
+                final String typeName = substr(filename, GROUP_FILE_TYPE).toUpperCase(Locale.ROOT);
+                return RemoteLogSegmentFileType.valueOf(typeName);
+
+            } catch (final RuntimeException cause) {
+                // Whatever went wrong (malformed name, unknown type), report it as an invalid file name.
+                throw new IllegalArgumentException(format("Not a remote log segment file: %s", filename), cause);
+            }
+        }
+
+        /**
+         * Extracts the UUID from the filename. This UUID is that of the remote log segment id, which uniquely
+         * identifies the log segment the file's data belongs to (segment data, but also indexes or other
+         * associated files).
+         */
+        public static Uuid getUuid(final String filename) {
+            final String uuidPart = substr(filename, GROUP_UUID);
+            return Uuid.fromString(uuidPart);
+        }
+
+        /**
+         * Returns the requested capture group of the filename once matched against the file name format.
+         *
+         * @throws IllegalArgumentException if the whole filename does not match the format.
+         */
+        static String substr(final String filename, final int group) {
+            final Matcher matcher = FILENAME_FORMAT.matcher(filename);
+            if (matcher.matches()) {
+                return matcher.group(group);
+            }
+            throw new IllegalArgumentException(format("Not a remote log segment file: %s", filename));
+        }
+
+        public boolean isOptional() {
+            return optional;
+        }
+    }
+
+    private static final Logger LOGGER = getLogger(RemoteLogSegmentFileset.class);
+
+    private final RemoteTopicPartitionDirectory partitionDirectory;
+    private final RemoteLogSegmentId remoteLogSegmentId;
+    private final Map<RemoteLogSegmentFileType, File> files;
+
+    /**
+     * Creates a new fileset located under the given storage directory for the provided remote log segment id.
+     * The topic-partition directory is created if it does not exist yet. However the files corresponding to
+     * the log segment offloaded are not created on the file system until transfer happens.
+     *
+     * @param storageDir The root directory of the local tiered storage.
+     * @param id Remote log segment id assigned to a log segment in Kafka.
+     * @return A new fileset instance.
+     */
+    public static RemoteLogSegmentFileset openFileset(final File storageDir, final RemoteLogSegmentId id) {
+        final RemoteTopicPartitionDirectory topicPartitionDir =
+                openTopicPartitionDirectory(id.topicIdPartition(), storageDir);
+        final File parent = topicPartitionDir.getDirectory();
+        final Uuid segmentUuid = id.id();
+
+        // One entry per file type; the files themselves are only referenced here, not created.
+        final Map<RemoteLogSegmentFileType, File> filesByType = stream(RemoteLogSegmentFileType.values())
+                .collect(toMap(identity(), fileType -> new File(parent, fileType.toFilename(segmentUuid))));
+
+        return new RemoteLogSegmentFileset(topicPartitionDir, id, filesByType);
+    }
+
+    /**
+     * Creates a fileset instance for the physical set of files located under the given topic-partition directory.
+     * The fileset MUST exist on the file system with the given uuid.
+     *
+     * @param tpDirectory The topic-partition directory which this fileset's segment belongs to.
+     * @param uuid The expected UUID of the fileset.
+     * @return A new fileset instance.
+     */
+    public static RemoteLogSegmentFileset openExistingFileset(final RemoteTopicPartitionDirectory tpDirectory,
+                                                              final Uuid uuid) {
+        // listFiles() returns null when the path does not denote a directory or when an I/O error occurs.
+        final File[] candidates = tpDirectory.getDirectory().listFiles();
+        if (candidates == null) {
+            throw new IllegalStateException(
+                    format("Cannot list the files of the topic-partition directory: %s", tpDirectory.getDirectory()));
+        }
+
+        final String uuidPrefix = uuid.toString();
+        final Map<RemoteLogSegmentFileType, File> files = stream(candidates)
+                // Only regular files can belong to a fileset: ignore any sub-directory.
+                .filter(File::isFile)
+                .filter(file -> file.getName().startsWith(uuidPrefix))
+                .collect(toMap(file -> getFileType(file.getName()), identity()));
+
+        // Every file type which is not optional must be present for the fileset to be valid.
+        final Set<RemoteLogSegmentFileType> expectedFileTypes = stream(RemoteLogSegmentFileType.values())
+                .filter(x -> !x.isOptional()).collect(Collectors.toSet());
+
+        if (!files.keySet().containsAll(expectedFileTypes)) {
+            // Keep only the missing types so that the error message lists exactly what is absent.
+            expectedFileTypes.removeAll(files.keySet());
+            throw new IllegalStateException(format("Invalid fileset, missing files: %s", expectedFileTypes));
+        }
+
+        final RemoteLogSegmentId id = new RemoteLogSegmentId(tpDirectory.getTopicIdPartition(), uuid);
+        return new RemoteLogSegmentFileset(tpDirectory, id, files);
+    }
+
+    /**
+     * @return The remote log segment id this fileset was opened with.
+     */
+    public RemoteLogSegmentId getRemoteLogSegmentId() {
+        return this.remoteLogSegmentId;
+    }
+
+    /**
+     * Looks up the file registered in this fileset for the given file type.
+     *
+     * @param type The type of file to look up.
+     * @return The matching file, or {@code null} when the fileset holds no entry for that type.
+     */
+    public File getFile(final RemoteLogSegmentFileType type) {
+        return this.files.get(type);
+    }
+
+    /**
+     * Deletes the files of this fileset by delegating to {@link #deleteFilesOnly(Collection)}.
+     *
+     * @return The result reported by {@link #deleteFilesOnly(Collection)}.
+     */
+    public boolean delete() {
+        final Collection<File> segmentFiles = this.files.values();
+        return deleteFilesOnly(segmentFiles);
+    }
+
+    /**
+     * Reads all the records stored in the segment file of this fileset.
+     *
+     * @return The records of the segment file, in iteration order.
+     * @throws IOException if the segment file cannot be opened or closed.
+     */
+    public List<Record> getRecords() throws IOException {
+        // FileRecords keeps a file channel open; collect the records eagerly, then release the channel
+        // so that repeated calls do not leak file descriptors.
+        try (FileRecords segment = FileRecords.open(files.get(SEGMENT))) {
+            return StreamSupport
+                    .stream(segment.records().spliterator(), false)
+                    .collect(Collectors.toList());
+        }
+    }
+
+    /**
+     * Transfers every file of the given log segment data to the corresponding file of this fileset,
+     * using the provided transferer. The transaction index is only transferred when present.
+     *
+     * @param transferer The component performing each individual transfer.
+     * @param data The log segment data to copy from.
+     * @throws IOException if a transfer fails.
+     */
+    public void copy(final Transferer transferer, final LogSegmentData data) throws IOException {
+        final File segmentTarget = files.get(SEGMENT);
+        transferer.transfer(data.logSegment().toFile(), segmentTarget);
+
+        final File offsetIndexTarget = files.get(OFFSET_INDEX);
+        transferer.transfer(data.offsetIndex().toFile(), offsetIndexTarget);
+
+        final File timeIndexTarget = files.get(TIME_INDEX);
+        transferer.transfer(data.timeIndex().toFile(), timeIndexTarget);
+
+        // The transaction index is optional: skip it when the segment has none.
+        if (data.transactionIndex().isPresent()) {
+            final File transactionIndexTarget = files.get(TRANSACTION_INDEX);
+            transferer.transfer(data.transactionIndex().get().toFile(), transactionIndexTarget);
+        }
+
+        final File leaderEpochTarget = files.get(LEADER_EPOCH_CHECKPOINT);
+        transferer.transfer(data.leaderEpochIndex(), leaderEpochTarget);
+
+        final File producerSnapshotTarget = files.get(PRODUCER_SNAPSHOT);
+        transferer.transfer(data.producerSnapshotIndex().toFile(), producerSnapshotTarget);
+    }
+
+    /**
+     * Renders the name of the topic-partition directory followed by the name of each file
+     * of this fileset, one per line, each indented with a tab.
+     *
+     * @return A multi-line, human-readable listing of this fileset.
+     */
+    @Override
+    public String toString() {
+        // joining() produces the same concatenation as the pairwise reduce, without the
+        // intermediate strings.
+        final String ls = files.values().stream()
+                .map(file -> "\t" + file.getName() + "\n")
+                .collect(Collectors.joining());
+
+        return format("%s/\n%s", partitionDirectory.getDirectory().getName(), ls);
+    }
+
+    public static boolean deleteFilesOnly(final Collection<File> files) {
+        final Optional<File> notAFile = files.stream().filter(f -> f.exists() && !f.isFile()).findAny();
+
+        if (notAFile.isPresent()) {
+            LOGGER.warn(format("Found unexpected directory %s. Will not delete.", notAFile.get().getAbsolutePath()));
+            return false;
+        }
+
+        return files.stream().map(RemoteLogSegmentFileset::deleteQuietly).reduce(true, Boolean::logicalAnd);
+    }
+
+    public static boolean deleteQuietly(final File file) {
+        try {
+            LOGGER.trace("Deleting " + file.getAbsolutePath());
+            if (!file.exists()) {
+                return true;
+            }
+            return file.delete();

Review Comment:
   You can use `Utils.delete(File)` here instead of checking `exists()` and calling `delete()` manually.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org