Posted to commits@kafka.apache.org by jg...@apache.org on 2018/11/01 21:34:46 UTC

[kafka] branch trunk updated: KAFKA-7568; Return leader epoch in ListOffsets response (#5855)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fc1dc35  KAFKA-7568; Return leader epoch in ListOffsets response (#5855)
fc1dc35 is described below

commit fc1dc358ee9b956e062b0966e1ed1fef40ab43d8
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Nov 1 14:34:28 2018 -0700

    KAFKA-7568; Return leader epoch in ListOffsets response (#5855)
    
    As part of KIP-320, the ListOffsets API should return the leader epoch of any fetched offset. We get this epoch either from the log itself for a timestamp query, or from the epoch cache if we are searching for the earliest or latest offset in the log. When handling queries for the latest offset, we have elected to use the current leader epoch, which is consistent with other handling (e.g. OffsetsForTimes).
    
    Reviewers: Jun Rao <ju...@gmail.com>
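    
    For reference, a minimal client-side sketch of how the returned epoch surfaces through the
    consumer's offsetsForTimes API. The class and setup below are illustrative only; it assumes
    a 2.1+ consumer where OffsetAndTimestamp exposes leaderEpoch(), as exercised by
    PlaintextConsumerTest in this patch:
    
        import java.util.Collections;
        import java.util.Map;
        import java.util.Optional;
        
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
        import org.apache.kafka.common.TopicPartition;
        
        public class LeaderEpochLookupExample {
            // Queries the broker (via the ListOffsets API) for the first offset whose timestamp
            // is >= the target and prints the leader epoch, if one was returned. The epoch is
            // absent for records written with an older message format.
            static void printOffsetForTime(KafkaConsumer<byte[], byte[]> consumer,
                                           TopicPartition partition,
                                           long targetTimestamp) {
                Map<TopicPartition, OffsetAndTimestamp> result =
                        consumer.offsetsForTimes(Collections.singletonMap(partition, targetTimestamp));
                OffsetAndTimestamp found = result.get(partition);
                if (found == null) {
                    System.out.println("No offset with timestamp >= " + targetTimestamp);
                    return;
                }
                Optional<Integer> leaderEpoch = found.leaderEpoch();
                System.out.println("offset=" + found.offset()
                        + " timestamp=" + found.timestamp()
                        + " leaderEpoch=" + leaderEpoch.map(String::valueOf).orElse("unknown"));
            }
        }
    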
---
 .../apache/kafka/common/record/FileRecords.java    | 31 +++++++----
 .../kafka/common/record/FileRecordsTest.java       | 62 ++++++++++++++++++++--
 core/src/main/scala/kafka/cluster/Partition.scala  | 11 ++--
 core/src/main/scala/kafka/log/Log.scala            | 26 +++++++--
 core/src/main/scala/kafka/log/LogSegment.scala     |  8 ++-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 12 +++--
 .../main/scala/kafka/server/ReplicaManager.scala   |  3 +-
 .../kafka/server/epoch/LeaderEpochFileCache.scala  | 17 ++++--
 .../kafka/api/PlaintextConsumerTest.scala          | 23 +++++---
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 28 ++++++++--
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 51 ++++++++++++++++--
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  3 +-
 .../unit/kafka/server/ListOffsetsRequestTest.scala | 44 +++++++++++++++
 13 files changed, 264 insertions(+), 55 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 3537fc3..d723ba0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -32,6 +32,8 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -320,7 +322,8 @@ public class FileRecords extends AbstractRecords implements Closeable {
                 for (Record record : batch) {
                     long timestamp = record.timestamp();
                     if (timestamp >= targetTimestamp && record.offset() >= startingOffset)
-                        return new TimestampAndOffset(timestamp, record.offset());
+                        return new TimestampAndOffset(timestamp, record.offset(),
+                                maybeLeaderEpoch(batch.partitionLeaderEpoch()));
                 }
             }
         }
@@ -335,15 +338,23 @@ public class FileRecords extends AbstractRecords implements Closeable {
     public TimestampAndOffset largestTimestampAfter(int startingPosition) {
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
         long offsetOfMaxTimestamp = -1L;
+        int leaderEpochOfMaxTimestamp = RecordBatch.NO_PARTITION_LEADER_EPOCH;
 
         for (RecordBatch batch : batchesFrom(startingPosition)) {
             long timestamp = batch.maxTimestamp();
             if (timestamp > maxTimestamp) {
                 maxTimestamp = timestamp;
                 offsetOfMaxTimestamp = batch.lastOffset();
+                leaderEpochOfMaxTimestamp = batch.partitionLeaderEpoch();
             }
         }
-        return new TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp);
+        return new TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp,
+                maybeLeaderEpoch(leaderEpochOfMaxTimestamp));
+    }
+
+    private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
+        return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
+                Optional.empty() : Optional.of(leaderEpoch);
     }
 
     /**
@@ -492,28 +503,27 @@ public class FileRecords extends AbstractRecords implements Closeable {
     public static class TimestampAndOffset {
         public final long timestamp;
         public final long offset;
+        public final Optional<Integer> leaderEpoch;
 
-        public TimestampAndOffset(long timestamp, long offset) {
+        public TimestampAndOffset(long timestamp, long offset, Optional<Integer> leaderEpoch) {
             this.timestamp = timestamp;
             this.offset = offset;
+            this.leaderEpoch = leaderEpoch;
         }
 
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
-
             TimestampAndOffset that = (TimestampAndOffset) o;
-
-            if (timestamp != that.timestamp) return false;
-            return offset == that.offset;
+            return timestamp == that.timestamp &&
+                    offset == that.offset &&
+                    Objects.equals(leaderEpoch, that.leaderEpoch);
         }
 
         @Override
         public int hashCode() {
-            int result = (int) (timestamp ^ (timestamp >>> 32));
-            result = 31 * result + (int) (offset ^ (offset >>> 32));
-            return result;
+            return Objects.hash(timestamp, offset, leaderEpoch);
         }
 
         @Override
@@ -521,6 +531,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
             return "TimestampAndOffset(" +
                     "timestamp=" + timestamp +
                     ", offset=" + offset +
+                    ", leaderEpoch=" + leaderEpoch +
                     ')';
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 637da93..1945bcc 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.utf8;
@@ -41,6 +42,8 @@ import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -318,12 +321,12 @@ public class FileRecordsTest {
     @Test
     public void testPreallocateTrue() throws IOException {
         File temp = tempFile();
-        FileRecords fileRecords = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
+        FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true);
         long position = fileRecords.channel().position();
         int size = fileRecords.sizeInBytes();
         assertEquals(0, position);
         assertEquals(0, size);
-        assertEquals(512 * 1024 * 1024, temp.length());
+        assertEquals(1024 * 1024, temp.length());
     }
 
     /**
@@ -332,7 +335,7 @@ public class FileRecordsTest {
     @Test
     public void testPreallocateFalse() throws IOException {
         File temp = tempFile();
-        FileRecords set = FileRecords.open(temp, false, 512 * 1024 * 1024, false);
+        FileRecords set = FileRecords.open(temp, false, 1024 * 1024, false);
         long position = set.channel().position();
         int size = set.sizeInBytes();
         assertEquals(0, position);
@@ -346,7 +349,7 @@ public class FileRecordsTest {
     @Test
     public void testPreallocateClearShutdown() throws IOException {
         File temp = tempFile();
-        FileRecords fileRecords = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
+        FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true);
         append(fileRecords, values);
 
         int oldPosition = (int) fileRecords.channel().position();
@@ -356,7 +359,7 @@ public class FileRecordsTest {
         fileRecords.close();
 
         File tempReopen = new File(temp.getAbsolutePath());
-        FileRecords setReopen = FileRecords.open(tempReopen, true, 512 * 1024 * 1024, true);
+        FileRecords setReopen = FileRecords.open(tempReopen, true, 1024 * 1024, true);
         int position = (int) setReopen.channel().position();
         int size = setReopen.sizeInBytes();
 
@@ -383,6 +386,55 @@ public class FileRecordsTest {
     }
 
     @Test
+    public void testSearchForTimestamp() throws IOException {
+        for (RecordVersion version : RecordVersion.values()) {
+            testSearchForTimestamp(version);
+        }
+    }
+
+    private void testSearchForTimestamp(RecordVersion version) throws IOException {
+        File temp = tempFile();
+        FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true);
+        appendWithOffsetAndTimestamp(fileRecords, version, 10L, 5, 0);
+        appendWithOffsetAndTimestamp(fileRecords, version, 11L, 6, 1);
+
+        assertFoundTimestamp(new FileRecords.TimestampAndOffset(10L, 5, Optional.of(0)),
+                fileRecords.searchForTimestamp(9L, 0, 0L), version);
+        assertFoundTimestamp(new FileRecords.TimestampAndOffset(10L, 5, Optional.of(0)),
+                fileRecords.searchForTimestamp(10L, 0, 0L), version);
+        assertFoundTimestamp(new FileRecords.TimestampAndOffset(11L, 6, Optional.of(1)),
+                fileRecords.searchForTimestamp(11L, 0, 0L), version);
+        assertNull(fileRecords.searchForTimestamp(12L, 0, 0L));
+    }
+
+    private void assertFoundTimestamp(FileRecords.TimestampAndOffset expected,
+                                      FileRecords.TimestampAndOffset actual,
+                                      RecordVersion version) {
+        if (version == RecordVersion.V0) {
+            assertNull("Expected no match for message format v0", actual);
+        } else {
+            assertNotNull("Expected to find timestamp for message format " + version, actual);
+            assertEquals("Expected matching timestamps for message format" + version, expected.timestamp, actual.timestamp);
+            assertEquals("Expected matching offsets for message format " + version, expected.offset, actual.offset);
+            Optional<Integer> expectedLeaderEpoch = version.value >= RecordVersion.V2.value ?
+                    expected.leaderEpoch : Optional.empty();
+            assertEquals("Non-matching leader epoch for version " + version, expectedLeaderEpoch, actual.leaderEpoch);
+        }
+    }
+
+    private void appendWithOffsetAndTimestamp(FileRecords fileRecords,
+                                              RecordVersion recordVersion,
+                                              long timestamp,
+                                              long offset,
+                                              int leaderEpoch) throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, recordVersion.value,
+                CompressionType.NONE, TimestampType.CREATE_TIME, offset, timestamp, leaderEpoch);
+        builder.append(new SimpleRecord(timestamp, new byte[0], new byte[0]));
+        fileRecords.append(builder.build());
+    }
+
+    @Test
     public void testConversion() throws IOException {
         doTestConversion(CompressionType.NONE, RecordBatch.MAGIC_VALUE_V0);
         doTestConversion(CompressionType.GZIP, RecordBatch.MAGIC_VALUE_V0);
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 024fdcc..819da2c 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests._
@@ -803,7 +804,7 @@ class Partition(val topicPartition: TopicPartition,
   def fetchOffsetForTimestamp(timestamp: Long,
                               isolationLevel: Option[IsolationLevel],
                               currentLeaderEpoch: Optional[Integer],
-                              fetchOnlyFromLeader: Boolean): TimestampOffset = inReadLock(leaderIsrUpdateLock) {
+                              fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = inReadLock(leaderIsrUpdateLock) {
     // decide whether to only fetch from leader
     val localReplica = localReplicaWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
 
@@ -814,16 +815,16 @@ class Partition(val topicPartition: TopicPartition,
     }
 
     if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP) {
-      TimestampOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset)
+      Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch)))
     } else {
-      def allowed(timestampOffset: TimestampOffset): Boolean =
+      def allowed(timestampOffset: TimestampAndOffset): Boolean =
         timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP || timestampOffset.offset < lastFetchableOffset
 
       val fetchedOffset = logManager.getLog(topicPartition).flatMap { log =>
-        log.fetchOffsetsByTimestamp(timestamp)
+        log.fetchOffsetByTimestamp(timestamp)
       }
 
-      fetchedOffset.filter(allowed).getOrElse(TimestampOffset.Unknown)
+      fetchedOffset.filter(allowed)
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index bc328d7..19e2f2f 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -22,6 +22,7 @@ import java.lang.{Long => JLong}
 import java.nio.file.{Files, NoSuchFileException}
 import java.text.NumberFormat
 import java.util.Map.{Entry => JEntry}
+import java.util.Optional
 import java.util.concurrent.atomic._
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit}
 import java.util.regex.Pattern
@@ -36,6 +37,7 @@ import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.ListOffsetRequest
@@ -1263,7 +1265,7 @@ class Log(@volatile var dir: File,
    * @return The offset of the first message whose timestamp is greater than or equals to the given timestamp.
    *         None if no such message is found.
    */
-  def fetchOffsetsByTimestamp(targetTimestamp: Long): Option[TimestampOffset] = {
+  def fetchOffsetByTimestamp(targetTimestamp: Long): Option[TimestampAndOffset] = {
     maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
       debug(s"Searching offset for timestamp $targetTimestamp")
 
@@ -1278,10 +1280,24 @@ class Log(@volatile var dir: File,
       // constant time access while being safe to use with concurrent collections unlike `toArray`.
       val segmentsCopy = logSegments.toBuffer
       // For the earliest and latest, we do not need to return the timestamp.
-      if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
-        return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset))
-      else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
-        return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logEndOffset))
+      if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) {
+        // The first cached epoch usually corresponds to the log start offset, but we have to verify this since
+        // it may not be true following a message format version bump as the epoch will not be available for
+        // log entries written in the older format.
+        val earliestEpochEntry = leaderEpochCache.earliestEntry
+        val epochOpt = earliestEpochEntry match {
+          case Some(entry) if entry.startOffset <= logStartOffset => Optional.of[Integer](entry.epoch)
+          case _ => Optional.empty[Integer]()
+        }
+        return Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))
+      } else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP) {
+        val latestEpoch = leaderEpochCache.latestEpoch
+        val epochOpt = if (latestEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH)
+          Optional.empty[Integer]()
+        else
+          Optional.of[Integer](latestEpoch)
+        return Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOpt))
+      }
 
       val targetSeg = {
         // Get all the segments whose largest timestamp is smaller than target timestamp
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index d910a29..a700aeb 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -27,7 +27,7 @@ import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{FetchDataInfo, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
-import org.apache.kafka.common.record.FileRecords.LogOffsetPosition
+import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 
@@ -546,15 +546,13 @@ class LogSegment private[log] (val log: FileRecords,
    * @param startingOffset The starting offset to search.
    * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there is no such message.
    */
-  def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampOffset] = {
+  def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampAndOffset] = {
     // Get the index entry with a timestamp less than or equal to the target timestamp
     val timestampOffset = timeIndex.lookup(timestamp)
     val position = offsetIndex.lookup(math.max(timestampOffset.offset, startingOffset)).position
 
     // Search the timestamp
-    Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
-      TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
-    }
+    Option(log.searchForTimestamp(timestamp, position, startingOffset))
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e3dc921..be6736d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -817,14 +817,20 @@ class KafkaApis(val requestChannel: RequestChannel,
           else
             None
 
-          val found = replicaManager.fetchOffsetForTimestamp(topicPartition,
+          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
             partitionData.timestamp,
             isolationLevelOpt,
             partitionData.currentLeaderEpoch,
             fetchOnlyFromLeader)
 
-          (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset,
-            Optional.empty()))
+          val response = foundOpt match {
+            case Some(found) =>
+              new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch)
+            case None =>
+              new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP,
+                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
+          }
+          (topicPartition, response)
         } catch {
           // NOTE: These exceptions are special cased since these error messages are typically transient or the client
           // would have received a clear exception and there is no value in logging the entire stack trace for the same
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e3feb71..1146bef 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -45,6 +45,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 
 import scala.collection.JavaConverters._
 import scala.collection._
@@ -773,7 +774,7 @@ class ReplicaManager(val config: KafkaConfig,
                               timestamp: Long,
                               isolationLevel: Option[IsolationLevel],
                               currentLeaderEpoch: Optional[Integer],
-                              fetchOnlyFromLeader: Boolean): TimestampOffset = {
+                              fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = {
     val partition = getPartitionOrException(topicPartition, expectLeader = fetchOnlyFromLeader)
     partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader)
   }
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index cee6bb6..f47d3bd 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -85,11 +85,9 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
   }
 
   /**
-    * Returns the current Leader Epoch. This is the latest epoch
-    * which has messages assigned to it.
-    *
-    * @return
-    */
+   * Returns the current Leader Epoch. This is the latest epoch
+   * which has messages assigned to it.
+   */
   def latestEpoch: Int = {
     inReadLock(lock) {
       if (epochs.isEmpty) UNDEFINED_EPOCH else epochs.last.epoch
@@ -97,6 +95,15 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
   }
 
   /**
+   * Get the earliest cached entry if one exists.
+   */
+  def earliestEntry: Option[EpochEntry] = {
+    inReadLock(lock) {
+      epochs.headOption
+    }
+  }
+
+  /**
     * Returns the Leader Epoch and the End Offset for a requested Leader Epoch.
     *
     * The Leader Epoch returned is the largest epoch less than or equal to the requested Leader
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 0c63775..42b3984 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1214,16 +1214,27 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     //  topic3Partition0 -> 80,
     //  topic3Partition1 -> 100)
     val timestampOffsets = consumer.offsetsForTimes(timestampsToSearch)
-    assertEquals(0, timestampOffsets.get(new TopicPartition(topic1, 0)).offset())
-    assertEquals(0, timestampOffsets.get(new TopicPartition(topic1, 0)).timestamp())
-    assertEquals(20, timestampOffsets.get(new TopicPartition(topic1, 1)).offset())
-    assertEquals(20, timestampOffsets.get(new TopicPartition(topic1, 1)).timestamp())
+
+    val timestampTopic1P0 = timestampOffsets.get(new TopicPartition(topic1, 0))
+    assertEquals(0, timestampTopic1P0.offset)
+    assertEquals(0, timestampTopic1P0.timestamp)
+    assertEquals(Optional.of(0), timestampTopic1P0.leaderEpoch)
+
+    val timestampTopic1P1 = timestampOffsets.get(new TopicPartition(topic1, 1))
+    assertEquals(20, timestampTopic1P1.offset)
+    assertEquals(20, timestampTopic1P1.timestamp)
+    assertEquals(Optional.of(0), timestampTopic1P1.leaderEpoch)
+
     assertEquals("null should be returned when message format is 0.9.0",
       null, timestampOffsets.get(new TopicPartition(topic2, 0)))
     assertEquals("null should be returned when message format is 0.9.0",
       null, timestampOffsets.get(new TopicPartition(topic2, 1)))
-    assertEquals(80, timestampOffsets.get(new TopicPartition(topic3, 0)).offset())
-    assertEquals(80, timestampOffsets.get(new TopicPartition(topic3, 0)).timestamp())
+
+    val timestampTopic3P0 = timestampOffsets.get(new TopicPartition(topic3, 0))
+    assertEquals(80, timestampTopic3P0.offset)
+    assertEquals(80, timestampTopic3P0.timestamp)
+    assertEquals(Optional.of(0), timestampTopic3P0.leaderEpoch)
+
     assertEquals(null, timestampOffsets.get(new TopicPartition(topic3, 1)))
   }
 
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b5b271e..a075bd0 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, ListOffsetRequest}
@@ -365,6 +366,21 @@ class PartitionTest {
     assertFetchOffsetError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true)
   }
 
+  @Test
+  def testFetchLatestOffsetIncludesLeaderEpoch(): Unit = {
+    val leaderEpoch = 5
+    val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
+
+    val timestampAndOffsetOpt = partition.fetchOffsetForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP,
+      isolationLevel = None,
+      currentLeaderEpoch = Optional.empty(),
+      fetchOnlyFromLeader = true)
+
+    assertTrue(timestampAndOffsetOpt.isDefined)
+
+    val timestampAndOffset = timestampAndOffsetOpt.get
+    assertEquals(Optional.of(leaderEpoch), timestampAndOffset.leaderEpoch)
+  }
 
   private def setupPartitionWithMocks(leaderEpoch: Int,
                                       isLeader: Boolean,
@@ -503,18 +519,22 @@ class PartitionTest {
       baseOffset = 0L)
     partition.appendRecordsToLeader(records, isFromClient = true)
 
-    def fetchLatestOffset(isolationLevel: Option[IsolationLevel]): TimestampOffset = {
-      partition.fetchOffsetForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP,
+    def fetchLatestOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
+      val res = partition.fetchOffsetForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP,
         isolationLevel = isolationLevel,
         currentLeaderEpoch = Optional.empty(),
         fetchOnlyFromLeader = true)
+      assertTrue(res.isDefined)
+      res.get
     }
 
-    def fetchEarliestOffset(isolationLevel: Option[IsolationLevel]): TimestampOffset = {
-      partition.fetchOffsetForTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP,
+    def fetchEarliestOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
+      val res = partition.fetchOffsetForTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP,
         isolationLevel = isolationLevel,
         currentLeaderEpoch = Optional.empty(),
         fetchOnlyFromLeader = true)
+      assertTrue(res.isDefined)
+      res.get
     }
 
     assertEquals(3L, fetchLatestOffset(isolationLevel = None).offset)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 7728998..c37da51 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -20,7 +20,7 @@ package kafka.log
 import java.io._
 import java.nio.ByteBuffer
 import java.nio.file.{Files, Paths}
-import java.util.Properties
+import java.util.{Optional, Properties}
 
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
@@ -30,10 +30,12 @@ import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailure
 import kafka.utils._
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
+import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.easymock.EasyMock
 import org.junit.Assert._
@@ -1647,13 +1649,52 @@ class LogTest {
     for(i <- 0 until numMessages) {
       assertEquals(i, readLog(log, i, 100).records.batches.iterator.next().lastOffset)
       if (i == 0)
-        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset)
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset)
       else
-        assertEquals(i, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset)
+        assertEquals(i, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset)
     }
     log.close()
   }
 
+  @Test
+  def testFetchOffsetByTimestampIncludesLeaderEpoch(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
+
+    assertEquals(None, log.fetchOffsetByTimestamp(0L))
+
+    val firstTimestamp = mockTime.milliseconds
+    val firstLeaderEpoch = 0
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = firstLeaderEpoch)
+
+    val secondTimestamp = firstTimestamp + 1
+    val secondLeaderEpoch = 1
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = secondTimestamp),
+      leaderEpoch = secondLeaderEpoch)
+
+    assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
+      log.fetchOffsetByTimestamp(firstTimestamp))
+    assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
+      log.fetchOffsetByTimestamp(secondTimestamp))
+
+    assertEquals(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
+      log.fetchOffsetByTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP))
+    assertEquals(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
+      log.fetchOffsetByTimestamp(ListOffsetRequest.LATEST_TIMESTAMP))
+
+    // The cache can be updated directly after a leader change.
+    // The new latest offset should reflect the updated epoch.
+    log.leaderEpochCache.assign(2, 2L)
+
+    assertEquals(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
+      log.fetchOffsetByTimestamp(ListOffsetRequest.LATEST_TIMESTAMP))
+  }
+
   /**
    * Test that if messages format version of the messages in a segment is before 0.10.0, the time index should be empty.
    */
@@ -1715,9 +1756,9 @@ class LogTest {
     for(i <- 0 until numMessages) {
       assertEquals(i, readLog(log, i, 100).records.batches.iterator.next().lastOffset)
       if (i == 0)
-        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset)
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset)
       else
-        assertEquals(i, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset)
+        assertEquals(i, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset)
     }
     log.close()
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a10800f..dd016f4 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint}
@@ -464,7 +465,7 @@ class KafkaApisTest {
 
     EasyMock.expect(replicaManager.fetchOffsetForTimestamp(tp, ListOffsetRequest.LATEST_TIMESTAMP,
       Some(isolationLevel), currentLeaderEpoch, fetchOnlyFromLeader = true))
-      .andReturn(TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, latestOffset))
+      .andReturn(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, latestOffset, currentLeaderEpoch)))
 
     val capturedResponse = expectNoThrottling()
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 7f9b3e4..9c97c1a 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -111,6 +111,50 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1))
   }
 
+  @Test
+  def testResponseIncludesLeaderEpoch(): Unit = {
+    val topic = "topic"
+    val topicPartition = new TopicPartition(topic, 0)
+    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
+    val firstLeaderId = partitionToLeader(topicPartition.partition)
+
+    TestUtils.generateAndProduceMessages(servers, topic, 10)
+
+    def fetchOffsetAndEpoch(serverId: Int,
+                            timestamp: Long): (Long, Int) = {
+      val targetTimes = Map(topicPartition -> new ListOffsetRequest.PartitionData(
+        timestamp, Optional.empty[Integer]())).asJava
+
+      val request = ListOffsetRequest.Builder
+        .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+        .setTargetTimes(targetTimes)
+        .build()
+
+      val response = sendRequest(serverId, request)
+      val partitionData = response.responseData.get(topicPartition)
+      val epochOpt = partitionData.leaderEpoch
+      assertTrue(epochOpt.isPresent)
+
+      (partitionData.offset, epochOpt.get)
+    }
+
+    assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L))
+    assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP))
+    assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP))
+
+    // Kill the first leader so that we can verify the epoch change when fetching the latest offset
+    killBroker(firstLeaderId)
+    val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition, firstLeaderId)
+    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, servers)
+
+    // No changes to written data
+    assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L))
+    assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP))
+
+    // The latest offset reflects the updated epoch
+    assertEquals((10L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.LATEST_TIMESTAMP))
+  }
+
   private def assertResponseError(error: Errors, brokerId: Int, request: ListOffsetRequest): Unit = {
     val response = sendRequest(brokerId, request)
     assertEquals(request.partitionTimestamps.size, response.responseData.size)