You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/08/26 18:02:55 UTC

git commit: SAMZA-393; enable log compaction for checkpoint topics in kafka

Repository: incubator-samza
Updated Branches:
  refs/heads/master 1be188b6c -> d733ed961


SAMZA-393; enable log compaction for checkpoint topics in kafka


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/d733ed96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/d733ed96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/d733ed96

Branch: refs/heads/master
Commit: d733ed961fec0a08a6706be22ca8a6537693bed2
Parents: 1be188b
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Aug 26 09:02:39 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Aug 26 09:02:39 2014 -0700

----------------------------------------------------------------------
 .../kafka/KafkaCheckpointManager.scala          |  7 +++--
 .../kafka/KafkaCheckpointManagerFactory.scala   | 30 +++++++++++++++-----
 .../kafka/TestKafkaCheckpointManager.scala      | 26 ++++++++++++-----
 3 files changed, 47 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d733ed96/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index f3e954a..1d5627d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -44,6 +44,7 @@ import org.apache.samza.system.kafka.TopicMetadataCache
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.TopicMetadataStore
 import scala.collection.mutable
+import java.util.Properties
 
 /**
  * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
@@ -65,7 +66,8 @@ class KafkaCheckpointManager(
   connectZk: () => ZkClient,
   systemStreamPartitionGrouperFactoryString: String,
   retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
-  serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager with Logging {
+  serde: CheckpointSerde = new CheckpointSerde,
+  checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging {
   import KafkaCheckpointManager._
 
   var taskNames = Set[TaskName]()
@@ -355,7 +357,8 @@ class KafkaCheckpointManager(
             zkClient,
             checkpointTopic,
             1,
-            replicationFactor)
+            replicationFactor,
+            checkpointTopicProperties)
         } finally {
           zkClient.close
         }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d733ed96/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 977330f..7ab50a3 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -31,13 +31,33 @@ import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.KafkaConfig.Config2Kafka
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
+import java.util.Properties
+import scala.collection.JavaConversions._
 
 object KafkaCheckpointManagerFactory {
   /**
    * Version number to track the format of the checkpoint log
    */
   val CHECKPOINT_LOG_VERSION_NUMBER = 1
+
+  // Producer properties injected into the checkpoint system's producer config.
+  val INJECTED_PRODUCER_PROPERTIES = Map(
+    "request.required.acks" -> "-1",
+    // Forcibly disable compression because Kafka doesn't support compression
+    // on log compacted topics. Details in SAMZA-393.
+    "compression.codec" -> "none",
+    "producer.type" -> "sync",
+    // Subtract one here, because DefaultEventHandler calls messageSendMaxRetries + 1.
+    "message.send.max.retries" -> (Integer.MAX_VALUE - 1).toString)
+
+  // Checkpoint topic configs: log compaction plus a small (25 MiB) segment size
+  // keep job startup fast, since fewer overwritten messages must be read. A def,
+  // so each caller gets its own mutable Properties rather than a shared one.
+  def CHECKPOINT_TOPIC_PROPERTIES: Properties = (new Properties /: Map(
+    "cleanup.policy" -> "compact",
+    "segment.bytes" -> "26214400")) { case (props, (k, v)) => props.put(k, v); props }
 }
+
 class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
   import KafkaCheckpointManagerFactory._
 
@@ -46,15 +66,10 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
     val systemName = config
       .getCheckpointSystem
       .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
-    val injectedProducerProps = Map(
-      "request.required.acks" -> "-1",
-      "producer.type" -> "sync",
-      // Subtract one here, because DefaultEventHandler calls messageSendMaxRetries + 1.
-      "message.send.max.retries" -> (Integer.MAX_VALUE - 1).toString)
     val producerConfig = config.getKafkaSystemProducerConfig(
       systemName,
       clientId,
-      injectedProducerProps)
+      INJECTED_PRODUCER_PROPERTIES)
     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
     val replicationFactor = config.getCheckpointReplicationFactor.getOrElse("3").toInt
     val socketTimeout = consumerConfig.socketTimeoutMs
