You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/22 23:19:16 UTC

git commit: SAMZA-220; make systemconsumers faster when consuming from a loarge number of partitions.

Repository: incubator-samza
Updated Branches:
  refs/heads/master 73d604c43 -> 5c5a95c33


SAMZA-220; make systemconsumers faster when consuming from a loarge number of partitions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5c5a95c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5c5a95c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5c5a95c3

Branch: refs/heads/master
Commit: 5c5a95c33b4e254108c25eecd6bf0a7786bd4d4f
Parents: 73d604c
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Apr 22 14:18:54 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Apr 22 14:18:54 2014 -0700

----------------------------------------------------------------------
 .../apache/samza/system/SystemConsumers.scala   | 102 ++++-----
 .../chooser/BufferingMessageChooser.scala       | 208 +++++++++++++++++++
 .../chooser/TestBufferingMessageChooser.scala   | 108 ++++++++++
 3 files changed, 353 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5c5a95c3/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index bbbacb5..7624aef 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -25,6 +25,7 @@ import org.apache.samza.serializers.SerdeManager
 import grizzled.slf4j.Logging
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.util.DoublingBackOff
+import org.apache.samza.system.chooser.BufferingMessageChooser
 
 /**
  * The SystemConsumers class coordinates between all SystemConsumers, the
@@ -59,16 +60,7 @@ class SystemConsumers(
   /**
    * The maximum number of messages to poll from a single SystemStreamPartition.
    */
-  maxMsgsPerStreamPartition: Int = 1000,
-
-  /**
-   * A percentage threshold that determines when a SystemStreamPartition
-   * should be polled again. 0.0 means poll for more messages only when
-   * SystemConsumer's buffer is totally empty. 0.2 means poll for more messages
-   * when SystemConsumers' buffer is 80% empty. SystemConsumers' buffer size
-   * is determined by maxMsgsPerStreamPartition.
-   */
-  fetchThresholdPct: Float = 0f,
+  maxMsgsPerStreamPartition: Int = 10000,
 
   /**
    * If MessageChooser returns null when it's polled, SystemConsumers will
@@ -80,18 +72,9 @@ class SystemConsumers(
   noNewMessagesTimeout: Long = 10) extends Logging {
 
   /**
-   * A buffer of incoming messages grouped by SystemStreamPartition.
-   */
-  var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
-
-  /**
-   * The MessageChooser only gets updated with one message-per-SystemStreamPartition
-   * at a time. The MessageChooser will not receive a second message from the
-   * same SystemStreamPartition until the first message that it received has
-   * been returned to SystemConsumers. This set keeps track of which
-   * SystemStreamPartitions are valid to give to the MessageChooser.
+   * The buffer where SystemConsumers stores all incoming message envelopes.
    */
-  var neededByChooser = Set[SystemStreamPartition]()
+  val buffer = new BufferingMessageChooser(chooser)
 
   /**
    * A map of every SystemStreamPartition that SystemConsumers is responsible
@@ -122,33 +105,26 @@ class SystemConsumers(
   var timeout = noNewMessagesTimeout
 
   /**
-   * Used to determine when the next poll should take place for a given
-   * SystemStreamPartition. SystemConsumers inspects the value of fetchMap for each
-   * SystemStreamPartition, and decides to poll for the SystemStreamPartition
-   * if the fetchMap value is greater than or equal to the
-   * depletedQueueSizeThreshold. For example, suppose the fetchThresholdPct is
-   * 0.2, and the maxMsgsPerStreamPartition is 1000. This would result in
-   * depletedQueueSizeThreshold being 800. This a SystemStreamPartition with a
-   * fetchMap value of 936 (164 messages in the buffer is less than 20% of
-   * 1000) would be polled for more messages, while a SystemStream partition
-   * with a fetchMap value of 548 would not be polled for more messages (452
-   * messages in the buffer is greater than 20% of 1000).
-   */
-  val depletedQueueSizeThreshold = (maxMsgsPerStreamPartition * (1 - fetchThresholdPct)).toInt
-
-  /** 
    * Make the maximum backoff proportional to the number of streams we're consuming.
    * For a few streams, make the max back off 1, but for hundreds make it up to 1k,
    * which experimentally has shown to be the most performant.
    */
   var maxBackOff = 0
