You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/10/18 22:30:44 UTC
samza git commit: SAMZA-1465: Performance regression for KafkaCheckpointManager
Repository: samza
Updated Branches:
refs/heads/master 343712e30 -> fd9f0802f
SAMZA-1465: Performance regression for KafkaCheckpointManager
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>, Boris Shkolnik <bo...@apache.org>, Fred Ji <fr...@yahoo.com>
Closes #331 from jmakes/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fd9f0802
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fd9f0802
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fd9f0802
Branch: refs/heads/master
Commit: fd9f0802f9617b03542ee4f42ab8966e6c6df145
Parents: 343712e
Author: Jacob Maes <jm...@linkedin.com>
Authored: Wed Oct 18 15:30:34 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Wed Oct 18 15:30:34 2017 -0700
----------------------------------------------------------------------
.../kafka/KafkaCheckpointManager.scala | 105 +++++++------------
1 file changed, 40 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fd9f0802/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 4eb6666..75b4700 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -43,6 +43,9 @@ import scala.collection.mutable
* keyed to that taskName. If there is no such message, no checkpoint data
* exists. The underlying log has a single partition into which all
* checkpoints and TaskName to changelog partition mappings are written.
+ *
+ * This class is thread safe for writing but not for reading checkpoints.
+ * This is currently OK since checkpoints are only read on the main thread.
*/
class KafkaCheckpointManager(
clientId: String,
@@ -64,14 +67,14 @@ class KafkaCheckpointManager(
checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging {
var taskNames = Set[TaskName]()
- @volatile var systemProducer: SystemProducer = null
+ @volatile var systemProducer: SystemProducer = null
+ var systemConsumer: SystemConsumer = null
var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
val systemAdmin = getSystemAdmin()
val kafkaUtil: KafkaUtil = new KafkaUtil(retryBackoff, connectZk)
-
KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
@@ -103,7 +106,7 @@ class KafkaCheckpointManager(
systemProducer.send(taskName.getTaskName, envelope)
systemProducer.flush(taskName.getTaskName) // make sure it is written
- info("Completed writing checkpoint=%s into %s topic for system %s." format(checkpoint, checkpointTopic, systemName) )
+ debug("Completed writing checkpoint=%s into %s topic for system %s." format(checkpoint, checkpointTopic, systemName) )
loop.done
},
@@ -181,73 +184,40 @@ class KafkaCheckpointManager(
*/
private def readLog(shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
-
- val UNKNOWN_OFFSET = "-1"
- var attempts = 10
- val POLL_TIMEOUT = 1000L
+ info("Reading from checkpoint system:%s topic:%s" format(systemName, checkpointTopic))
val ssp: SystemStreamPartition = new SystemStreamPartition(systemName, checkpointTopic, new Partition(0))
- val systemConsumer = getSystemConsumer()
- val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0))
- // offsets returned are strings
- val newestOffset = if (partitionMetadata.getNewestOffset == null) UNKNOWN_OFFSET else partitionMetadata.getNewestOffset
- val oldestOffset = partitionMetadata.getOldestOffset
- systemConsumer.register(ssp, oldestOffset) // checkpoint stream should always be read from the beginning
- systemConsumer.start()
- var msgCount = 0
- try {
- val emptyEnvelopes = util.Collections.emptyMap[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]
- // convert offsets to long
- var currentOffset = UNKNOWN_OFFSET.toLong
- val newestOffsetLong = newestOffset.toLong
- val sspToPoll = Collections.singleton(ssp)
- while (currentOffset < newestOffsetLong) {
-
- val envelopes: java.util.Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]] =
- try {
- systemConsumer.poll(sspToPoll, POLL_TIMEOUT)
- } catch {
- case e: Exception => {
- // these exceptions are most likely intermediate
- warn("Got %s exception while polling the consumer for checkpoints." format e)
- if (attempts == 0) throw new SamzaException("Multiple attempts failed while reading the checkpoints. Giving up.", e)
- attempts -= 1
- emptyEnvelopes
- }
- }
+ if (systemConsumer == null) {
+ val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0))
+ val oldestOffset = partitionMetadata.getOldestOffset
- val messages: util.List[IncomingMessageEnvelope] = envelopes.get(ssp)
- val messagesNum = if (messages != null) messages.size else 0
- info("CheckpointMgr read %s envelopes (%s messages) from ssp %s. Current offset is %s, newest is %s"
- format (envelopes.size(), messagesNum, ssp, currentOffset, newestOffset))
- if (envelopes.isEmpty || messagesNum <= 0) {
- info("Got empty/null list of messages")
- } else {
- msgCount += messages.size()
- // check the key
- for (msg: IncomingMessageEnvelope <- messages) {
- val key = msg.getKey.asInstanceOf[Array[Byte]]
- currentOffset = msg.getOffset().toLong
- if (key == null) {
- throw new KafkaUtilException("While reading checkpoint (currentOffset=%s) stream encountered message without key."
- format currentOffset)
- }
+ systemConsumer = getSystemConsumer()
+ systemConsumer.register(ssp, oldestOffset)
+ systemConsumer.start()
+ }
- val checkpointKey = KafkaCheckpointLogKey.fromBytes(key)
+ val iterator = new SystemStreamPartitionIterator(systemConsumer, ssp);
+ var msgCount = 0
+ while (iterator.hasNext) {
+ val msg = iterator.next
+ msgCount += 1
+
+ val offset = msg.getOffset
+ val key = msg.getKey.asInstanceOf[Array[Byte]]
+ if (key == null) {
+ throw new KafkaUtilException(
+ "While reading checkpoint (currentOffset=%s) stream encountered message without key." format offset)
+ }
- if (!shouldHandleEntry(checkpointKey)) {
- info("Skipping checkpoint log entry at offset %s with key %s." format(currentOffset, checkpointKey))
- } else {
- // handleEntry requires ByteBuffer
- val checkpointPayload = ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]])
- handleEntry(checkpointPayload, checkpointKey)
- }
- }
- }
+ val checkpointKey = KafkaCheckpointLogKey.fromBytes(key)
+
+ if (!shouldHandleEntry(checkpointKey)) {
+ info("Skipping checkpoint log entry at offset %s with key %s." format(offset, checkpointKey))
+ } else {
+ val checkpointPayload = ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]])
+ handleEntry(checkpointPayload, checkpointKey)
}
- } finally {
- systemConsumer.stop()
}
info("Done reading %s messages from checkpoint system:%s topic:%s" format(msgCount, systemName, checkpointTopic))
}
@@ -282,12 +252,17 @@ class KafkaCheckpointManager(
def stop = {
- synchronized (
+ synchronized {
if (systemProducer != null) {
systemProducer.stop
systemProducer = null
}
- )
+
+ if (systemConsumer != null) {
+ systemConsumer.stop
+ systemConsumer = null
+ }
+ }
}