@@ -90,7 +105,8 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
       metadataStore,
       connectProducer,
       connectZk,
-      systemStreamPartitionGrouperFactoryString)
+      systemStreamPartitionGrouperFactoryString,
+      checkpointTopicProperties = CHECKPOINT_TOPIC_PROPERTIES)
   }
 
   private def getTopic(jobName: String, jobId: String) =

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d733ed96/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 34fe6dd..f556479 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -37,14 +37,17 @@ import org.apache.samza.container.TaskName
 import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
-import org.apache.samza.{SamzaException, Partition}
+import org.apache.samza.{ SamzaException, Partition }
 import org.junit.Assert._
-import org.junit.{AfterClass, BeforeClass, Test}
+import org.junit.{ AfterClass, BeforeClass, Test }
 import scala.collection.JavaConversions._
 import scala.collection._
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import kafka.admin.AdminUtils
 
 object TestKafkaCheckpointManager {
+  val checkpointTopic = "checkpoint-topic"
+  val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
   val zkConnect: String = TestZKUtils.zookeeperConnect
   var zkClient: ZkClient = null
   val zkConnectionTimeout = 6000
@@ -68,6 +71,7 @@ object TestKafkaCheckpointManager {
   config.put("metadata.broker.list", brokers)
   config.put("producer.type", "sync")
   config.put("request.required.acks", "-1")
+  config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
   val producerConfig = new ProducerConfig(config)
   val partition = new Partition(0)
   val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
@@ -113,8 +117,14 @@ class TestKafkaCheckpointManager {
     val taskName = new TaskName(partition.toString)
     kcm.register(taskName)
     kcm.start
-    var readCp = kcm.readLastCheckpoint(taskName)
+    // check that log compaction is enabled.
+    val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+    val topicConfig = AdminUtils.fetchTopicConfig(zkClient, checkpointTopic)
+    zkClient.close
+    assertEquals("compact", topicConfig.get("cleanup.policy"))
+    assertEquals("26214400", topicConfig.get("segment.bytes"))
     // read before topic exists should result in a null checkpoint
+    var readCp = kcm.readLastCheckpoint(taskName)
     assertNull(readCp)
     // create topic the first time around
     kcm.writeCheckpoint(taskName, cp1)
@@ -157,7 +167,7 @@ class TestKafkaCheckpointManager {
 
   private def getKafkaCheckpointManager = new KafkaCheckpointManager(
     clientId = "some-client-id",
-    checkpointTopic = "checkpoint-topic",
+    checkpointTopic = checkpointTopic,
     systemName = "kafka",
     replicationFactor = 3,
     socketTimeout = 30000,
@@ -166,12 +176,13 @@ class TestKafkaCheckpointManager {
     metadataStore = metadataStore,
     connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
     connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
-    systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString)
+    systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
+    checkpointTopicProperties = KafkaCheckpointManagerFactory.CHECKPOINT_TOPIC_PROPERTIES)
 
   // inject serde. Kafka exceptions will be thrown when serde.fromBytes is called
   private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager(
     clientId = "some-client-id-invalid-serde",
-    checkpointTopic = "checkpoint-topic-invalid-serde",
+    checkpointTopic = serdeCheckpointTopic,
     systemName = "kafka",
     replicationFactor = 3,
     socketTimeout = 30000,
@@ -181,7 +192,8 @@ class TestKafkaCheckpointManager {
     connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
     connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
-    serde = new InvalideSerde(exception))
+    serde = new InvalideSerde(exception),
+    checkpointTopicProperties = KafkaCheckpointManagerFactory.CHECKPOINT_TOPIC_PROPERTIES)
 
   class InvalideSerde(exception: String) extends CheckpointSerde {
     override def fromBytes(bytes: Array[Byte]): Checkpoint = {