-  
+
+  /**
+   * How low totalUnprocessedMessages has to get before the consumers are
+   * polled for more data. This is defined to be 10% of
+   * maxMsgsPerStreamPartition. Since maxMsgsPerStreamPartition defaults to
+   * 10000, the default refreshThreshold is 1000.
+   */
+  val refreshThreshold = maxMsgsPerStreamPartition * .1
+
   debug("Got stream consumers: %s" format consumers)
   debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition)
   debug("Got no new message timeout: %s" format noNewMessagesTimeout)
 
-  metrics.setUnprocessedMessages(() => fetchMap.values.map(maxMsgsPerStreamPartition - _.intValue).sum)
-  metrics.setNeededByChooser(() => neededByChooser.size)
+  metrics.setUnprocessedMessages(() => buffer.unprocessedMessages.size)
+  metrics.setNeededByChooser(() => buffer.neededByChooser.size)
   metrics.setTimeout(() => timeout)
   metrics.setMaxMessagesPerStreamPartition(() => maxMsgsPerStreamPartition)
   metrics.setNoNewMessagesTimeout(() => noNewMessagesTimeout)
@@ -159,14 +135,14 @@ class SystemConsumers(
     maxBackOff = scala.math.pow(10, scala.math.log10(fetchMap.size).toInt).toInt
 
     debug("Got maxBackOff: " + maxBackOff)
-    
+
     consumers
       .keySet
       .foreach(metrics.registerSystem)
 
     consumers.values.foreach(_.start)
 
-    chooser.start
+    buffer.start
   }
 
   def stop {
@@ -174,18 +150,16 @@ class SystemConsumers(
 
     consumers.values.foreach(_.stop)
 
-    chooser.stop
+    buffer.stop
   }
 
   def register(systemStreamPartition: SystemStreamPartition, offset: String) {
     debug("Registering stream: %s, %s" format (systemStreamPartition, offset))
 
     metrics.registerSystemStream(systemStreamPartition.getSystemStream)
-    neededByChooser += systemStreamPartition
+    buffer.register(systemStreamPartition, offset)
     updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition)
-    unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
     consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset)
