You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/08/26 18:02:55 UTC
git commit: SAMZA-393; enable log compaction for checkpoint topics in kafka
Repository: incubator-samza
Updated Branches:
refs/heads/master 1be188b6c -> d733ed961
SAMZA-393; enable log compaction for checkpoint topics in kafka
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/d733ed96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/d733ed96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/d733ed96
Branch: refs/heads/master
Commit: d733ed961fec0a08a6706be22ca8a6537693bed2
Parents: 1be188b
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Aug 26 09:02:39 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Aug 26 09:02:39 2014 -0700
----------------------------------------------------------------------
.../kafka/KafkaCheckpointManager.scala | 7 +++--
.../kafka/KafkaCheckpointManagerFactory.scala | 30 +++++++++++++++-----
.../kafka/TestKafkaCheckpointManager.scala | 26 ++++++++++++-----
3 files changed, 47 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d733ed96/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index f3e954a..1d5627d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -44,6 +44,7 @@ import org.apache.samza.system.kafka.TopicMetadataCache
import org.apache.samza.util.ExponentialSleepStrategy
import org.apache.samza.util.TopicMetadataStore
import scala.collection.mutable
+import java.util.Properties
/**
* Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
@@ -65,7 +66,8 @@ class KafkaCheckpointManager(
connectZk: () => ZkClient,
systemStreamPartitionGrouperFactoryString: String,
retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
- serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager with Logging {
+ serde: CheckpointSerde = new CheckpointSerde,
+ checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging {
import KafkaCheckpointManager._
var taskNames = Set[TaskName]()
@@ -355,7 +357,8 @@ class KafkaCheckpointManager(
zkClient,
checkpointTopic,
1,
- replicationFactor)
+ replicationFactor,
+ checkpointTopicProperties)
} finally {
zkClient.close
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d733ed96/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 977330f..7ab50a3 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -31,13 +31,33 @@ import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.KafkaConfig.Config2Kafka
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
+import java.util.Properties
+import scala.collection.JavaConversions._
object KafkaCheckpointManagerFactory {
/**
* Version number to track the format of the checkpoint log
*/
val CHECKPOINT_LOG_VERSION_NUMBER = 1
+
+ val INJECTED_PRODUCER_PROPERTIES = Map(
+ "request.required.acks" -> "-1",
+ // Forcibly disable compression because Kafka doesn't support compression
+ // on log compacted topics. Details in SAMZA-393.
+ "compression.codec" -> "none",
+ "producer.type" -> "sync",
+ // Subtract one here, because DefaultEventHandler calls messageSendMaxRetries + 1.
+ "message.send.max.retries" -> (Integer.MAX_VALUE - 1).toString)
+
+ // Set the checkpoint topic configs to have a very small segment size and
+ // enable log compaction. This keeps job startup time small since there
+ // are fewer useless (overwritten) messages to read from the checkpoint
+ // topic.
+ val CHECKPOINT_TOPIC_PROPERTIES = (new Properties /: Map(
+ "cleanup.policy" -> "compact",
+ "segment.bytes" -> "26214400")) { case (props, (k, v)) => props.put(k, v); props }
}
+
class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
import KafkaCheckpointManagerFactory._
@@ -46,15 +66,10 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
val systemName = config
.getCheckpointSystem
.getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
- val injectedProducerProps = Map(
- "request.required.acks" -> "-1",
- "producer.type" -> "sync",
- // Subtract one here, because DefaultEventHandler calls messageSendMaxRetries + 1.
- "message.send.max.retries" -> (Integer.MAX_VALUE - 1).toString)
val producerConfig = config.getKafkaSystemProducerConfig(
systemName,
clientId,
- injectedProducerProps)
+ INJECTED_PRODUCER_PROPERTIES)
val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
val replicationFactor = config.getCheckpointReplicationFactor.getOrElse("3").toInt
val socketTimeout = consumerConfig.socketTimeoutMs
@@ -90,7 +105,8 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
metadataStore,
connectProducer,
connectZk,
- systemStreamPartitionGrouperFactoryString)
+ systemStreamPartitionGrouperFactoryString,
+ checkpointTopicProperties = CHECKPOINT_TOPIC_PROPERTIES)
}
private def getTopic(jobName: String, jobId: String) =
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d733ed96/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 34fe6dd..f556479 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -37,14 +37,17 @@ import org.apache.samza.container.TaskName
import org.apache.samza.serializers.CheckpointSerde
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
-import org.apache.samza.{SamzaException, Partition}
+import org.apache.samza.{ SamzaException, Partition }
import org.junit.Assert._
-import org.junit.{AfterClass, BeforeClass, Test}
+import org.junit.{ AfterClass, BeforeClass, Test }
import scala.collection.JavaConversions._
import scala.collection._
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import kafka.admin.AdminUtils
object TestKafkaCheckpointManager {
+ val checkpointTopic = "checkpoint-topic"
+ val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
val zkConnect: String = TestZKUtils.zookeeperConnect
var zkClient: ZkClient = null
val zkConnectionTimeout = 6000
@@ -68,6 +71,7 @@ object TestKafkaCheckpointManager {
config.put("metadata.broker.list", brokers)
config.put("producer.type", "sync")
config.put("request.required.acks", "-1")
+ config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
val producerConfig = new ProducerConfig(config)
val partition = new Partition(0)
val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
@@ -113,8 +117,14 @@ class TestKafkaCheckpointManager {
val taskName = new TaskName(partition.toString)
kcm.register(taskName)
kcm.start
- var readCp = kcm.readLastCheckpoint(taskName)
+ // check that log compaction is enabled.
+ val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+ val topicConfig = AdminUtils.fetchTopicConfig(zkClient, checkpointTopic)
+ zkClient.close
+ assertEquals("compact", topicConfig.get("cleanup.policy"))
+ assertEquals("26214400", topicConfig.get("segment.bytes"))
// read before topic exists should result in a null checkpoint
+ var readCp = kcm.readLastCheckpoint(taskName)
assertNull(readCp)
// create topic the first time around
kcm.writeCheckpoint(taskName, cp1)
@@ -157,7 +167,7 @@ class TestKafkaCheckpointManager {
private def getKafkaCheckpointManager = new KafkaCheckpointManager(
clientId = "some-client-id",
- checkpointTopic = "checkpoint-topic",
+ checkpointTopic = checkpointTopic,
systemName = "kafka",
replicationFactor = 3,
socketTimeout = 30000,
@@ -166,12 +176,13 @@ class TestKafkaCheckpointManager {
metadataStore = metadataStore,
connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
- systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString)
+ systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
+ checkpointTopicProperties = KafkaCheckpointManagerFactory.CHECKPOINT_TOPIC_PROPERTIES)
// inject serde. Kafka exceptions will be thrown when serde.fromBytes is called
private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager(
clientId = "some-client-id-invalid-serde",
- checkpointTopic = "checkpoint-topic-invalid-serde",
+ checkpointTopic = serdeCheckpointTopic,
systemName = "kafka",
replicationFactor = 3,
socketTimeout = 30000,
@@ -181,7 +192,8 @@ class TestKafkaCheckpointManager {
connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
- serde = new InvalideSerde(exception))
+ serde = new InvalideSerde(exception),
+ checkpointTopicProperties = KafkaCheckpointManagerFactory.CHECKPOINT_TOPIC_PROPERTIES)
class InvalideSerde(exception: String) extends CheckpointSerde {
override def fromBytes(bytes: Array[Byte]): Checkpoint = {