You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/08/26 22:43:14 UTC

git commit: SAMZA-399; make task.checkpoint.segment.bytes configurable

Repository: incubator-samza
Updated Branches:
  refs/heads/master cdd2ba7c0 -> d2bf10eb5


SAMZA-399; make task.checkpoint.segment.bytes configurable


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/d2bf10eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/d2bf10eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/d2bf10eb

Branch: refs/heads/master
Commit: d2bf10eb585df1fa566a2e5314dcdf485561e19f
Parents: cdd2ba7
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Aug 26 13:43:06 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Aug 26 13:43:06 2014 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html           | 10 ++++++++++
 .../kafka/KafkaCheckpointManagerFactory.scala         | 14 ++++++++++----
 .../scala/org/apache/samza/config/KafkaConfig.scala   |  2 ++
 .../checkpoint/kafka/TestKafkaCheckpointManager.scala |  6 ++++--
 4 files changed, 26 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d2bf10eb/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 0c74167..526ca9f 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -752,6 +752,16 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="task-checkpoint-segment-bytes">task.checkpoint.<br>segment.bytes</td>
+                    <td class="default">26214400</td>
+                    <td class="description">
+                        If you are using Kafka for checkpoints, this is the size, in bytes, of each log segment in the
+                        checkpoint topic. Keeping this number small is useful because Kafka only compacts segments
+                        that have been rolled, so smaller segments let it garbage collect old checkpoints sooner.
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="regex-rewriter">
                         Consuming all Kafka topics matching a regular expression<br>
                         <span class="subtitle">

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d2bf10eb/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 7ab50a3..f7db2a1 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -53,9 +53,15 @@ object KafkaCheckpointManagerFactory {
   // enable log compaction. This keeps job startup time small since there 
   // are fewer useless (overwritten) messages to read from the checkpoint 
   // topic.
-  val CHECKPOINT_TOPIC_PROPERTIES = (new Properties /: Map(
-    "cleanup.policy" -> "compact",
-    "segment.bytes" -> "26214400")) { case (props, (k, v)) => props.put(k, v); props }
+  def getCheckpointTopicProperties(config: Config) = { // Builds the Properties passed as checkpointTopicProperties.
+    val segmentBytes = config // NOTE(review): presumably relies on an implicit Config -> KafkaConfig conversion
+      .getCheckpointSegmentBytes // optional raw string from task.checkpoint.segment.bytes; not validated here
+      .getOrElse("26214400") // default when the key is unset: 26214400 bytes (25 MiB)
+    // Fold the (key, value) pairs into a fresh java.util.Properties; compaction keeps checkpoint reads short.
+    (new Properties /: Map(
+      "cleanup.policy" -> "compact",
+      "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
+  }
 }
 
 class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
@@ -106,7 +112,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
       connectProducer,
       connectZk,
       systemStreamPartitionGrouperFactoryString,
-      checkpointTopicProperties = CHECKPOINT_TOPIC_PROPERTIES)
+      checkpointTopicProperties = getCheckpointTopicProperties(config))
   }
 
   private def getTopic(jobName: String, jobId: String) =

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d2bf10eb/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index bdb416d..9fc1f56 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -32,6 +32,7 @@ object KafkaConfig {
 
   val CHECKPOINT_SYSTEM = "task.checkpoint.system"
   val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
+  val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes" // config key for the checkpoint topic's segment.bytes
 
   /**
    * Defines how low a queue can get for a single system/stream/partition
@@ -46,6 +47,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   // checkpoints
   def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM)
   def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
+  def getCheckpointSegmentBytes() = getOption(KafkaConfig.CHECKPOINT_SEGMENT_BYTES) // optional raw string; no parsing
 
   // custom consumer config
   def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d2bf10eb/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index f556479..4827731 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -44,6 +44,8 @@ import scala.collection.JavaConversions._
 import scala.collection._
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 import kafka.admin.AdminUtils
+import org.apache.samza.config.MapConfig
+import scala.collection.JavaConversions._
 
 object TestKafkaCheckpointManager {
   val checkpointTopic = "checkpoint-topic"
@@ -177,7 +179,7 @@ class TestKafkaCheckpointManager {
     connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
     connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
-    checkpointTopicProperties = KafkaCheckpointManagerFactory.CHECKPOINT_TOPIC_PROPERTIES)
+    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
 
   // inject serde. Kafka exceptions will be thrown when serde.fromBytes is called
   private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager(
@@ -193,7 +195,7 @@ class TestKafkaCheckpointManager {
     connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     serde = new InvalideSerde(exception),
-    checkpointTopicProperties = KafkaCheckpointManagerFactory.CHECKPOINT_TOPIC_PROPERTIES)
+    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
 
   class InvalideSerde(exception: String) extends CheckpointSerde {
     override def fromBytes(bytes: Array[Byte]): Checkpoint = {