Posted to commits@kafka.apache.org by jg...@apache.org on 2018/09/11 17:25:58 UTC

[kafka] branch 2.0 updated (f5b3f06 -> abf45d1)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a change to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from f5b3f06  KAFKA-7385; Fix log cleaner behavior when only empty batches are retained (#5623)
     new 518758f  KAFKA-7286; Avoid getting stuck loading large metadata records (#5500)
     new abf45d1  KAFKA-7044; Fix Fetcher.fetchOffsetsByTimes and NPE in describe consumer group (#5627)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 checkstyle/suppressions.xml                        |  2 +-
 .../kafka/clients/consumer/internals/Fetcher.java  | 28 ++++++++++----
 .../clients/consumer/internals/FetcherTest.java    | 43 ++++++++++++++++++++++
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 20 +++++-----
 .../coordinator/group/GroupMetadataManager.scala   | 19 +++++++++-
 .../transaction/TransactionStateManager.scala      | 16 +++++++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 +-
 .../group/GroupMetadataManagerTest.scala           | 37 ++++++++++++++++++-
 .../TransactionCoordinatorConcurrencyTest.scala    |  2 +
 .../transaction/TransactionStateManagerTest.scala  |  2 +
 10 files changed, 149 insertions(+), 24 deletions(-)


[kafka] 01/02: KAFKA-7286; Avoid getting stuck loading large metadata records (#5500)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 518758fdcefbbf16efce0f3f1d1b36f8551b1161
Author: Flavien Raynaud <fl...@gmail.com>
AuthorDate: Sun Sep 9 08:05:49 2018 +0100

    KAFKA-7286; Avoid getting stuck loading large metadata records (#5500)
    
    If a group metadata record is larger than offsets.load.buffer.size,
    loading offsets and group metadata from __consumer_offsets would hang
    forever, because the buffer was too small to fit any message larger
    than the configured maximum. This patch grows the buffer as needed so
    that large records fit and loading can proceed. A similar change was
    made to the state-loading logic in the transaction coordinator.
    
    Reviewers: John Roesler <vv...@users.noreply.github.com>, lambdaliu <la...@users.noreply.github.com>, Dhruvil Shah <dh...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../coordinator/group/GroupMetadataManager.scala   | 19 +++++++++--
 .../transaction/TransactionStateManager.scala      | 16 +++++++++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 +--
 .../group/GroupMetadataManagerTest.scala           | 37 +++++++++++++++++++++-
 .../TransactionCoordinatorConcurrencyTest.scala    |  2 ++
 .../transaction/TransactionStateManagerTest.scala  |  2 ++
 6 files changed, 74 insertions(+), 6 deletions(-)
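
To make the change described in the commit message easier to follow, here is a minimal, self-contained Scala sketch of the grow-on-demand buffer pattern the patch applies in GroupMetadataManager and TransactionStateManager. The names LoadBufferSketch, ensureCapacity, configuredLoadBufferSize and batchSizeInBytes are illustrative only and do not exist in the Kafka code; the real implementation also logs a warning when it has to exceed the configured size.

import java.nio.ByteBuffer

// Sketch only: grow the read buffer whenever a single record batch is larger than
// the configured load buffer size, so that loading can always make progress.
object LoadBufferSketch {
  def ensureCapacity(buffer: ByteBuffer,
                     configuredLoadBufferSize: Int,
                     batchSizeInBytes: Int): ByteBuffer = {
    // The buffer must hold at least one complete batch (minOneMessage semantics).
    val bytesNeeded = math.max(configuredLoadBufferSize, batchSizeInBytes)
    if (buffer.capacity < bytesNeeded) {
      // Too small: allocate a larger buffer; the caller keeps the returned reference.
      ByteBuffer.allocate(bytesNeeded)
    } else {
      // Large enough: reuse the existing buffer for the next read.
      buffer.clear()
      buffer
    }
  }
}

In the diffs below, the equivalent logic is inlined at the FileRecords read site, with config.loadBufferSize and config.transactionLogLoadBufferSize supplying the configured minimum.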

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 02ba13a..4830a35 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -517,7 +517,9 @@ class GroupMetadataManager(brokerId: Int,
 
       case Some(log) =>
         var currOffset = log.logStartOffset
-        lazy val buffer = ByteBuffer.allocate(config.loadBufferSize)
+
+        // buffer may not be needed if records are read from memory
+        var buffer = ByteBuffer.allocate(0)
 
         // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
         val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
@@ -531,7 +533,20 @@ class GroupMetadataManager(brokerId: Int,
           val memRecords = fetchDataInfo.records match {
             case records: MemoryRecords => records
             case fileRecords: FileRecords =>
-              buffer.clear()
+              val sizeInBytes = fileRecords.sizeInBytes
+              val bytesNeeded = Math.max(config.loadBufferSize, sizeInBytes)
+
+              // minOneMessage = true in the above log.read means that the buffer may need to be grown to ensure progress can be made
+              if (buffer.capacity < bytesNeeded) {
+                if (config.loadBufferSize < bytesNeeded)
+                  warn(s"Loaded offsets and group metadata from $topicPartition with buffer larger ($bytesNeeded bytes) than " +
+                    s"configured offsets.load.buffer.size (${config.loadBufferSize} bytes)")
+
+                buffer = ByteBuffer.allocate(bytesNeeded)
+              } else {
+                buffer.clear()
+              }
+
               fileRecords.readInto(buffer, 0)
               MemoryRecords.readableRecords(buffer)
           }
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index a358515..50d96c3 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -296,7 +296,8 @@ class TransactionStateManager(brokerId: Int,
         warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
 
       case Some(log) =>
-        lazy val buffer = ByteBuffer.allocate(config.transactionLogLoadBufferSize)
+        // buffer may not be needed if records are read from memory
+        var buffer = ByteBuffer.allocate(0)
 
         // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
         var currOffset = log.logStartOffset
@@ -311,6 +312,19 @@ class TransactionStateManager(brokerId: Int,
             val memRecords = fetchDataInfo.records match {
               case records: MemoryRecords => records
               case fileRecords: FileRecords =>
+                val sizeInBytes = fileRecords.sizeInBytes
+                val bytesNeeded = Math.max(config.transactionLogLoadBufferSize, sizeInBytes)
+
+                // minOneMessage = true in the above log.read means that the buffer may need to be grown to ensure progress can be made
+                if (buffer.capacity < bytesNeeded) {
+                  if (config.transactionLogLoadBufferSize < bytesNeeded)
+                    warn(s"Loaded offsets and group metadata from $topicPartition with buffer larger ($bytesNeeded bytes) than " +
+                      s"configured transaction.state.log.load.buffer.size (${config.transactionLogLoadBufferSize} bytes)")
+
+                  buffer = ByteBuffer.allocate(bytesNeeded)
+                } else {
+                  buffer.clear()
+                }
                 buffer.clear()
                 fileRecords.readInto(buffer, 0)
                 MemoryRecords.readableRecords(buffer)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2760def..744442f 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -651,7 +651,7 @@ object KafkaConfig {
   val GroupInitialRebalanceDelayMsDoc = "The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins."
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit"
-  val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache."
+  val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache (soft-limit, overridden if records are too large)."
   val OffsetsTopicReplicationFactorDoc = "The replication factor for the offsets topic (set higher to ensure availability). " +
   "Internal topic creation will fail until the cluster size meets this replication factor requirement."
   val OffsetsTopicPartitionsDoc = "The number of partitions for the offset commit topic (should not change after deployment)"
@@ -667,7 +667,7 @@ object KafkaConfig {
   val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " +
     "If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
   val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + " config for the transaction topic."
-  val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache."
+  val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache (soft-limit, overridden if records are too large)."
   val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). " +
     "Internal topic creation will fail until the cluster size meets this replication factor requirement."
   val TransactionsTopicPartitionsDoc = "The number of partitions for the transaction topic (should not change after deployment)."
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 3bfacab..9c8ef2f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -640,6 +640,38 @@ class GroupMetadataManagerTest {
   }
 
   @Test
+  def testLoadGroupWithLargeGroupMetadataRecord() {
+    val groupMetadataTopicPartition = groupTopicPartition
+    val startOffset = 15L
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    // create a GroupMetadata record larger then offsets.load.buffer.size (here at least 16 bytes larger)
+    val assignmentSize = OffsetConfig.DefaultLoadBufferSize + 16
+    val memberId = "98098230493"
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15,
+      protocolType = "consumer", protocol = "range", memberId, assignmentSize)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
+  @Test
   def testOffsetWriteAfterGroupRemoved(): Unit = {
     // this test case checks the following scenario:
     // 1. the group exists at some point in time, but is later removed (because all members left)
@@ -1452,7 +1484,8 @@ class GroupMetadataManagerTest {
   private def buildStableGroupRecordWithMember(generation: Int,
                                                protocolType: String,
                                                protocol: String,
-                                               memberId: String): SimpleRecord = {
+                                               memberId: String,
+                                               assignmentSize: Int = 0): SimpleRecord = {
     val memberProtocols = List((protocol, Array.emptyByteArray))
     val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols)
     val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol,
@@ -1495,6 +1528,8 @@ class GroupMetadataManagerTest {
       EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
       .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
 
+    EasyMock.expect(fileRecordsMock.sizeInBytes()).andStubReturn(records.sizeInBytes)
+
     val bufferCapture = EasyMock.newCapture[ByteBuffer]
     fileRecordsMock.readInto(EasyMock.capture(bufferCapture), EasyMock.anyInt())
     EasyMock.expectLastCall().andAnswer(new IAnswer[Unit] {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 873b88d..060e07e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -259,6 +259,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
       EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
       .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
 
+    EasyMock.expect(fileRecordsMock.sizeInBytes()).andStubReturn(records.sizeInBytes)
+
     val bufferCapture = EasyMock.newCapture[ByteBuffer]
     fileRecordsMock.readInto(EasyMock.capture(bufferCapture), EasyMock.anyInt())
     EasyMock.expectLastCall().andAnswer(new IAnswer[Unit] {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 34b82d9..74bbe33 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -586,6 +586,8 @@ class TransactionStateManagerTest {
       EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
       .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
 
+    EasyMock.expect(fileRecordsMock.sizeInBytes()).andStubReturn(records.sizeInBytes)
+
     val bufferCapture = EasyMock.newCapture[ByteBuffer]
     fileRecordsMock.readInto(EasyMock.capture(bufferCapture), EasyMock.anyInt())
     EasyMock.expectLastCall().andAnswer(new IAnswer[Unit] {


[kafka] 02/02: KAFKA-7044; Fix Fetcher.fetchOffsetsByTimes and NPE in describe consumer group (#5627)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit abf45d185d5848e017b8aadff4cbda133421d1f8
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Tue Sep 11 10:10:42 2018 -0700

    KAFKA-7044; Fix Fetcher.fetchOffsetsByTimes and NPE in describe consumer group (#5627)
    
    A call to `kafka-consumer-groups --describe --group ...` can result in a NullPointerException for two reasons:
    1) `Fetcher.fetchOffsetsByTimes()` may return too early, without sending a list offsets request for topic partitions that are not in the cached metadata.
    2) `ConsumerGroupCommand.getLogEndOffsets()` and `getLogStartOffsets()` assumed that endOffsets()/beginningOffsets(), which eventually call Fetcher.fetchOffsetsByTimes(), would return a map containing every topic partition passed to them, with non-null values. Because of (1), null values were possible if some of the topic partitions were already known (in the metadata cache) and some were not (the metadata cache had no entries for them).  [...]
    
    Testing:
    -- added a unit test to verify the fix in Fetcher.fetchOffsetsByTimes()
    -- did some manual testing with `kafka-consumer-groups --describe` in the scenario that caused the NPE. Was not able to reproduce any NPE cases with DescribeConsumerGroupTest.scala.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../kafka/clients/consumer/internals/Fetcher.java  | 28 ++++++++++----
 .../clients/consumer/internals/FetcherTest.java    | 43 ++++++++++++++++++++++
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 20 +++++-----
 4 files changed, 75 insertions(+), 18 deletions(-)
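
As an editorial aside before the diff: a minimal Scala sketch of the null-safe pattern used to address (2), assuming only that the offsets map returned by the consumer may lack entries (or carry null values) for some partitions. OffsetResult, KnownOffset, UnknownOffset, OffsetLookupSketch and toOffsetResults are illustrative names; the actual ConsumerGroupCommand change below uses LogOffsetResult.LogOffset and LogOffsetResult.Unknown.

import java.{util => ju}
import org.apache.kafka.common.TopicPartition

// Sketch only: never assume endOffsets()/beginningOffsets() returned an entry for
// every requested partition; map missing or null values to an explicit "unknown" state.
sealed trait OffsetResult
final case class KnownOffset(offset: Long) extends OffsetResult
case object UnknownOffset extends OffsetResult

object OffsetLookupSketch {
  def toOffsetResults(partitions: Seq[TopicPartition],
                      offsets: ju.Map[TopicPartition, java.lang.Long]): Map[TopicPartition, OffsetResult] =
    partitions.map { tp =>
      // Option(...) converts a null (or absent) value into None instead of an NPE later on.
      Option(offsets.get(tp)) match {
        case Some(offset) => tp -> KnownOffset(offset)
        case None         => tp -> UnknownOffset
      }
    }.toMap
}

The Fetcher side of the fix (cause 1) is visible in the Java diff below: partitions whose leader is unknown or unavailable are collected into partitionsToRetry, and remainingToSearch is retained to that set, so the request loop keeps retrying until every requested partition is resolved or the timeout expires.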

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index e80d5bf..8099324 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -70,7 +70,7 @@
               files="MockAdminClient.java"/>
 
     <suppress checks="JavaNCSS"
-              files="RequestResponseTest.java"/>
+              files="RequestResponseTest.java|FetcherTest.java"/>
 
     <suppress checks="NPathComplexity"
               files="MemoryRecordsTest|MetricsTest"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index dd412ab..d1ec117 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -422,7 +422,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 if (value.partitionsToRetry.isEmpty())
                     return result;
 
-                remainingToSearch.keySet().removeAll(result.fetchedOffsets.keySet());
+                remainingToSearch.keySet().retainAll(value.partitionsToRetry);
             } else if (!future.isRetriable()) {
                 throw future.exception();
             }
@@ -590,7 +590,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         for (TopicPartition tp : partitionResetTimestamps.keySet())
             metadata.add(tp.topic());
 
-        Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = groupListOffsetRequests(partitionResetTimestamps);
+        Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode =
+                groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
         for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
             Node node = entry.getKey();
             final Map<TopicPartition, Long> resetTimestamps = entry.getValue();
@@ -639,18 +640,19 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         for (TopicPartition tp : timestampsToSearch.keySet())
             metadata.add(tp.topic());
 
-        Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = groupListOffsetRequests(timestampsToSearch);
+        final Set<TopicPartition> partitionsToRetry = new HashSet<>();
+        Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, partitionsToRetry);
         if (timestampsToSearchByNode.isEmpty())
             return RequestFuture.failure(new StaleMetadataException());
 
         final RequestFuture<ListOffsetResult> listOffsetRequestsFuture = new RequestFuture<>();
         final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new HashMap<>();
-        final Set<TopicPartition> partitionsToRetry = new HashSet<>();
         final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
 
         for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
             RequestFuture<ListOffsetResult> future =
-                    sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
+                sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
             future.addListener(new RequestFutureListener<ListOffsetResult>() {
                 @Override
                 public void onSuccess(ListOffsetResult partialResult) {
@@ -677,7 +679,16 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         return listOffsetRequestsFuture;
     }
 
-    private Map<Node, Map<TopicPartition, Long>> groupListOffsetRequests(Map<TopicPartition, Long> timestampsToSearch) {
+        /**
+         * Groups timestamps to search by node for topic partitions in `timestampsToSearch` that have
+         * leaders available. Topic partitions from `timestampsToSearch` that do not have their leader
+         * available are added to `partitionsToRetry`
+         * @param timestampsToSearch The mapping from partitions ot the target timestamps
+         * @param partitionsToRetry A set of topic partitions that will be extended with partitions
+         *                          that need metadata update or re-connect to the leader.
+         */
+    private Map<Node, Map<TopicPartition, Long>> groupListOffsetRequests(
+            Map<TopicPartition, Long> timestampsToSearch, Set<TopicPartition> partitionsToRetry) {
         final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
             TopicPartition tp  = entry.getKey();
@@ -686,9 +697,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 metadata.add(tp.topic());
                 log.debug("Leader for partition {} is unknown for fetching offset", tp);
                 metadata.requestUpdate();
+                partitionsToRetry.add(tp);
             } else if (info.leader() == null) {
                 log.debug("Leader for partition {} is unavailable for fetching offset", tp);
                 metadata.requestUpdate();
+                partitionsToRetry.add(tp);
             } else if (client.isUnavailable(info.leader())) {
                 client.maybeThrowAuthFailure(info.leader());
 
@@ -696,7 +709,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 // try again. No need to request a metadata update since the disconnect will have
                 // done so already.
                 log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires",
-                        info.leader(), tp);
+                          info.leader(), tp);
+                partitionsToRetry.add(tp);
             } else {
                 Node node = info.leader();
                 Map<TopicPartition, Long> topicData = timestampsToSearchByNode.get(node);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index a734b3e..b67d48e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -108,6 +108,7 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -1838,6 +1839,48 @@ public class FetcherTest {
         testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
     }
 
+    @Test
+    public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially() {
+        final String anotherTopic = "another-topic";
+        final TopicPartition t2p0 = new TopicPartition(anotherTopic, 0);
+
+        client.reset();
+
+        // Metadata initially has one topic
+        Cluster cluster = TestUtils.clusterWith(3, topicName, 2);
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        // The first metadata refresh should contain one topic
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), false);
+        client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 11L), cluster.leaderFor(tp0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 1000L, 32L), cluster.leaderFor(tp1));
+
+        // Second metadata refresh should contain two topics
+        Map<String, Integer> partitionNumByTopic = new HashMap<>();
+        partitionNumByTopic.put(topicName, 2);
+        partitionNumByTopic.put(anotherTopic, 1);
+        Cluster updatedCluster = TestUtils.clusterWith(3, partitionNumByTopic);
+        client.prepareMetadataUpdate(updatedCluster, Collections.<String>emptySet(), false);
+        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, 1000L, 54L), cluster.leaderFor(t2p0));
+
+        Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
+        timestampToSearch.put(tp0, ListOffsetRequest.LATEST_TIMESTAMP);
+        timestampToSearch.put(tp1, ListOffsetRequest.LATEST_TIMESTAMP);
+        timestampToSearch.put(t2p0, ListOffsetRequest.LATEST_TIMESTAMP);
+        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
+            fetcher.offsetsByTimes(timestampToSearch, Long.MAX_VALUE);
+
+        assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null result for " + tp0,
+                      offsetAndTimestampMap.get(tp0));
+        assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null result for " + tp1,
+                      offsetAndTimestampMap.get(tp1));
+        assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null result for " + t2p0,
+                      offsetAndTimestampMap.get(t2p0));
+        assertEquals(11L, offsetAndTimestampMap.get(tp0).offset());
+        assertEquals(32L, offsetAndTimestampMap.get(tp1).offset());
+        assertEquals(54L, offsetAndTimestampMap.get(t2p0).offset());
+    }
+
     @Test(expected = TimeoutException.class)
     public void testBatchedListOffsetsMetadataErrors() {
         Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index d5a57ee..2b5da4f 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -288,12 +288,8 @@ object ConsumerGroupCommand extends Logging {
       }
 
       getLogEndOffsets(topicPartitions).map {
-        logEndOffsetResult =>
-          logEndOffsetResult._2 match {
-            case LogOffsetResult.LogOffset(logEndOffset) => getDescribePartitionResult(logEndOffsetResult._1, Some(logEndOffset))
-            case LogOffsetResult.Unknown => getDescribePartitionResult(logEndOffsetResult._1, None)
-            case LogOffsetResult.Ignore => null
-          }
+        case (topicPartition, LogOffsetResult.LogOffset(offset)) => getDescribePartitionResult(topicPartition, Some(offset))
+        case (topicPartition, _) => getDescribePartitionResult(topicPartition, None)
       }.toArray
     }
 
@@ -353,16 +349,20 @@ object ConsumerGroupCommand extends Logging {
     private def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
       val offsets = getConsumer.endOffsets(topicPartitions.asJava)
       topicPartitions.map { topicPartition =>
-        val logEndOffset = offsets.get(topicPartition)
-        topicPartition -> LogOffsetResult.LogOffset(logEndOffset)
+        Option(offsets.get(topicPartition)) match {
+          case Some(logEndOffset) => topicPartition -> LogOffsetResult.LogOffset(logEndOffset)
+          case _ => topicPartition -> LogOffsetResult.Unknown
+        }
       }.toMap
     }
 
     private def getLogStartOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
       val offsets = getConsumer.beginningOffsets(topicPartitions.asJava)
       topicPartitions.map { topicPartition =>
-        val logStartOffset = offsets.get(topicPartition)
-        topicPartition -> LogOffsetResult.LogOffset(logStartOffset)
+        Option(offsets.get(topicPartition)) match {
+          case Some(logStartOffset) => topicPartition -> LogOffsetResult.LogOffset(logStartOffset)
+          case _ => topicPartition -> LogOffsetResult.Unknown
+        }
       }.toMap
     }