You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/10/18 22:30:44 UTC

samza git commit: SAMZA-1465: Performance regression for KafkaCheckpointManager

Repository: samza
Updated Branches:
  refs/heads/master 343712e30 -> fd9f0802f


SAMZA-1465: Performance regression for KafkaCheckpointManager

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>, Boris Shkolnik <bo...@apache.org>, Fred Ji <fr...@yahoo.com>

Closes #331 from jmakes/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fd9f0802
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fd9f0802
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fd9f0802

Branch: refs/heads/master
Commit: fd9f0802f9617b03542ee4f42ab8966e6c6df145
Parents: 343712e
Author: Jacob Maes <jm...@linkedin.com>
Authored: Wed Oct 18 15:30:34 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Wed Oct 18 15:30:34 2017 -0700

----------------------------------------------------------------------
 .../kafka/KafkaCheckpointManager.scala          | 105 +++++++------------
 1 file changed, 40 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fd9f0802/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 4eb6666..75b4700 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -43,6 +43,9 @@ import scala.collection.mutable
  * keyed to that taskName. If there is no such message, no checkpoint data
  * exists.  The underlying log has a single partition into which all
  * checkpoints and TaskName to changelog partition mappings are written.
+ *
+ * This class is thread-safe for writing checkpoints, but not for reading them.
+ * This is currently OK because checkpoints are only read on the main thread.
  */
 class KafkaCheckpointManager(
                               clientId: String,
@@ -64,14 +67,14 @@ class KafkaCheckpointManager(
                               checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging {
 
   var taskNames = Set[TaskName]()
-  @volatile var  systemProducer: SystemProducer = null
+  @volatile var systemProducer: SystemProducer = null
+  var systemConsumer: SystemConsumer = null
   var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
   val systemAdmin = getSystemAdmin()
 
   val kafkaUtil: KafkaUtil = new KafkaUtil(retryBackoff, connectZk)
 
 
-
   KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
 
   info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
@@ -103,7 +106,7 @@ class KafkaCheckpointManager(
 
         systemProducer.send(taskName.getTaskName, envelope)
         systemProducer.flush(taskName.getTaskName) // make sure it is written
-        info("Completed writing checkpoint=%s into %s topic for system %s." format(checkpoint, checkpointTopic, systemName) )
+        debug("Completed writing checkpoint=%s into %s topic for system %s." format(checkpoint, checkpointTopic, systemName) )
         loop.done
       },
 
@@ -181,73 +184,40 @@ class KafkaCheckpointManager(
    */
   private def readLog(shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
                       handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
-
-    val UNKNOWN_OFFSET = "-1"
-    var attempts = 10
-    val POLL_TIMEOUT = 1000L
+    info("Reading from checkpoint system:%s topic:%s" format(systemName, checkpointTopic))
 
     val ssp: SystemStreamPartition = new SystemStreamPartition(systemName, checkpointTopic, new Partition(0))
-    val systemConsumer = getSystemConsumer()
-    val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0))
-    // offsets returned are strings
-    val newestOffset = if (partitionMetadata.getNewestOffset == null) UNKNOWN_OFFSET else partitionMetadata.getNewestOffset
-    val oldestOffset = partitionMetadata.getOldestOffset
-    systemConsumer.register(ssp, oldestOffset) // checkpoint stream should always be read from the beginning
-    systemConsumer.start()
 
-    var msgCount = 0
-    try {
-      val emptyEnvelopes = util.Collections.emptyMap[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]
-      // convert offsets to long
-      var currentOffset = UNKNOWN_OFFSET.toLong
-      val newestOffsetLong = newestOffset.toLong
-      val sspToPoll = Collections.singleton(ssp)
-      while (currentOffset < newestOffsetLong) {
-
-        val envelopes: java.util.Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]] =
-        try {
-          systemConsumer.poll(sspToPoll, POLL_TIMEOUT)
-        } catch {
-          case e: Exception => {
-            // these exceptions are most likely intermediate
-            warn("Got %s exception while polling the consumer for checkpoints." format e)
-            if (attempts == 0) throw new SamzaException("Multiple attempts failed while reading the checkpoints. Giving up.", e)
-            attempts -= 1
-            emptyEnvelopes
-          }
-        }
+    if (systemConsumer == null) {
+      val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0))
+      val oldestOffset = partitionMetadata.getOldestOffset
 
-        val messages: util.List[IncomingMessageEnvelope] = envelopes.get(ssp)
-        val messagesNum = if (messages != null) messages.size else 0
-        info("CheckpointMgr read %s envelopes (%s messages) from ssp %s. Current offset is %s, newest is %s"
-                     format (envelopes.size(), messagesNum, ssp, currentOffset, newestOffset))
-        if (envelopes.isEmpty || messagesNum <= 0) {
-          info("Got empty/null list of messages")
-        } else {
-          msgCount += messages.size()
-          // check the key
-          for (msg: IncomingMessageEnvelope <- messages) {
-            val key = msg.getKey.asInstanceOf[Array[Byte]]
-            currentOffset = msg.getOffset().toLong
-            if (key == null) {
-              throw new KafkaUtilException("While reading checkpoint (currentOffset=%s) stream encountered message without key."
-                                                   format currentOffset)
-            }
+      systemConsumer = getSystemConsumer()
+      systemConsumer.register(ssp, oldestOffset)
+      systemConsumer.start()
+    }
 
-            val checkpointKey = KafkaCheckpointLogKey.fromBytes(key)
+    val iterator =  new SystemStreamPartitionIterator(systemConsumer, ssp);
+    var msgCount = 0
+    while (iterator.hasNext) {
+      val msg = iterator.next
+      msgCount += 1
+
+      val offset = msg.getOffset
+      val key = msg.getKey.asInstanceOf[Array[Byte]]
+      if (key == null) {
+        throw new KafkaUtilException(
+          "While reading checkpoint (currentOffset=%s) stream encountered message without key." format offset)
+      }
 
-            if (!shouldHandleEntry(checkpointKey)) {
-              info("Skipping checkpoint log entry at offset %s with key %s." format(currentOffset, checkpointKey))
-            } else {
-              // handleEntry requires ByteBuffer
-              val checkpointPayload = ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]])
-              handleEntry(checkpointPayload, checkpointKey)
-            }
-          }
-        }
+      val checkpointKey = KafkaCheckpointLogKey.fromBytes(key)
+
+      if (!shouldHandleEntry(checkpointKey)) {
+        info("Skipping checkpoint log entry at offset %s with key %s." format(offset, checkpointKey))
+      } else {
+        val checkpointPayload = ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]])
+        handleEntry(checkpointPayload, checkpointKey)
       }
-    } finally {
-      systemConsumer.stop()
     }
     info("Done reading %s messages from checkpoint system:%s topic:%s" format(msgCount, systemName, checkpointTopic))
   }
@@ -282,12 +252,17 @@ class KafkaCheckpointManager(
 
 
   def stop = {
-    synchronized (
+    synchronized {
       if (systemProducer != null) {
         systemProducer.stop
         systemProducer = null
       }
-    )
+
+      if (systemConsumer != null) {
+        systemConsumer.stop
+        systemConsumer = null
+      }
+    }
 
   }