You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/22 23:19:16 UTC
git commit: SAMZA-220;
make systemconsumers faster when consuming from a large number of
partitions.
Repository: incubator-samza
Updated Branches:
refs/heads/master 73d604c43 -> 5c5a95c33
SAMZA-220; make systemconsumers faster when consuming from a large number of partitions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5c5a95c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5c5a95c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5c5a95c3
Branch: refs/heads/master
Commit: 5c5a95c33b4e254108c25eecd6bf0a7786bd4d4f
Parents: 73d604c
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Apr 22 14:18:54 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Apr 22 14:18:54 2014 -0700
----------------------------------------------------------------------
.../apache/samza/system/SystemConsumers.scala | 102 ++++-----
.../chooser/BufferingMessageChooser.scala | 208 +++++++++++++++++++
.../chooser/TestBufferingMessageChooser.scala | 108 ++++++++++
3 files changed, 353 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5c5a95c3/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index bbbacb5..7624aef 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -25,6 +25,7 @@ import org.apache.samza.serializers.SerdeManager
import grizzled.slf4j.Logging
import org.apache.samza.system.chooser.MessageChooser
import org.apache.samza.util.DoublingBackOff
+import org.apache.samza.system.chooser.BufferingMessageChooser
/**
* The SystemConsumers class coordinates between all SystemConsumers, the
@@ -59,16 +60,7 @@ class SystemConsumers(
/**
* The maximum number of messages to poll from a single SystemStreamPartition.
*/
- maxMsgsPerStreamPartition: Int = 1000,
-
- /**
- * A percentage threshold that determines when a SystemStreamPartition
- * should be polled again. 0.0 means poll for more messages only when
- * SystemConsumer's buffer is totally empty. 0.2 means poll for more messages
- * when SystemConsumers' buffer is 80% empty. SystemConsumers' buffer size
- * is determined by maxMsgsPerStreamPartition.
- */
- fetchThresholdPct: Float = 0f,
+ maxMsgsPerStreamPartition: Int = 10000,
/**
* If MessageChooser returns null when it's polled, SystemConsumers will
@@ -80,18 +72,9 @@ class SystemConsumers(
noNewMessagesTimeout: Long = 10) extends Logging {
/**
- * A buffer of incoming messages grouped by SystemStreamPartition.
- */
- var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
-
- /**
- * The MessageChooser only gets updated with one message-per-SystemStreamPartition
- * at a time. The MessageChooser will not receive a second message from the
- * same SystemStreamPartition until the first message that it received has
- * been returned to SystemConsumers. This set keeps track of which
- * SystemStreamPartitions are valid to give to the MessageChooser.
+ * The buffer where SystemConsumers stores all incoming message envelopes.
*/
- var neededByChooser = Set[SystemStreamPartition]()
+ val buffer = new BufferingMessageChooser(chooser)
/**
* A map of every SystemStreamPartition that SystemConsumers is responsible
@@ -122,33 +105,26 @@ class SystemConsumers(
var timeout = noNewMessagesTimeout
/**
- * Used to determine when the next poll should take place for a given
- * SystemStreamPartition. SystemConsumers inspects the value of fetchMap for each
- * SystemStreamPartition, and decides to poll for the SystemStreamPartition
- * if the fetchMap value is greater than or equal to the
- * depletedQueueSizeThreshold. For example, suppose the fetchThresholdPct is
- * 0.2, and the maxMsgsPerStreamPartition is 1000. This would result in
- * depletedQueueSizeThreshold being 800. This a SystemStreamPartition with a
- * fetchMap value of 936 (164 messages in the buffer is less than 20% of
- * 1000) would be polled for more messages, while a SystemStream partition
- * with a fetchMap value of 548 would not be polled for more messages (452
- * messages in the buffer is greater than 20% of 1000).
- */
- val depletedQueueSizeThreshold = (maxMsgsPerStreamPartition * (1 - fetchThresholdPct)).toInt
-
- /**
* Make the maximum backoff proportional to the number of streams we're consuming.
* For a few streams, make the max back off 1, but for hundreds make it up to 1k,
* which experimentally has shown to be the most performant.
*/
var maxBackOff = 0
-
+
+ /**
+ * How low totalUnprocessedMessages has to get before the consumers are
+ * polled for more data. This is defined to be 10% of
+ * maxMsgsPerStreamPartition. Since maxMsgsPerStreamPartition defaults to
+ * 10000, the default refreshThreshold is 1000.
+ */
+ val refreshThreshold = maxMsgsPerStreamPartition * .1
+
debug("Got stream consumers: %s" format consumers)
debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition)
debug("Got no new message timeout: %s" format noNewMessagesTimeout)
- metrics.setUnprocessedMessages(() => fetchMap.values.map(maxMsgsPerStreamPartition - _.intValue).sum)
- metrics.setNeededByChooser(() => neededByChooser.size)
+ metrics.setUnprocessedMessages(() => buffer.unprocessedMessages.size)
+ metrics.setNeededByChooser(() => buffer.neededByChooser.size)
metrics.setTimeout(() => timeout)
metrics.setMaxMessagesPerStreamPartition(() => maxMsgsPerStreamPartition)
metrics.setNoNewMessagesTimeout(() => noNewMessagesTimeout)
@@ -159,14 +135,14 @@ class SystemConsumers(
maxBackOff = scala.math.pow(10, scala.math.log10(fetchMap.size).toInt).toInt
debug("Got maxBackOff: " + maxBackOff)
-
+
consumers
.keySet
.foreach(metrics.registerSystem)
consumers.values.foreach(_.start)
- chooser.start
+ buffer.start
}
def stop {
@@ -174,18 +150,16 @@ class SystemConsumers(
consumers.values.foreach(_.stop)
- chooser.stop
+ buffer.stop
}
def register(systemStreamPartition: SystemStreamPartition, offset: String) {
debug("Registering stream: %s, %s" format (systemStreamPartition, offset))
metrics.registerSystemStream(systemStreamPartition.getSystemStream)
- neededByChooser += systemStreamPartition
+ buffer.register(systemStreamPartition, offset)
updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition)
- unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset)
- chooser.register(systemStreamPartition, offset)
}
/**
@@ -202,7 +176,7 @@ class SystemConsumers(
}
def choose: IncomingMessageEnvelope = {
- val envelopeFromChooser = chooser.choose
+ val envelopeFromChooser = buffer.choose
if (envelopeFromChooser == null) {
debug("Chooser returned null.")
@@ -219,17 +193,19 @@ class SystemConsumers(
// Don't block if we have a message to process.
timeout = 0
- // Ok to give the chooser a new message from this stream.
- neededByChooser += envelopeFromChooser.getSystemStreamPartition
-
metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc
}
- refresh.maybeCall()
+ // Always refresh if we got nothing from the chooser. Otherwise, just
+ // refresh when the buffer is low.
+ if (envelopeFromChooser == null || buffer.totalUnprocessedMessages <= refreshThreshold) {
+ refresh.maybeCall()
+ }
+
updateMessageChooser
envelopeFromChooser
}
-
+
/**
* Poll a system for new messages from SystemStreamPartitions that have
* dipped below the depletedQueueSizeThreshold threshold. Return true if
@@ -260,15 +236,13 @@ class SystemConsumers(
incomingEnvelopes.foreach(envelope => {
val systemStreamPartition = envelope.getSystemStreamPartition
+ buffer.update(serdeManager.fromBytes(envelope))
+
debug("Got message for: %s, %s" format (systemStreamPartition, envelope))
updateFetchMap(systemStreamPartition, -1)
debug("Updated fetch map for: %s, %s" format (systemStreamPartition, fetchMap))
-
- unprocessedMessages(envelope.getSystemStreamPartition).enqueue(serdeManager.fromBytes(envelope))
-
- debug("Updated unprocessed messages for: %s, %s" format (systemStreamPartition, unprocessedMessages))
})
!incomingEnvelopes.isEmpty
@@ -284,7 +258,7 @@ class SystemConsumers(
val systemName = systemStreamPartition.getSystem
var systemFetchMap = systemFetchMapCache.getOrElse(systemName, Map())
- if (fetchSize >= depletedQueueSizeThreshold) {
+ if (fetchSize >= refreshThreshold) {
systemFetchMap += systemStreamPartition -> fetchSize
} else {
systemFetchMap -= systemStreamPartition
@@ -293,18 +267,16 @@ class SystemConsumers(
fetchMap += systemStreamPartition -> fetchSize
systemFetchMapCache += systemName -> systemFetchMap
}
-
+
/**
- * A helper method that updates MessageChooser. This should be called in
+ * A helper method that updates MessageChooser. This should be called in
* "choose" method after we try to consume a message from MessageChooser.
*/
private def updateMessageChooser {
- neededByChooser.foreach(systemStreamPartition =>
- // If we have messages for a stream that the chooser needs, then update.
- if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) {
- chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
- updateFetchMap(systemStreamPartition)
- neededByChooser -= systemStreamPartition
- })
+ buffer
+ .flush
+ // Let the fetchMap know of any SSPs that were given to the chooser, so
+ // a new fetch can be triggered if the buffer is low.
+ .foreach(updateFetchMap(_))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5c5a95c3/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala
new file mode 100644
index 0000000..c7ef6ef
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import scala.collection.mutable.Queue
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.IncomingMessageEnvelope
+
+/**
+ * This buffer is responsible for storing new unprocessed
+ * IncomingMessageEnvelopes, feeding the envelopes to the message chooser,
+ * and coordinating with the chooser to pick the next message to be processed.
+ */
+class BufferingMessageChooser(chooser: MessageChooser) extends MessageChooser {
+ import ChooserStatus._
+
+ /**
+ * A buffer of incoming messages grouped by SystemStreamPartition.
+ */
+ var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
+
+ /**
+ * A count of all messages sitting in SystemConsumers' unprocessedMessages
+ * buffer, and inside the MessageChooser.
+ */
+ var totalUnprocessedMessages = 0
+
+ /**
+ * A map that contains the current status for each SSP that's been
+ * registered to the coordinator.
+ */
+ var statuses = Map[SystemStreamPartition, ChooserStatus]()
+
+ /**
+ * This is a cache of all SSPs that are currently in the "NeededByChooser"
+ * state. It's simply here to improve performance, since it means we don't
+ * need to iterate over all SSPs in the statuses map in order to determine
+ * which SSPs are currently needed by the chooser.
+ */
+ var neededByChooser = Set[SystemStreamPartition]()
+
+ /**
+ * Start the chooser that this buffer is managing.
+ */
+ def start = chooser.start
+
+ /**
+ * Stop the chooser that this buffer is managing.
+ */
+ def stop = chooser.stop
+
+ /**
+ * Register a new SystemStreamPartition with this buffer, as well as the
+ * underlying chooser.
+ */
+ def register(systemStreamPartition: SystemStreamPartition, offset: String) {
+ unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
+ chooser.register(systemStreamPartition, offset)
+ setStatus(systemStreamPartition, NeededByChooser)
+ }
+
+ /**
+ * Add a new unprocessed IncomingMessageEnvelope to the buffer. The buffer
+ * will hold on to this envelope, and eventually feed it to the underlying chooser.
+ */
+ def update(envelope: IncomingMessageEnvelope) {
+ val systemStreamPartition = envelope.getSystemStreamPartition
+
+ unprocessedMessages(envelope.getSystemStreamPartition).enqueue(envelope)
+ totalUnprocessedMessages += 1
+
+ if (statuses(systemStreamPartition).equals(SkippingChooser)) {
+ // If we were skipping messages from this SystemStreamPartition, update
+ // neededByChooser since we've got messages for it now.
+ setStatus(systemStreamPartition, NeededByChooser)
+ }
+ }
+
+ /**
+ * Tell the buffer to update the underlying chooser with any SSPs that the
+ * chooser needs.
+ *
+ * @return A set of SystemStreamPartitions that were updated in the chooser
+ * as a result of this method invocation.
+ */
+ def flush: Set[SystemStreamPartition] = {
+ var updatedSystemStreamPartitions = Set[SystemStreamPartition]()
+
+ neededByChooser.foreach(systemStreamPartition => {
+ if (unprocessedMessages(systemStreamPartition).size > 0) {
+ // If we have messages for a stream that the chooser needs, then update.
+ chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
+ updatedSystemStreamPartitions += systemStreamPartition
+ setStatus(systemStreamPartition, InChooser)
+ } else {
+ // If we find that we have no messages for this SystemStreamPartition,
+ // rather than continue trying to update the chooser with this
+ // SystemStreamPartition, add it to the skip set and remove it from
+ // the neededByChooser set (see below).
+ setStatus(systemStreamPartition, SkippingChooser)
+ }
+ })
+
+ updatedSystemStreamPartitions
+ }
+
+ /**
+ * Choose a message from the underlying chooser, and return it.
+ *
+ * @return The IncomingMessageEnvelope that the chooser has picked, or null
+ * if the chooser didn't pick anything.
+ */
+ def choose = {
+ val envelope = chooser.choose
+
+ if (envelope != null) {
+ setStatus(envelope.getSystemStreamPartition, NeededByChooser)
+
+ // Chooser picked a message, so we've got one less unprocessed message.
+ totalUnprocessedMessages -= 1
+ }
+
+ envelope
+ }
+
+ /**
+ * Update the status of a SystemStreamPartition.
+ */
+ private def setStatus(systemStreamPartition: SystemStreamPartition, status: ChooserStatus) {
+ statuses += systemStreamPartition -> status
+
+ if (status.equals(NeededByChooser)) {
+ neededByChooser += systemStreamPartition
+ } else {
+ neededByChooser -= systemStreamPartition
+ }
+ }
+}
+
+/**
+ * ChooserStatus denotes the current state of a SystemStreamPartition for a
+ * MessageChooser. This state is used to improve performance in the buffer.
+ * To update a MessageChooser, we first check if an envelope exists for each
+ * SSP that the buffer needs. If the buffer contains a lot of empty queues,
+ * then the operation of iterating over all needed SSPs, and discovering that
+ * their queues are empty is a waste of time, since they'll remain empty until
+ * a new envelope is added to the queue, which the buffer knows by virtue of
+ * having access to the enqueue method. Instead, we stop checking for an empty
+ * SSP (SkippingChooser), until a new envelope is added via the enqueue method.
+ */
+object ChooserStatus extends Enumeration {
+ type ChooserStatus = Value
+
+ /**
+ * When an envelope has been updated for the MessageChooser, the
+ * SystemStreamPartition for the envelope should be set to the InChooser
+ * state. The state will remain this way until the MessageChooser returns an
+ * envelope with the same SystemStreamPartition, at which point, the
+ * SystemStreamPartition's state should be transitioned to NeededByChooser
+ * (see below).
+ */
+ val InChooser = Value
+
+ /**
+ * If a SystemStreamPartition is not in the InChooser state, and it's
+ * unclear if the buffer has more messages available for the SSP, the SSP
+ * should be in the NeededByChooser state. This state means that the chooser
+ * should be updated with a new message from the SSP, if one is available.
+ */
+ val NeededByChooser = Value
+
+ /**
+ * When a SystemStreamPartition is in the NeededByChooser state, and we try
+ * to update the message chooser with a new envelope from the buffer for the
+ * SSP, there are two potential outcomes. One is that there is an envelope
+ * in the buffer for the SSP. In this case, the state will be transitioned
+ * to InChooser. The other possibility is that there is no envelope in the
+ * buffer at the time the update is trying to occur. In the latter case, the
+ * SSP is transitioned to the SkippingChooser state. This state means that
+ * the buffer will cease to update the chooser with envelopes from this SSP
+ * until a new envelope for the SSP is added to the buffer again.
+ *
+ * <br/><br/>
+ *
+ * The reason that this state exists is purely to improve performance. If we
+ * simply leave SSPs in the NeededByChooser state, we will try and update the
+ * chooser on every updateChooser call for all SSPs. This is a waste of time
+ * if the buffer contains a large number of empty SSPs.
+ */
+ val SkippingChooser = Value
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5c5a95c3/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala
new file mode 100644
index 0000000..c96c53b
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.chooser
+
+import org.apache.samza.system.chooser.ChooserStatus._
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import org.junit.Assert._
+import org.junit.Test
+
+class TestBufferingMessageChooser {
+ @Test
+ def testShouldBufferAndFlush {
+ val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+ val chooser = new MockMessageChooser
+ val buffer = new BufferingMessageChooser(chooser)
+ val envelope1 = new IncomingMessageEnvelope(ssp1, "1", null, null)
+ buffer.register(ssp1, "1")
+ assertEquals(1, chooser.registers.size)
+ assertEquals("1", chooser.registers.getOrElse(ssp1, "0"))
+ buffer.start
+ assertEquals(1, chooser.starts)
+ assertEquals(null, buffer.choose)
+ buffer.update(envelope1)
+ // Should buffer this update, rather than passing it to the wrapped chooser.
+ assertEquals(null, buffer.choose)
+ buffer.flush
+ assertEquals(envelope1, buffer.choose)
+ assertEquals(null, buffer.choose)
+ buffer.stop
+ assertEquals(1, chooser.stops)
+ }
+
+ @Test
+ def testBufferShouldSkipCheckedSSPs {
+ val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+ val chooser = new MockMessageChooser
+ val buffer = new BufferingMessageChooser(chooser)
+ val envelope1 = new IncomingMessageEnvelope(ssp1, "1", null, null)
+ buffer.register(ssp1, "1")
+ buffer.start
+ checkChooserStatus(NeededByChooser, ssp1, buffer)
+
+ // Buffer first message. Still needed.
+ buffer.update(envelope1)
+ checkChooserStatus(NeededByChooser, ssp1, buffer)
+ assertEquals(null, buffer.choose)
+ checkChooserStatus(NeededByChooser, ssp1, buffer)
+
+ // Flush first message. Now in chooser.
+ buffer.flush
+ checkChooserStatus(InChooser, ssp1, buffer)
+ assertEquals(envelope1, buffer.choose)
+ checkChooserStatus(NeededByChooser, ssp1, buffer)
+
+ // Now flush with no messages. Should begin skipping chooser since no
+ // messages are available.
+ assertEquals(null, buffer.choose)
+ checkChooserStatus(NeededByChooser, ssp1, buffer)
+ buffer.flush
+ checkChooserStatus(SkippingChooser, ssp1, buffer)
+
+ // Now check that we can get back to NeededByChooser when a new message
+ // arrives.
+ buffer.update(envelope1)
+ checkChooserStatus(NeededByChooser, ssp1, buffer)
+ assertEquals(0, chooser.envelopes.size)
+ assertEquals(null, buffer.choose)
+
+ // And check that we can get back into the InChooser state.
+ buffer.flush
+ checkChooserStatus(InChooser, ssp1, buffer)
+ assertEquals(envelope1, buffer.choose)
+ checkChooserStatus(NeededByChooser, ssp1, buffer)
+
+ // Shutdown.
+ buffer.stop
+ assertEquals(1, chooser.stops)
+ }
+
+ private def checkChooserStatus(status: ChooserStatus, systemStreamPartition: SystemStreamPartition, buffer: BufferingMessageChooser) {
+ if (status.equals(NeededByChooser)) {
+ assertEquals(Set(systemStreamPartition), buffer.neededByChooser)
+ } else {
+ assertTrue(!buffer.neededByChooser.contains(systemStreamPartition))
+ }
+
+ assertEquals(status, buffer.statuses.getOrElse(systemStreamPartition, null))
+ }
+}
\ No newline at end of file