You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/08/26 22:43:14 UTC
git commit: SAMZA-399; make task.checkpoint.segment.bytes configurable
Repository: incubator-samza
Updated Branches:
refs/heads/master cdd2ba7c0 -> d2bf10eb5
SAMZA-399; make task.checkpoint.segment.bytes configurable
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/d2bf10eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/d2bf10eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/d2bf10eb
Branch: refs/heads/master
Commit: d2bf10eb585df1fa566a2e5314dcdf485561e19f
Parents: cdd2ba7
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Aug 26 13:43:06 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Aug 26 13:43:06 2014 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 10 ++++++++++
.../kafka/KafkaCheckpointManagerFactory.scala | 14 ++++++++++----
.../scala/org/apache/samza/config/KafkaConfig.scala | 2 ++
.../checkpoint/kafka/TestKafkaCheckpointManager.scala | 6 ++++--
4 files changed, 26 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d2bf10eb/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 0c74167..526ca9f 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -752,6 +752,16 @@
</tr>
<tr>
+ <td class="property" id="task-checkpoint-segment-bytes">task.checkpoint.<br>segment.bytes</td>
+ <td class="default">26214400</td>
+ <td class="description">
+ If you are using Kafka for checkpoints, this is the segment size (in bytes) used for the checkpoint
+ topic's log segments. Keeping this number small is useful because it increases the frequency
+ with which Kafka garbage-collects (compacts) old checkpoints.
+ </td>
+ </tr>
+
+ <tr>
<th colspan="3" class="section" id="regex-rewriter">
Consuming all Kafka topics matching a regular expression<br>
<span class="subtitle">
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d2bf10eb/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 7ab50a3..f7db2a1 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -53,9 +53,15 @@ object KafkaCheckpointManagerFactory {
// enable log compaction. This keeps job startup time small since there
// are fewer useless (overwritten) messages to read from the checkpoint
// topic.
- val CHECKPOINT_TOPIC_PROPERTIES = (new Properties /: Map(
- "cleanup.policy" -> "compact",
- "segment.bytes" -> "26214400")) { case (props, (k, v)) => props.put(k, v); props }
+ def getCheckpointTopicProperties(config: Config) = {
+ val segmentBytes = config
+ .getCheckpointSegmentBytes
+ .getOrElse("26214400")
+
+ (new Properties /: Map(
+ "cleanup.policy" -> "compact",
+ "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
+ }
}
class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
@@ -106,7 +112,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
connectProducer,
connectZk,
systemStreamPartitionGrouperFactoryString,
- checkpointTopicProperties = CHECKPOINT_TOPIC_PROPERTIES)
+ checkpointTopicProperties = getCheckpointTopicProperties(config))
}
private def getTopic(jobName: String, jobId: String) =
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d2bf10eb/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index bdb416d..9fc1f56 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -32,6 +32,7 @@ object KafkaConfig {
val CHECKPOINT_SYSTEM = "task.checkpoint.system"
val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
+ val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
/**
* Defines how low a queue can get for a single system/stream/partition
@@ -46,6 +47,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
// checkpoints
def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM)
def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
+ def getCheckpointSegmentBytes() = getOption(KafkaConfig.CHECKPOINT_SEGMENT_BYTES)
// custom consumer config
def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d2bf10eb/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index f556479..4827731 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -44,6 +44,8 @@ import scala.collection.JavaConversions._
import scala.collection._
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
import kafka.admin.AdminUtils
+import org.apache.samza.config.MapConfig
+import scala.collection.JavaConversions._
object TestKafkaCheckpointManager {
val checkpointTopic = "checkpoint-topic"
@@ -177,7 +179,7 @@ class TestKafkaCheckpointManager {
connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
- checkpointTopicProperties = KafkaCheckpointManagerFactory.CHECKPOINT_TOPIC_PROPERTIES)
+ checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
// inject serde. Kafka exceptions will be thrown when serde.fromBytes is called
private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager(
@@ -193,7 +195,7 @@ class TestKafkaCheckpointManager {
connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
serde = new InvalideSerde(exception),
- checkpointTopicProperties = KafkaCheckpointManagerFactory.CHECKPOINT_TOPIC_PROPERTIES)
+ checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
class InvalideSerde(exception: String) extends CheckpointSerde {
override def fromBytes(bytes: Array[Byte]): Checkpoint = {