-    chooser.register(systemStreamPartition, offset)
   }
 
   /**
@@ -202,7 +176,7 @@ class SystemConsumers(
   }
 
   def choose: IncomingMessageEnvelope = {
-    val envelopeFromChooser = chooser.choose
+    val envelopeFromChooser = buffer.choose
 
     if (envelopeFromChooser == null) {
       debug("Chooser returned null.")
@@ -219,17 +193,19 @@ class SystemConsumers(
       // Don't block if we have a message to process.
       timeout = 0
 
-      // Ok to give the chooser a new message from this stream.
-      neededByChooser += envelopeFromChooser.getSystemStreamPartition
-
       metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc
     }
 
-    refresh.maybeCall()
+    // Always refresh if we got nothing from the chooser. Otherwise, just 
+    // refresh when the buffer is low. 
+    if (envelopeFromChooser == null || buffer.totalUnprocessedMessages <= refreshThreshold) {
+      refresh.maybeCall()
+    }
+
     updateMessageChooser
     envelopeFromChooser
   }
-  
+
   /**
    * Poll a system for new messages from SystemStreamPartitions that have
    * dipped below the depletedQueueSizeThreshold threshold.  Return true if
@@ -260,15 +236,13 @@ class SystemConsumers(
     incomingEnvelopes.foreach(envelope => {
       val systemStreamPartition = envelope.getSystemStreamPartition
 
+      buffer.update(serdeManager.fromBytes(envelope))
+
       debug("Got message for: %s, %s" format (systemStreamPartition, envelope))
 
       updateFetchMap(systemStreamPartition, -1)
 
       debug("Updated fetch map for: %s, %s" format (systemStreamPartition, fetchMap))
-
-      unprocessedMessages(envelope.getSystemStreamPartition).enqueue(serdeManager.fromBytes(envelope))
-
-      debug("Updated unprocessed messages for: %s, %s" format (systemStreamPartition, unprocessedMessages))
     })
 
     !incomingEnvelopes.isEmpty
@@ -284,7 +258,7 @@ class SystemConsumers(
     val systemName = systemStreamPartition.getSystem
     var systemFetchMap = systemFetchMapCache.getOrElse(systemName, Map())
 
-    if (fetchSize >= depletedQueueSizeThreshold) {
+    if (fetchSize >= refreshThreshold) {
       systemFetchMap += systemStreamPartition -> fetchSize
     } else {
       systemFetchMap -= systemStreamPartition
@@ -293,18 +267,16 @@ class SystemConsumers(
     fetchMap += systemStreamPartition -> fetchSize
     systemFetchMapCache += systemName -> systemFetchMap
   }
-  
+
   /**
-   * A helper method that updates MessageChooser. This should be called in 
+   * A helper method that updates MessageChooser. This should be called in
    * "choose" method after we try to consume a message from MessageChooser.
    */
   private def updateMessageChooser {
-    neededByChooser.foreach(systemStreamPartition =>
-      // If we have messages for a stream that the chooser needs, then update.
-      if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) {
-        chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
-        updateFetchMap(systemStreamPartition)
-        neededByChooser -= systemStreamPartition
-      })
+    buffer
+      .flush
+      // Let the fetchMap know of any SSPs that were given to the chooser, so 
+      // a new fetch can be triggered if the buffer is low.
+      .foreach(updateFetchMap(_))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5c5a95c3/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala
new file mode 100644
index 0000000..c7ef6ef
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import scala.collection.mutable.Queue
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.IncomingMessageEnvelope
+
+/**
+ * This buffer is responsible for storing new unprocessed
+ * IncomingMessageEnvelopes, feeding the envelopes to the message chooser,
+ * and coordinating with the chooser to pick the next message to be processed.
+ */
+class BufferingMessageChooser(chooser: MessageChooser) extends MessageChooser {
+  import ChooserStatus._
+
+  /**
+   * A buffer of incoming messages grouped by SystemStreamPartition.
+   */
+  var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
+
+  /**
+   * A count of all messages sitting in SystemConsumers' unprocessedMessages
+   * buffer, and inside the MessageChooser.
+   */
+  var totalUnprocessedMessages = 0
+
+  /**
+   * A map that contains the current status for each SSP that's been
+   * registered to the coordinator.
+   */
+  var statuses = Map[SystemStreamPartition, ChooserStatus]()
+
+  /**
+   * This is a cache of all SSPs that are currently in the "NeededByChooser"
+   * state. It's simply here to improve performance, since it means we don't
+   * need to iterate over all SSPs in the statuses map in order to determine
+   * which SSPs are currently needed by the chooser.
+   */
+  var neededByChooser = Set[SystemStreamPartition]()
+
+  /**
+   * Start the chooser that this buffer is managing.
+   */
+  def start = chooser.start
+
+  /**
+   * Stop the chooser that this buffer is managing.
+   */
+  def stop = chooser.stop
+
+  /**
+   * Register a new SystemStreamPartition with this buffer, as well as the
+   * underlying chooser.
+   */
+  def register(systemStreamPartition: SystemStreamPartition, offset: String) {
+    unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
+    chooser.register(systemStreamPartition, offset)
+    setStatus(systemStreamPartition, NeededByChooser)
+  }
+
+  /**
+   * Add a new unprocessed IncomingMessageEnvelope to the buffer. The buffer
+   * will hold on to this envelope, and eventually feed it to the underlying chooser.
+   */
+  def update(envelope: IncomingMessageEnvelope) {
+    val systemStreamPartition = envelope.getSystemStreamPartition
+
+    unprocessedMessages(envelope.getSystemStreamPartition).enqueue(envelope)
+    totalUnprocessedMessages += 1
+
+    if (statuses(systemStreamPartition).equals(SkippingChooser)) {
+      // If we were skipping messages from this SystemStreamPartition, update 
+      // neededByChooser since we've got messages for it now.
+      setStatus(systemStreamPartition, NeededByChooser)
+    }
+  }
+
+  /**
+   * Tell the buffer to update the underlying chooser with any SSPs that the
+   * chooser needs.
+   *
+   * @return A set of SystemStreamPartitions that were updated in the chooser
+   *         as a result of this method invocation.
+   */
+  def flush: Set[SystemStreamPartition] = {
+    var updatedSystemStreamPartitions = Set[SystemStreamPartition]()
+
+    neededByChooser.foreach(systemStreamPartition => {
+      if (unprocessedMessages(systemStreamPartition).size > 0) {
+        // If we have messages for a stream that the chooser needs, then update.
+        chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
+        updatedSystemStreamPartitions += systemStreamPartition
+        setStatus(systemStreamPartition, InChooser)
+      } else {
+        // If we find that we have no messages for this SystemStreamPartition, 
+        // rather than continue trying to update the chooser with this 
+        // SystemStreamPartition, add it to the skip set and remove it from 
+        // the neededByChooser set (see below).
+        setStatus(systemStreamPartition, SkippingChooser)
+      }
+    })
+
+    updatedSystemStreamPartitions
+  }
+
+  /**
+   * Choose a message from the underlying chooser, and return it.
+   *
+   * @return The IncomingMessageEnvelope that the chooser has picked, or null
+   *         if the chooser didn't pick anything.
+   */
+  def choose = {
+    val envelope = chooser.choose
+
+    if (envelope != null) {
+      setStatus(envelope.getSystemStreamPartition, NeededByChooser)
+
+      // Chooser picked a message, so we've got one less unprocessed message.
+      totalUnprocessedMessages -= 1
+    }
+
+    envelope
+  }
+
+  /**
+   * Update the status of a SystemStreamPartition.
+   */
+  private def setStatus(systemStreamPartition: SystemStreamPartition, status: ChooserStatus) {
+    statuses += systemStreamPartition -> status
+
+    if (status.equals(NeededByChooser)) {
+      neededByChooser += systemStreamPartition
+    } else {
+      neededByChooser -= systemStreamPartition
+    }
+  }
+}
+
+/**
+ * ChooserStatus denotes the current state of a SystemStreamPartition for a
+ * MessageChooser. This state is used to improve performance in the buffer.
+ * To update a MessageChooser, we first check if an envelope exists for each
+ * SSP that the buffer needs. If the buffer contains a lot of empty queues,
+ * then the operation of iterating over all needed SSPs, and discovering that
+ * their queues are empty is a waste of time, since they'll remain empty until
+ * a new envelope is added to the queue, which the buffer knows by virtue of
+ * having access to the enqueue method. Instead, we stop checking for an empty
+ * SSP (SkippingChooser), until a new envelope is added via the enqueue method.
+ */
+object ChooserStatus extends Enumeration {
+  type ChooserStatus = Value
+
+  /**
+   * When an envelope has been updated for the MessageChooser, the
+   * SystemStreamPartition for the envelope should be set to the InChooser
+   * state. The state will remain this way until the MessageChooser returns an
+   * envelope with the same SystemStreamPartition, at which point, the
+   * SystemStreamPartition's state should be transitioned to NeededByChooser
+   * (see below).
+   */
+  val InChooser = Value
+
+  /**
+   * If a SystemStreamPartition is not in the InChooser state, and it's
+   * unclear if the buffer has more messages available for the SSP, the SSP
+   * should be in the NeededByChooser state. This state means that the chooser
+   * should be updated with a new message from the SSP, if one is available.
+   */
+  val NeededByChooser = Value
+
+  /**
+   * When a SystemStreamPartition is in the NeededByChooser state, and we try
+   * to update the message chooser with a new envelope from the buffer for the
+   * SSP, there are two potential outcomes. One is that there is an envelope
+   * in the buffer for the SSP. In this case, the state will be transitioned
+   * to InChooser. The other possibility is that there is no envelope in the
+   * buffer at the time the update is trying to occur. In the latter case, the
+   * SSP is transitioned to the SkippingChooser state. This state means that
+   * the buffer will cease to update the chooser with envelopes from this SSP
+   * until a new envelope for the SSP is added to the buffer again.
+   *
+   * <br/><br/>
+   *
+   * The reason that this state exists is purely to improve performance. If we
+   * simply leave SSPs in the NeededByChooser state, we will try and update the
+   * chooser on every updateChooser call for all SSPs. This is a waste of time
+   * if the buffer contains a large number of empty SSPs.
+   */
+  val SkippingChooser = Value
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5c5a95c3/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala
new file mode 100644
index 0000000..c96c53b
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import org.apache.samza.system.chooser.ChooserStatus._
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import org.junit.Assert._
+import org.junit.Test
+
+class TestBufferingMessageChooser {
+  @Test
+  def testShouldBufferAndFlush {
+    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+    val chooser = new MockMessageChooser
+    val buffer = new BufferingMessageChooser(chooser)
+    val envelope1 = new IncomingMessageEnvelope(ssp1, "1", null, null)
+    buffer.register(ssp1, "1")
+    assertEquals(1, chooser.registers.size)
+    assertEquals("1", chooser.registers.getOrElse(ssp1, "0"))
+    buffer.start
+    assertEquals(1, chooser.starts)
+    assertEquals(null, buffer.choose)
+    buffer.update(envelope1)
+    // Should buffer this update, rather than passing it to the wrapped chooser.
+    assertEquals(null, buffer.choose)
+    buffer.flush
+    assertEquals(envelope1, buffer.choose)
+    assertEquals(null, buffer.choose)
+    buffer.stop
+    assertEquals(1, chooser.stops)
+  }
+
+  @Test
+  def testBufferShouldSkipCheckedSSPs {
+    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+    val chooser = new MockMessageChooser
+    val buffer = new BufferingMessageChooser(chooser)
+    val envelope1 = new IncomingMessageEnvelope(ssp1, "1", null, null)
+    buffer.register(ssp1, "1")
+    buffer.start
+    checkChooserStatus(NeededByChooser, ssp1, buffer)
+
+    // Buffer first message. Still needed.
+    buffer.update(envelope1)
+    checkChooserStatus(NeededByChooser, ssp1, buffer)
+    assertEquals(null, buffer.choose)
+    checkChooserStatus(NeededByChooser, ssp1, buffer)
+
+    // Flush first message. Now in chooser.
+    buffer.flush
+    checkChooserStatus(InChooser, ssp1, buffer)
+    assertEquals(envelope1, buffer.choose)
+    checkChooserStatus(NeededByChooser, ssp1, buffer)
+
+    // Now flush with no messages. Should begin skipping chooser since no 
+    // messages are available.
+    assertEquals(null, buffer.choose)
+    checkChooserStatus(NeededByChooser, ssp1, buffer)
+    buffer.flush
+    checkChooserStatus(SkippingChooser, ssp1, buffer)
+
+    // Now check that we can get back to NeededByChooser when a new message 
+    // arrives.
+    buffer.update(envelope1)
+    checkChooserStatus(NeededByChooser, ssp1, buffer)
+    assertEquals(0, chooser.envelopes.size)
+    assertEquals(null, buffer.choose)
+
+    // And check that we can get back into the InChooser state.
+    buffer.flush
+    checkChooserStatus(InChooser, ssp1, buffer)
+    assertEquals(envelope1, buffer.choose)
+    checkChooserStatus(NeededByChooser, ssp1, buffer)
+
+    // Shutdown.
+    buffer.stop
+    assertEquals(1, chooser.stops)
+  }
+
+  private def checkChooserStatus(status: ChooserStatus, systemStreamPartition: SystemStreamPartition, buffer: BufferingMessageChooser) {
+    if (status.equals(NeededByChooser)) {
+      assertEquals(Set(systemStreamPartition), buffer.neededByChooser)
+    } else {
+      assertTrue(!buffer.neededByChooser.contains(systemStreamPartition))
+    }
+
+    assertEquals(status, buffer.statuses.getOrElse(systemStreamPartition, null))
+  }
+}
\ No newline at end of file