You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:32 UTC
[16/50] [abbrv] samza git commit: SAMZA-798 : Performance and
stability issue after combining checkpoint and coordinator stream
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala
deleted file mode 100644
index 958d07c..0000000
--- a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package old.checkpoint
-
-import java.util
-
-import org.apache.samza.SamzaException
-import org.apache.samza.container.TaskName
-import org.codehaus.jackson.`type`.TypeReference
-import org.codehaus.jackson.map.ObjectMapper
-
-import scala.collection.JavaConversions._
-
-/**
- * Kafka Checkpoint Log-specific key used to identify what type of entry is
- * written for any particular log entry.
- *
- * @param map Backing map to hold key values
- */
-class KafkaCheckpointLogKey private (val map: Map[String, String]) {
- // This might be better as a case class...
- import KafkaCheckpointLogKey._
-
- /**
- * Serialize this key to bytes
- * @return Key as bytes
- */
- def toBytes(): Array[Byte] = {
- val jMap = new util.HashMap[String, String](map.size)
- jMap.putAll(map)
-
- JSON_MAPPER.writeValueAsBytes(jMap)
- }
-
- private def getKey = map.getOrElse(CHECKPOINT_KEY_KEY, throw new SamzaException("No " + CHECKPOINT_KEY_KEY + " in map for Kafka Checkpoint log key"))
-
- /**
- * Is this key for a checkpoint entry?
- *
- * @return true iff this key's entry is for a checkpoint
- */
- def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE)
-
- /**
- * Is this key for a changelog partition mapping?
- *
- * @return true iff this key's entry is for a changelog partition mapping
- */
- def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE)
-
- /**
- * If this Key is for a checkpoint entry, return its associated TaskName.
- *
- * @return TaskName for this checkpoint or throw an exception if this key does not have a TaskName entry
- */
- def getCheckpointTaskName = {
- val asString = map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new SamzaException("No TaskName in checkpoint key: " + this))
- new TaskName(asString)
- }
-
- def canEqual(other: Any): Boolean = other.isInstanceOf[KafkaCheckpointLogKey]
-
- override def equals(other: Any): Boolean = other match {
- case that: KafkaCheckpointLogKey =>
- (that canEqual this) &&
- map == that.map
- case _ => false
- }
-
- override def hashCode(): Int = {
- val state = Seq(map)
- state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
- }
-}
-
-object KafkaCheckpointLogKey {
- /**
- * Messages in the checkpoint log have keys associated with them. These keys are maps that describe the message's
- * type, either a checkpoint or a changelog-partition-mapping.
- */
- val CHECKPOINT_KEY_KEY = "type"
- val CHECKPOINT_KEY_TYPE = "checkpoint"
- val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping"
- val CHECKPOINT_TASKNAME_KEY = "taskName"
- val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory"
-
- /**
- * Partition mapping keys have no dynamic values, so we just need one instance.
- */
- val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE))
-
- private val JSON_MAPPER = new ObjectMapper()
- val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {}
-
- var systemStreamPartitionGrouperFactoryString:Option[String] = None
-
- /**
- * Set the name of the factory configured to provide the SystemStreamPartition grouping
- * so it can be included in the key.
- *
- * @param str Config value of SystemStreamPartition Grouper Factory
- */
- def setSystemStreamPartitionGrouperFactoryString(str:String) = {
- systemStreamPartitionGrouperFactoryString = Some(str)
- }
-
- /**
- * Get the name of the factory configured to provide the SystemStreamPartition grouping
- * so it can be included in the key
- */
- def getSystemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString.getOrElse(throw new SamzaException("No SystemStreamPartition grouping factory string has been set."))
-
- /**
- * Build a key for a checkpoint log entry for a particular TaskName
- * @param taskName TaskName to build for this checkpoint entry
- *
- * @return Key for checkpoint log entry
- */
- def getCheckpointKey(taskName:TaskName) = {
- val map = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE,
- CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName,
- SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> getSystemStreamPartitionGrouperFactoryString)
-
- new KafkaCheckpointLogKey(map)
- }
-
- /**
- * Build a key for a changelog partition mapping entry
- *
- * @return Key for changelog partition mapping entry
- */
- def getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY
-
- /**
- * Deserialize a Kafka checkpoint log key
- * @param bytes Serialized (via JSON) Kafka checkpoint log key
- * @return Checkpoint log key
- */
- def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = {
- try {
- val jmap: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, KEY_TYPEREFERENCE)
-
- if(!jmap.containsKey(CHECKPOINT_KEY_KEY)) {
- throw new SamzaException("No type entry in checkpoint key: " + jmap)
- }
-
- // Only checkpoint keys have ssp grouper factory keys
- if(jmap.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) {
- val sspGrouperFactory = jmap.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY)
-
- if (sspGrouperFactory == null) {
- throw new SamzaException("No SystemStreamPartition Grouper factory entry in checkpoint key: " + jmap)
- }
-
- if (!sspGrouperFactory.equals(getSystemStreamPartitionGrouperFactoryString)) {
- throw new DifferingSystemStreamPartitionGrouperFactoryValues(sspGrouperFactory, getSystemStreamPartitionGrouperFactoryString)
- }
- }
-
- new KafkaCheckpointLogKey(jmap.toMap)
- } catch {
- case e: Exception =>
- throw new SamzaException("Exception while deserializing checkpoint key", e)
- }
- }
-}
-
-class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, inConfig:String) extends SamzaException {
- override def getMessage() = "Checkpoint key's SystemStreamPartition Grouper factory (" + inKey +
- ") does not match value from current configuration (" + inConfig + "). " +
- "This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported."
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala
deleted file mode 100644
index 627631a..0000000
--- a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package old.checkpoint
-
-import java.nio.ByteBuffer
-import java.util
-import java.util.Properties
-
-import kafka.admin.AdminUtils
-import kafka.api._
-import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, TopicExistsException, UnknownTopicOrPartitionException}
-import kafka.consumer.SimpleConsumer
-import kafka.message.InvalidMessageException
-import kafka.utils.Utils
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
-import org.apache.samza.container.TaskName
-import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.system.kafka.TopicMetadataCache
-import org.apache.samza.util.{ExponentialSleepStrategy, KafkaUtil, Logging, TopicMetadataStore}
-
-import scala.collection.mutable
-
-/**
- * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
- * To read a checkpoint for a specific taskName, we find the newest message
- * keyed to that taskName. If there is no such message, no checkpoint data
- * exists. The underlying log has a single partition into which all
- * checkpoints and TaskName to changelog partition mappings are written.
- */
-class KafkaCheckpointManager(clientId: String,
- checkpointTopic: String,
- systemName: String,
- socketTimeout: Int,
- bufferSize: Int,
- fetchSize: Int,
- metadataStore: TopicMetadataStore,
- connectProducer: () => Producer[Array[Byte], Array[Byte]],
- connectZk: () => ZkClient,
- systemStreamPartitionGrouperFactoryString: String,
- retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
- serde: CheckpointSerde = new CheckpointSerde,
- checkpointTopicProperties: Properties = new Properties) extends Logging {
- import KafkaCheckpointManager._
-
- var taskNames = Set[TaskName]()
- var producer: Producer[Array[Byte], Array[Byte]] = null
- var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
-
- var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
-
- KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
-
- info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
-
- private def getConsumer(): SimpleConsumer = {
- val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
- val metadata = metadataMap(checkpointTopic)
- val partitionMetadata = metadata.partitionsMetadata
- .filter(_.partitionId == 0)
- .headOption
- .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka."))
- val leader = partitionMetadata
- .leader
- .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
-
- info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic))
-
- new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId)
- }
-
- private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1)
-
- private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = {
- val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))))
- .partitionErrorAndOffsets
- .get(topicAndPartition)
- .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:0" format checkpointTopic))
- // Fail or retry if there was an issue with the offset request.
- KafkaUtil.maybeThrowException(offsetResponse.error)
-
- val offset: Long = offsetResponse
- .offsets
- .headOption
- .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:0" format checkpointTopic))
-
- offset
- }
-
- /**
- * Read the last checkpoint for specified TaskName
- *
- * @param taskName Specific Samza taskName for which to get the last checkpoint of.
- */
- def readLastCheckpoint(taskName: TaskName): Checkpoint = {
- if (!taskNames.contains(taskName)) {
- throw new SamzaException(taskName + " not registered with this CheckpointManager")
- }
-
- info("Reading checkpoint for taskName " + taskName)
-
- if (taskNamesToOffsets == null) {
- info("No TaskName to checkpoint mapping provided. Reading for first time.")
- taskNamesToOffsets = readCheckpointsFromLog()
- } else {
- info("Already existing checkpoint mapping. Merging new offsets")
- taskNamesToOffsets ++= readCheckpointsFromLog()
- }
-
- val checkpoint = taskNamesToOffsets.get(taskName).getOrElse(null)
-
- info("Got checkpoint state for taskName %s: %s" format(taskName, checkpoint))
-
- checkpoint
- }
-
- /**
- * Read through entire log, discarding changelog mapping, and building map of TaskNames to Checkpoints
- */
- def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = {
- val checkpoints = mutable.Map[TaskName, Checkpoint]()
-
- def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isCheckpointKey
-
- def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
- val taskName = checkpointKey.getCheckpointTaskName
- val checkpoint = serde.fromBytes(Utils.readBytes(payload))
- debug("Adding checkpoint " + checkpoint + " for taskName " + taskName)
- checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go
- }
-
- readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint)
-
- checkpoints.toMap /* of the immutable kind */
- }
-
- /**
- * Read through entire log, discarding checkpoints, finding latest changelogPartitionMapping
- *
- * Lots of duplicated code from the checkpoint method, but will be better to refactor this code into AM-based
- * checkpoint log reading
- */
- def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
- var changelogPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
-
- def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isChangelogPartitionMapping
-
- def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
- changelogPartitionMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload))
-
- debug("Adding changelog partition mapping" + changelogPartitionMapping)
- }
-
- readLog(CHANGELOG_PARTITION_MAPPING_LOG4j, shouldHandleEntry, handleCheckpoint)
-
- changelogPartitionMapping
- }
-
- /**
- * Common code for reading both checkpoints and the changelog partition mapping from the log
- *
- * @param entryType Description of the type of entry being read; used only in log and error messages
- * @param handleEntry Code to handle an entry in the log once it's found
- */
- private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
- handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
- retryBackoff.run[Unit](
- loop => {
- val consumer = getConsumer()
-
- val topicAndPartition = new TopicAndPartition(checkpointTopic, 0)
-
- try {
- var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition))
-
- info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType))
-
- val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime)
-
- info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic))
-
- if (offset < 0) {
- info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic))
- return
- }
-
- while (offset < latestOffset) {
- val request = new FetchRequestBuilder()
- .addFetch(checkpointTopic, 0, offset, fetchSize)
- .maxWait(500)
- .minBytes(1)
- .clientId(clientId)
- .build
-
- val fetchResponse = consumer.fetch(request)
- if (fetchResponse.hasError) {
- warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0)))
- val errorCode = fetchResponse.errorCode(checkpointTopic, 0)
- if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
- warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
- return
- }
- KafkaUtil.maybeThrowException(errorCode)
- }
-
- for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
- offset = response.nextOffset
- startingOffset = Some(offset) // For next time we call
-
- if (!response.message.hasKey) {
- throw new KafkaCheckpointException("Encountered message without key.")
- }
-
- val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key))
-
- if (!shouldHandleEntry(checkpointKey)) {
- debug("Skipping " + entryType + " entry with key " + checkpointKey)
- } else {
- handleEntry(response.message.payload, checkpointKey)
- }
- }
- }
- } finally {
- consumer.close()
- }
-
- loop.done
- Unit
- },
-
- (exception, loop) => {
- exception match {
- case e: InvalidMessageException => throw new KafkaCheckpointException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
- case e: InvalidMessageSizeException => throw new KafkaCheckpointException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
- case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
- case e: KafkaCheckpointException => throw e
- case e: Exception =>
- warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e))
- debug("Exception detail:", e)
- }
- }
- ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic))
-
- }
-
- def topicExists: Boolean = {
- val zkClient = connectZk()
- try {
- AdminUtils.topicExists(zkClient, checkpointTopic)
- } finally {
- zkClient.close()
- }
- }
-
- def start {
- if (topicExists) {
- validateTopic
- } else {
- throw new SamzaException("Failed to start KafkaCheckpointManager for non-existing checkpoint topic. KafkaCheckpointManager should only be used for migration purpose.")
- }
- }
-
- def register(taskName: TaskName) {
- debug("Adding taskName " + taskName + " to " + this)
- taskNames += taskName
- }
-
- def stop = {
- if (producer != null) {
- producer.close
- }
- }
-
- def validateTopic = {
- info("Validating checkpoint topic %s." format checkpointTopic)
- retryBackoff.run(
- loop => {
- val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, metadataStore.getTopicInfo)
- val topicMetadata = topicMetadataMap(checkpointTopic)
- KafkaUtil.maybeThrowException(topicMetadata.errorCode)
-
- val partitionCount = topicMetadata.partitionsMetadata.length
- if (partitionCount != 1) {
- throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count of 1." format(checkpointTopic, topicMetadata.partitionsMetadata.length))
- }
-
- info("Successfully validated checkpoint topic %s." format checkpointTopic)
- loop.done
- },
-
- (exception, loop) => {
- exception match {
- case e: KafkaCheckpointException => throw e
- case e: Exception =>
- warn("While trying to validate topic %s: %s. Retrying." format(checkpointTopic, e))
- debug("Exception detail:", e)
- }
- }
- )
- }
-
- override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
-}
-
-object KafkaCheckpointManager {
- val CHECKPOINT_LOG4J_ENTRY = "checkpoint log"
- val CHANGELOG_PARTITION_MAPPING_LOG4j = "changelog partition mapping"
-}
-
-/**
- * KafkaCheckpointManager handles retries, so we need two kinds of exceptions:
- * one to signal a hard failure, and the other to retry. The
- * KafkaCheckpointException is thrown to indicate a hard failure that the Kafka
- * CheckpointManager can't recover from.
- */
-class KafkaCheckpointException(s: String, t: Throwable) extends SamzaException(s, t) {
- def this(s: String) = this(s, null)
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala
deleted file mode 100644
index 189752a..0000000
--- a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package old.checkpoint
-
-import java.util.Properties
-
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.config.Config
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging}
-
-object KafkaCheckpointManagerFactory {
- /**
- * Version number to track the format of the checkpoint log
- */
- val CHECKPOINT_LOG_VERSION_NUMBER = 1
-
- val INJECTED_PRODUCER_PROPERTIES = Map(
- "acks" -> "all",
- // Forcibly disable compression because Kafka doesn't support compression
- // on log compacted topics. Details in SAMZA-586.
- "compression.type" -> "none")
-
- // Set the checkpoint topic configs to have a very small segment size and
- // enable log compaction. This keeps job startup time small since there
- // are fewer useless (overwritten) messages to read from the checkpoint
- // topic.
- def getCheckpointTopicProperties(config: Config) = {
- val segmentBytes = "26214400"
-
- (new Properties /: Map(
- "cleanup.policy" -> "compact",
- "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
- }
-}
-
-class KafkaCheckpointManagerFactory extends Logging {
- import KafkaCheckpointManagerFactory._
-
- def getCheckpointManager(config: Config, registry: MetricsRegistry): KafkaCheckpointManager = {
- val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
- val systemName = Option(config.get("task.checkpoint.system")).getOrElse(throw new SamzaException("no system defined for checkpoint manager, cannot perform migration."))
- val producerConfig = config.getKafkaSystemProducerConfig(
- systemName,
- clientId,
- INJECTED_PRODUCER_PROPERTIES)
- val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
- val socketTimeout = consumerConfig.socketTimeoutMs
- val bufferSize = consumerConfig.socketReceiveBufferBytes
- val fetchSize = consumerConfig.fetchMessageMaxBytes // must be > buffer size
-
- val connectProducer = () => {
- new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
- }
- val zkConnect = Option(consumerConfig.zkConnect)
- .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
- val connectZk = () => {
- new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
- }
- val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
- val jobId = config.getJobId.getOrElse("1")
- val bootstrapServers = producerConfig.bootsrapServers
- val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, socketTimeout)
- val checkpointTopic = getTopic(jobName, jobId)
-
- // Find out the SSPGrouperFactory class so it can be included/verified in the key
- val systemStreamPartitionGrouperFactoryString = config.getSystemStreamPartitionGrouperFactory
-
- new KafkaCheckpointManager(
- clientId,
- checkpointTopic,
- systemName,
- socketTimeout,
- bufferSize,
- fetchSize,
- metadataStore,
- connectProducer,
- connectZk,
- systemStreamPartitionGrouperFactoryString,
- checkpointTopicProperties = getCheckpointTopicProperties(config))
- }
-
- private def getTopic(jobName: String, jobId: String) =
- "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala
deleted file mode 100644
index 32afe4c..0000000
--- a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package old.checkpoint
-
-import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
-import org.apache.samza.config.Config
-import org.apache.samza.container.TaskName
-import org.apache.samza.coordinator.JobCoordinator
-import org.apache.samza.coordinator.stream.messages.{CoordinatorStreamMessage, SetMigrationMetaMessage}
-import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamSystemFactory}
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.migration.MigrationPlan
-import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.util.{Logging, NoOpMetricsRegistry}
-import scala.collection.JavaConverters._
-
-class KafkaCheckpointMigration extends MigrationPlan with Logging {
- val source = "CHECKPOINTMIGRATION"
- val migrationKey = "CheckpointMigration09to10"
- val migrationVal = "true"
-
- def migrate(config: Config, getManager:() => KafkaCheckpointManager): Unit = {
- val coordinatorFactory = new CoordinatorStreamSystemFactory
- val coordinatorSystemProducer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
- var manager = getManager()
- // make sure to validate that we only perform migration when the checkpoint topic exists
- if (manager.topicExists) {
- manager.validateTopic
- val checkpointMap = manager.readCheckpointsFromLog()
- manager.stop
-
- manager = getManager()
- val changelogMap = manager.readChangeLogPartitionMapping()
- manager.stop
-
- val coordinatorSystemConsumer = coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
- if (migrationVerification(coordinatorSystemConsumer)) {
- info("Migration %s was already performed, doing nothing" format migrationKey)
- return
- }
- info("No previous migration for %s were detected, performing migration" format migrationKey)
- val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
- checkpointManager.start()
- checkpointMap.foreach { case (taskName: TaskName, checkpoint: Checkpoint) => checkpointManager.writeCheckpoint(taskName, checkpoint) }
- val changelogPartitionManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
- changelogPartitionManager.start()
- changelogPartitionManager.writeChangeLogPartitionMapping(changelogMap)
- changelogPartitionManager.stop()
- checkpointManager.stop()
- }
- migrationCompletionMark(coordinatorSystemProducer)
- }
-
- override def migrate(config: Config) {
- val factory = new KafkaCheckpointManagerFactory
- def getManager() = factory.getCheckpointManager(config, new NoOpMetricsRegistry)
- migrate(config, getManager)
- }
-
- def migrationVerification(coordinatorSystemConsumer : CoordinatorStreamSystemConsumer): Boolean = {
- coordinatorSystemConsumer.register()
- coordinatorSystemConsumer.start()
- coordinatorSystemConsumer.bootstrap()
- val stream = coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE)
- val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
- stream.contains(message.asInstanceOf[CoordinatorStreamMessage])
- }
-
- def migrationCompletionMark(coordinatorSystemProducer: CoordinatorStreamSystemProducer) = {
- info("Marking completion of migration %s" format migrationKey)
- val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
- coordinatorSystemProducer.start()
- coordinatorSystemProducer.send(message)
- coordinatorSystemProducer.stop()
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
new file mode 100644
index 0000000..ea8462d
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint.kafka
+
+import java.util
+
+import org.apache.samza.SamzaException
+import org.apache.samza.container.TaskName
+import org.codehaus.jackson.`type`.TypeReference
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.JavaConversions._
+
+/**
+ * Kafka Checkpoint Log-specific key used to identify what type of entry is
+ * written for any particular log entry.
+ *
+ * @param map Backing map to hold key values
+ */
+class KafkaCheckpointLogKey private (val map: Map[String, String]) {
+ // This might be better as a case class...
+ import org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey._
+
+ /**
+ * Serialize this key to bytes
+ * @return Key as bytes
+ */
+ def toBytes(): Array[Byte] = {
+   // Copy entry by entry into a java.util.HashMap rather than relying on the implicit conversion inside putAll.
+   val javaCopy = new util.HashMap[String, String](map.size)
+   map.foreach { case (k, v) => javaCopy.put(k, v) }
+   JSON_MAPPER.writeValueAsBytes(javaCopy)
+ }
+
+ private def getKey: String = map.get(CHECKPOINT_KEY_KEY) match { case Some(entryType) => entryType; case None => throw new SamzaException("No " + CHECKPOINT_KEY_KEY + " in map for Kafka Checkpoint log key") }
+
+ /**
+ * Is this key for a checkpoint entry?
+ *
+ * @return true iff this key's entry is for a checkpoint
+ */
+ def isCheckpointKey: Boolean = getKey.equals(CHECKPOINT_KEY_TYPE) // getKey throws when the map has no type entry
+
+ /**
+ * Is this key for a changelog partition mapping?
+ *
+ * @return true iff this key's entry is for a changelog partition mapping
+ */
+ @Deprecated
+ def isChangelogPartitionMapping: Boolean = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE) // getKey throws when the map has no type entry
+
+ /**
+ * If this Key is for a checkpoint entry, return its associated TaskName.
+ *
+ * @return TaskName for this checkpoint or throw an exception if this key does not have a TaskName entry
+ */
+ def getCheckpointTaskName: TaskName = {
+   val rawName = map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new SamzaException("No TaskName in checkpoint key: " + this))
+   new TaskName(rawName) // wrap the stored string in a TaskName
+ }
+
+ def canEqual(other: Any): Boolean = other match { case _: KafkaCheckpointLogKey => true; case _ => false }
+
+ override def equals(other: Any): Boolean = other match {
+   // Two keys are equal when the other side accepts the comparison
+   // and both backing maps hold the same entries.
+   case candidate: KafkaCheckpointLogKey => candidate.canEqual(this) && map == candidate.map
+   case _ => false
+ }
+
+ override def hashCode(): Int = {
+   // The backing map is the only state; folding one element from 0 gives 31 * 0 + h == h.
+   map.hashCode()
+ }
+}
+
+object KafkaCheckpointLogKey {
+ /**
+ * Messages in the checkpoint log have keys associated with them. These keys are maps that describe the message's
+ * type, either a checkpoint or a changelog-partition-mapping.
+ */
+ val CHECKPOINT_KEY_KEY = "type"
+ val CHECKPOINT_KEY_TYPE = "checkpoint"
+
+ @Deprecated
+ val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping"
+
+ val CHECKPOINT_TASKNAME_KEY = "taskName"
+ val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory"
+
+ /**
+ * Partition mapping keys have no dynamic values, so we just need one instance.
+ */
+ @Deprecated
+ val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE))
+
+ private val JSON_MAPPER = new ObjectMapper()
+ val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {}
+
+ var systemStreamPartitionGrouperFactoryString:Option[String] = None
+
+ /**
+ * Set the name of the factory configured to provide the SystemStreamPartition grouping
+ * so it can be included in the key.
+ *
+ * @param str Config value of SystemStreamPartition Grouper Factory
+ */
+ def setSystemStreamPartitionGrouperFactoryString(str:String): Unit = {
+   systemStreamPartitionGrouperFactoryString = Option(str) // Option(null) is None, so a null argument is reported as "not set" by the getter instead of being stored as Some(null)
+ }
+
+ /**
+ * Get the name of the factory configured to provide the SystemStreamPartition grouping
+ * so it can be included in the key
+ */
+ def getSystemStreamPartitionGrouperFactoryString: String = systemStreamPartitionGrouperFactoryString match { case Some(factory) => factory; case None => throw new SamzaException("No SystemStreamPartition grouping factory string has been set.") }
+
+ /**
+ * Build a key for a checkpoint log entry for a particular TaskName
+ * @param taskName TaskName to build for this checkpoint entry
+ *
+ * @return Key for checkpoint log entry
+ */
+ def getCheckpointKey(taskName:TaskName): KafkaCheckpointLogKey = {
+   // A checkpoint key carries its entry type, the owning task and the grouper factory currently set.
+   new KafkaCheckpointLogKey(Map(
+     CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE,
+     CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName,
+     SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> getSystemStreamPartitionGrouperFactoryString))
+ }
+
+ /**
+ * Build a key for a changelog partition mapping entry
+ *
+ * @return Key for changelog partition mapping entry
+ */
+ @Deprecated
+ def getChangelogPartitionMappingKey(): KafkaCheckpointLogKey = CHANGELOG_PARTITION_MAPPING_KEY // always the one shared instance
+
+ /**
+ * Deserialize a Kafka checkpoint log key
+ * @param bytes Serialized (via JSON) Kafka checkpoint log key
+ * @return Checkpoint log key
+ */
+ def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = {
+   try {
+     val decoded: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, KEY_TYPEREFERENCE)
+
+     // Every key must declare what kind of entry it labels.
+     if (!decoded.containsKey(CHECKPOINT_KEY_KEY)) {
+       throw new SamzaException("No type entry in checkpoint key: " + decoded)
+     }
+
+     // Only checkpoint keys have ssp grouper factory keys
+     if (decoded.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) {
+       val keyFactory = decoded.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY)
+       if (keyFactory == null) {
+         throw new SamzaException("No SystemStreamPartition Grouper factory entry in checkpoint key: " + decoded)
+       }
+       // The factory recorded in the key has to match the one currently set on this object.
+       val configuredFactory = getSystemStreamPartitionGrouperFactoryString
+       if (!keyFactory.equals(configuredFactory)) {
+         throw new DifferingSystemStreamPartitionGrouperFactoryValues(keyFactory, configuredFactory)
+       }
+     }
+
+     new KafkaCheckpointLogKey(decoded.toMap)
+   } catch {
+     case e: Exception => throw new SamzaException("Exception while deserializing checkpoint key", e) // also wraps the exceptions thrown above
+   }
+ }
+}
+
+class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, inConfig:String) extends SamzaException {
+  // The message is assembled on demand from the factory found in the key and the configured one.
+  override def getMessage(): String = "Checkpoint key's SystemStreamPartition Grouper factory (" + inKey + ") does not match value from current configuration (" + inConfig + "). " +
+    "This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported."
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
new file mode 100644
index 0000000..787de1f
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.Properties
+
+import kafka.api._
+import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, UnknownTopicOrPartitionException}
+import kafka.consumer.SimpleConsumer
+import kafka.message.InvalidMessageException
+import kafka.utils.Utils
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
+import org.apache.samza.container.TaskName
+import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.system.kafka.TopicMetadataCache
+import org.apache.samza.util._
+
+import scala.collection.mutable
+
+/**
+ * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
+ * To read a checkpoint for a specific taskName, we find the newest message
+ * keyed to that taskName. If there is no such message, no checkpoint data
+ * exists. The underlying log has a single partition into which all
+ * checkpoints and TaskName to changelog partition mappings are written.
+ */
+class KafkaCheckpointManager(
+ clientId: String,
+ checkpointTopic: String,
+ val systemName: String,
+ replicationFactor: Int,
+ socketTimeout: Int,
+ bufferSize: Int,
+ fetchSize: Int,
+ val metadataStore: TopicMetadataStore,
+ connectProducer: () => Producer[Array[Byte], Array[Byte]],
+ val connectZk: () => ZkClient,
+ systemStreamPartitionGrouperFactoryString: String,
+ val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
+ serde: CheckpointSerde = new CheckpointSerde,
+ checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging {
+ import org.apache.samza.checkpoint.kafka.KafkaCheckpointManager._
+
+ var taskNames = Set[TaskName]()
+ var producer: Producer[Array[Byte], Array[Byte]] = null
+ var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
+
+ var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
+ val kafkaUtil: KafkaUtil = new KafkaUtil(retryBackoff, connectZk)
+
+ KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
+
+ info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
+
+ /**
+ * Write Checkpoint for specified taskName to log
+ *
+ * @param taskName Specific Samza taskName for which to write a checkpoint.
+ * @param checkpoint Checkpoint to serialize and write to the log.
+ **/
+ override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
+   val logKey = KafkaCheckpointLogKey.getCheckpointKey(taskName)
+   val serializedKey = logKey.toBytes()
+   val serializedCheckpoint = serde.toBytes(checkpoint)
+   retryBackoff.run(
+     loop => {
+       // Connect lazily; the failure handler below drops the producer so the next attempt reconnects.
+       if (producer == null) producer = connectProducer()
+       // Wait on the send result so a failed write is raised inside this attempt.
+       producer.send(new ProducerRecord(checkpointTopic, 0, serializedKey, serializedCheckpoint)).get()
+       loop.done
+     },
+
+     (exception, loop) => {
+       warn("Failed to write %s partition entry %s: %s. Retrying." format(CHECKPOINT_LOG4J_ENTRY, logKey, exception))
+       debug("Exception detail:", exception)
+       // Discard the current producer before the next attempt.
+       if (producer != null) {
+         producer.close
+       }
+       producer = null
+     }
+   )
+ }
+
+ private def getConsumer(): SimpleConsumer = {
+   val metadataByTopic = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
+   val topicMetadata = metadataByTopic(checkpointTopic)
+   // All reads and writes in this class use partition 0, so its leader is the broker to connect to.
+   val partitionZero = topicMetadata.partitionsMetadata
+     .find(_.partitionId == 0)
+     .getOrElse(throw new KafkaUtilException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka."))
+   val leader = partitionZero
+     .leader
+     .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
+
+   info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic))
+
+   new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId)
+ }
+
+ private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1) // NOTE(review): meaning of the trailing -1 is not visible here; confirm against SimpleConsumer.earliestOrLatestOffset
+
+ private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = {
+   // Request a single offset for the given position from the broker.
+   val request = new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
+   val partitionResponse = consumer.getOffsetsBefore(request)
+     .partitionErrorAndOffsets
+     .get(topicAndPartition)
+     .getOrElse(throw new KafkaUtilException("Unable to find offset information for %s:0" format checkpointTopic))
+   // Fail or retry if there was an issue with the offset request.
+   KafkaUtil.maybeThrowException(partitionResponse.error)
+
+   partitionResponse
+     .offsets
+     .headOption
+     .getOrElse(throw new KafkaUtilException("Got response, but no offsets defined for %s:0" format checkpointTopic))
+ }
+
+ /**
+ * Read the last checkpoint for specified TaskName
+ *
+ * @param taskName Specific Samza taskName for which to get the last checkpoint.
+ **/
+ override def readLastCheckpoint(taskName: TaskName): Checkpoint = {
+   if (!taskNames.contains(taskName)) {
+     throw new SamzaException(taskName + " not registered with this CheckpointManager")
+   }
+
+   info("Reading checkpoint for taskName " + taskName)
+   // The first call builds the cache; later calls merge in what readCheckpointsFromLog returns.
+   taskNamesToOffsets = Option(taskNamesToOffsets) match {
+     case None =>
+       info("No TaskName to checkpoint mapping provided. Reading for first time.")
+       readCheckpointsFromLog()
+     case Some(known) =>
+       info("Already existing checkpoint mapping. Merging new offsets")
+       known ++ readCheckpointsFromLog()
+   }
+
+   // A task without any checkpoint yields null rather than an Option.
+   val latest = taskNamesToOffsets.get(taskName).orNull
+   info("Got checkpoint state for taskName %s: %s" format(taskName, latest))
+   latest
+ }
+
+ /**
+ * Read through entire log, discarding changelog mapping, and building map of TaskNames to Checkpoints
+ */
+ def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = {
+   val latestByTask = mutable.Map[TaskName, Checkpoint]()
+
+   // Later log entries overwrite earlier ones, leaving the newest checkpoint seen for each task.
+   def recordCheckpoint(payload: ByteBuffer, checkpointKey: KafkaCheckpointLogKey): Unit = {
+     val taskName = checkpointKey.getCheckpointTaskName
+     val checkpoint = serde.fromBytes(Utils.readBytes(payload))
+     debug("Adding checkpoint " + checkpoint + " for taskName " + taskName)
+     latestByTask(taskName) = checkpoint
+   }
+
+   // Entries whose key is not a checkpoint key are skipped by readLog.
+   readLog(CHECKPOINT_LOG4J_ENTRY, _.isCheckpointKey, recordCheckpoint)
+   latestByTask.toMap
+ }
+
+
+ /**
+ * Common code for reading both checkpoints and the changelog partition mapping
+ *
+ * @param entryType Name of the kind of entry being read; only used in log and error messages
+ * @param handleEntry Code to handle an entry in the log once shouldHandleEntry has accepted its key
+ */
+ private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
+ handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
+ retryBackoff.run[Unit]( // first function is one read attempt; second decides whether a failure is rethrown or only logged
+ loop => {
+ val consumer = getConsumer() // new consumer per attempt; closed in the finally below
+
+ val topicAndPartition = new TopicAndPartition(checkpointTopic, 0)
+
+ try {
+ var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition)) // resume after the last message consumed so far, else start at the earliest offset
+
+ info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType))
+
+ val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime)
+
+ info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic))
+
+ if (offset < 0) {
+ info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic))
+ return // returns from readLog itself, not just this lambda; the finally still closes the consumer
+ }
+
+ while (offset < latestOffset) { // latestOffset was fetched once above, before this loop
+ val request = new FetchRequestBuilder()
+ .addFetch(checkpointTopic, 0, offset, fetchSize)
+ .maxWait(500)
+ .minBytes(1)
+ .clientId(clientId)
+ .build
+
+ val fetchResponse = consumer.fetch(request)
+ if (fetchResponse.hasError) {
+ warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0)))
+ val errorCode = fetchResponse.errorCode(checkpointTopic, 0)
+ if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
+ warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
+ return // gives up on the read; entries already passed to handleEntry stay handled
+ }
+ KafkaUtil.maybeThrowException(errorCode)
+ }
+
+ for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
+ offset = response.nextOffset
+ startingOffset = Some(offset) // For next time we call
+
+ if (!response.message.hasKey) {
+ throw new KafkaUtilException("Encountered message without key.") // rethrown as-is by the failure handler below
+ }
+
+ val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key))
+
+ if (!shouldHandleEntry(checkpointKey)) {
+ debug("Skipping " + entryType + " entry with key " + checkpointKey)
+ } else {
+ handleEntry(response.message.payload, checkpointKey)
+ }
+ }
+ }
+ } finally {
+ consumer.close()
+ }
+
+ loop.done
+ Unit
+ },
+
+ (exception, loop) => {
+ exception match { // the first four cases rethrow; anything else is only logged
+ case e: InvalidMessageException => throw new KafkaUtilException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
+ case e: InvalidMessageSizeException => throw new KafkaUtilException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
+ case e: UnknownTopicOrPartitionException => throw new KafkaUtilException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
+ case e: KafkaUtilException => throw e
+ case e: Exception =>
+ warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e))
+ debug("Exception detail:", e)
+ }
+ }
+ ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic))
+
+ }
+
+ def start: Unit = {
+   kafkaUtil.createTopic(checkpointTopic, 1, replicationFactor, checkpointTopicProperties) // requested with a partition count of 1
+   kafkaUtil.validateTopicPartitionCount(checkpointTopic, systemName, metadataStore, 1) // validated against a partition count of 1
+ }
+
+ def register(taskName: TaskName): Unit = {
+   debug("Adding taskName " + taskName + " to " + this)
+   taskNames = taskNames + taskName // taskNames is an immutable Set held in a var
+ }
+
+ def stop = {
+   // Close and forget the producer, so a later writeCheckpoint reconnects instead of sending on a closed one.
+   if (producer != null) producer.close
+   producer = null
+ }
+
+
+ /**
+ * Read through entire log, discarding checkpoints, finding latest changelogPartitionMapping
+ * To be used for Migration purpose only. In newer version, changelogPartitionMapping will be handled through coordinator stream
+ */
+ @Deprecated
+ def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
+   // Starts out empty and is replaced wholesale by every mapping entry read, so the last one read wins.
+   var latestMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
+
+   def keepLatestMapping(payload: ByteBuffer, checkpointKey: KafkaCheckpointLogKey): Unit = {
+     latestMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload))
+
+     debug("Adding changelog partition mapping" + latestMapping)
+   }
+
+   // Entries whose key is not a changelog partition mapping key are skipped by readLog.
+   readLog(CHANGELOG_PARTITION_MAPPING_LOG4j, _.isChangelogPartitionMapping, keepLatestMapping)
+
+   latestMapping
+ }
+
+ override def toString: String = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]".format(systemName, checkpointTopic)
+}
+
+object KafkaCheckpointManager {
+ val CHECKPOINT_LOG4J_ENTRY = "checkpoint log" // entry-type label used in log messages when writing and reading checkpoints
+ val CHANGELOG_PARTITION_MAPPING_LOG4j = "changelog partition mapping" // entry-type label used in log messages when reading the changelog partition mapping
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
new file mode 100644
index 0000000..7db8940
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka
+
+import java.util.Properties
+
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.{CheckpointManager, CheckpointManagerFactory}
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.config.{Config, KafkaConfig}
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging}
+
+object KafkaCheckpointManagerFactory {
+ val INJECTED_PRODUCER_PROPERTIES = Map(
+ "acks" -> "all",
+ // Forcibly disable compression because Kafka doesn't support compression
+ // on log compacted topics. Details in SAMZA-586.
+ "compression.type" -> "none")
+
+ // Set the checkpoint topic configs to have a very small segment size and
+ // enable log compaction. This keeps job startup time small since there
+ // are fewer useless (overwritten) messages to read from the checkpoint
+ // topic.
+ def getCheckpointTopicProperties(config: Config): Properties = {
+   // A null config falls back to the built-in default segment size.
+   val segmentBytes: Int =
+     if (config == null) KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES else config.getCheckpointSegmentBytes()
+
+   val topicProps = new Properties
+   topicProps.put("cleanup.policy", "compact")
+   topicProps.put("segment.bytes", String.valueOf(segmentBytes))
+   topicProps
+ }
+}
+
+class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
+ import org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory._
+
+ def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager = {
+   val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
+   val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
+   val jobId = config.getJobId.getOrElse("1")
+
+   // The system hosting the checkpoint topic has to be named explicitly in the config.
+   val systemName = config.getCheckpointSystem match {
+     case Some(configured) => configured
+     case None => throw new SamzaException("no system defined for Kafka's checkpoint manager.")
+   }
+
+   // Producer settings for that system, with the acks/compression overrides injected.
+   val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, INJECTED_PRODUCER_PROPERTIES)
+   // Handed to the manager as a factory, so no producer is created here.
+   val connectProducer = () => new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
+
+   val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+   val zkConnect = Option(consumerConfig.zkConnect) match {
+     case Some(address) => address
+     case None => throw new SamzaException("no zookeeper.connect defined in config")
+   }
+   // Likewise a factory: each call constructs a new ZkClient.
+   val connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+   val socketTimeout = consumerConfig.socketTimeoutMs
+
+   new KafkaCheckpointManager(
+     clientId = clientId,
+     checkpointTopic = KafkaUtil.getCheckpointTopic(jobName, jobId),
+     systemName = systemName,
+     replicationFactor = config.getCheckpointReplicationFactor.getOrElse("3").toInt,
+     socketTimeout = socketTimeout,
+     bufferSize = consumerConfig.socketReceiveBufferBytes,
+     fetchSize = consumerConfig.fetchMessageMaxBytes, // must be > buffer size
+     metadataStore = new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, clientId, socketTimeout),
+     connectProducer = connectProducer,
+     connectZk = connectZk,
+     // To find out the SSPGrouperFactory class so it can be included/verified in the key
+     systemStreamPartitionGrouperFactoryString = config.getSystemStreamPartitionGrouperFactory,
+     checkpointTopicProperties = getCheckpointTopicProperties(config))
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 798033c..a65e8e8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -40,6 +40,10 @@ object KafkaConfig {
val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
+ val CHECKPOINT_SYSTEM = "task.checkpoint.system"
+ val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
+ val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
+
val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor"
val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
// The default segment size to use for changelog topics
@@ -54,10 +58,16 @@ object KafkaConfig {
*/
val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
+ val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400
+
implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
}
class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
+ // checkpoints
+ def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM)
+ def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
+ def getCheckpointSegmentBytes() = getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
// custom consumer config
def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
new file mode 100644
index 0000000..c6b1fe4
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.migration
+
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient.ZkClient
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.kafka.{KafkaCheckpointManager, KafkaCheckpointManagerFactory}
+import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.coordinator.stream.messages.{CoordinatorStreamMessage, SetMigrationMetaMessage}
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemFactory, CoordinatorStreamSystemProducer}
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.storage.ChangelogPartitionManager
+import org.apache.samza.util._
+
+/**
+ * Migrates changelog partition mapping from checkpoint topic to coordinator stream
+ */
+class KafkaCheckpointMigration extends MigrationPlan with Logging {
+ val source = "CHECKPOINTMIGRATION"
+ val migrationKey = "CheckpointMigration09to10"
+ val migrationVal = "true"
+
+ var connectZk: () => ZkClient = null
+
+ private def getCheckpointSystemName(config: Config): String = {
+   // There is no fallback: a missing checkpoint system setting is an error.
+   val configuredSystem = config.getCheckpointSystem
+   configuredSystem.getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
+ }
+
+ private def getClientId(config: Config): String = {
+ KafkaUtil.getClientId("samza-checkpoint-manager", config) // same id string that KafkaCheckpointManagerFactory passes
+ }
+
+ private def getTopicMetadataStore(config: Config): TopicMetadataStore = {
+   val clientId = getClientId(config)
+   val checkpointSystem = getCheckpointSystemName(config)
+
+   // The producer-side config of the checkpoint system supplies the broker list.
+   val producerSide = config.getKafkaSystemProducerConfig(checkpointSystem, clientId, KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
+
+   // The consumer-side config supplies the socket timeout.
+   val consumerSide = config.getKafkaSystemConsumerConfig(checkpointSystem, clientId)
+
+   new ClientUtilTopicMetadataStore(
+     producerSide.bootsrapServers,
+     clientId,
+     consumerSide.socketTimeoutMs)
+ }
+
+ private def getConnectZk(config: Config): () => ZkClient = {
+   val clientId = getClientId(config)
+   val checkpointSystem = getCheckpointSystemName(config)
+   val consumerSide = config.getKafkaSystemConsumerConfig(checkpointSystem, clientId)
+
+   // Resolve the connect string eagerly so a missing setting fails here,
+   // not later when the returned factory is first invoked.
+   val zkAddress = Option(consumerSide.zkConnect) match {
+     case Some(address) => address
+     case None => throw new SamzaException("no zookeeper.connect defined in config")
+   }
+
+   // Each call of the returned function constructs a new ZkClient.
+   () => new ZkClient(zkAddress, 6000, 6000, ZKStringSerializer)
+ }
+
+ override def migrate(config: Config) {
+ val jobName = config.getName.getOrElse(throw new SamzaException("Cannot find job name. Cannot proceed with migration."))
+ val jobId = config.getJobId.getOrElse("1")
+
+ val checkpointTopicName = KafkaUtil.getCheckpointTopic(jobName, jobId)
+
+ val coordinatorSystemFactory = new CoordinatorStreamSystemFactory
+ val coordinatorSystemConsumer = coordinatorSystemFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+ val coordinatorSystemProducer = coordinatorSystemFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+
+ val checkpointManager = new KafkaCheckpointManagerFactory().getCheckpointManager(config, new NoOpMetricsRegistry).asInstanceOf[KafkaCheckpointManager] // cast needed to reach readChangeLogPartitionMapping
+
+ val kafkaUtil = new KafkaUtil(new ExponentialSleepStrategy, getConnectZk(config))
+
+ // make sure to validate that we only perform migration when checkpoint topic exists
+ if (kafkaUtil.topicExists(checkpointTopicName)) {
+ kafkaUtil.validateTopicPartitionCount(
+ checkpointTopicName,
+ getCheckpointSystemName(config),
+ getTopicMetadataStore(config),
+ 1)
+
+ if (migrationVerification(coordinatorSystemConsumer)) {
+ info("Migration %s was already performed, doing nothing" format migrationKey)
+ return // NOTE(review): the consumer started inside migrationVerification is not stopped on this path; confirm that is intended
+ }
+
+ info("No previous migration for %s were detected, performing migration" format migrationKey)
+
+ info("Loading changelog partition mapping from checkpoint topic - %s" format checkpointTopicName)
+ val changelogMap = checkpointManager.readChangeLogPartitionMapping()
+ checkpointManager.stop
+
+ info("Writing changelog to coordinator stream topic - %s" format Util.getCoordinatorStreamName(jobName, jobId))
+ val changelogPartitionManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
+ changelogPartitionManager.start()
+ changelogPartitionManager.writeChangeLogPartitionMapping(changelogMap)
+ changelogPartitionManager.stop()
+ }
+ migrationCompletionMark(coordinatorSystemProducer) // reached after a migration and also when the checkpoint topic does not exist
+
+ }
+
+ def migrationVerification(coordinatorSystemConsumer : CoordinatorStreamSystemConsumer): Boolean = {
+   coordinatorSystemConsumer.register()
+   coordinatorSystemConsumer.start()
+   coordinatorSystemConsumer.bootstrap() // consumer is prepared first; the marker lookup follows
+   val migrationMessages = coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE)
+   val completionMarker: CoordinatorStreamMessage = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
+   migrationMessages.contains(completionMarker)
+ }
+
+ def migrationCompletionMark(coordinatorSystemProducer: CoordinatorStreamSystemProducer) = {
+   info("Marking completion of migration %s" format migrationKey)
+   val completionMarker = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
+   coordinatorSystemProducer.start() // started for this one send and stopped again right after
+   coordinatorSystemProducer.send(completionMarker)
+   coordinatorSystemProducer.stop()
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index a7a095b..f4311d1 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -19,16 +19,24 @@
package org.apache.samza.util
+import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
+import kafka.admin.AdminUtils
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
import org.apache.kafka.common.PartitionInfo
import org.apache.samza.config.Config
import org.apache.samza.config.ConfigException
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.system.OutgoingMessageEnvelope
-import kafka.common.ErrorMapping
-import kafka.common.ReplicaNotAvailableException
+import kafka.common.{TopicExistsException, ErrorMapping, ReplicaNotAvailableException}
+import org.apache.samza.system.kafka.TopicMetadataCache
object KafkaUtil extends Logging {
+ /**
+ * Version number to track the format of the checkpoint log
+ */
+ val CHECKPOINT_LOG_VERSION_NUMBER = 1
val counter = new AtomicLong(0)
def getClientId(id: String, config: Config): String = getClientId(
@@ -51,6 +59,9 @@ object KafkaUtil extends Logging {
abs(envelope.getPartitionKey.hashCode()) % numPartitions
}
+  /**
+   * Builds the name of the checkpoint topic for a job.
+   *
+   * The name embeds CHECKPOINT_LOG_VERSION_NUMBER followed by the job name and job
+   * id, with every underscore in the name and id replaced by a hyphen.
+   *
+   * @param jobName name of the job
+   * @param jobId id of the job
+   * @return the checkpoint topic name
+   */
+  def getCheckpointTopic(jobName: String, jobId: String): String = {
+    val hyphenatedJobName = jobName.replace('_', '-')
+    val hyphenatedJobId = jobId.replace('_', '-')
+    "__samza_checkpoint_ver_%d_for_%s_%s".format(CHECKPOINT_LOG_VERSION_NUMBER, hyphenatedJobName, hyphenatedJobId)
+  }
+
/**
* Exactly the same as Kafka's ErrorMapping.maybeThrowException
* implementation, except suppresses ReplicaNotAvailableException exceptions.
@@ -75,3 +86,102 @@ object KafkaUtil extends Logging {
code != ErrorMapping.NoError && code != ErrorMapping.ReplicaNotAvailableCode
}
}
+
+/**
+ * Instance-level Kafka helpers for creating topics, validating their partition
+ * count, and checking whether they exist.
+ *
+ * @param retryBackoff strategy that drives the retry loops in createTopic and
+ *                     validateTopicPartitionCount
+ * @param connectZk factory invoked each time a ZooKeeper client is needed; every
+ *                  client obtained from it is closed after use
+ */
+class KafkaUtil(val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
+                val connectZk: () => ZkClient) extends Logging {
+
+  /**
+   * Obtains a ZooKeeper client from connectZk, applies the action to it, and closes
+   * the client afterwards whether or not the action threw.
+   */
+  private def withZkClient[T](action: ZkClient => T): T = {
+    val client = connectZk()
+    try {
+      action(client)
+    } finally {
+      client.close()
+    }
+  }
+
+  /**
+   * Common code for creating a topic in Kafka.
+   *
+   * Each attempt runs inside retryBackoff with a fresh ZooKeeper client. A
+   * TopicExistsException ends the loop as if the topic had been created; any other
+   * exception is logged and left to retryBackoff to try again.
+   *
+   * @param topicName Name of the topic to be created
+   * @param partitionCount Number of partitions in the topic
+   * @param replicationFactor Number of replicas for the topic
+   * @param topicProperties Any topic related properties
+   */
+  def createTopic(topicName: String, partitionCount: Int, replicationFactor: Int, topicProperties: Properties = new Properties): Unit = {
+    info("Attempting to create topic %s." format topicName)
+    retryBackoff.run(
+      attempt => {
+        withZkClient { zk =>
+          AdminUtils.createTopic(zk, topicName, partitionCount, replicationFactor, topicProperties)
+        }
+
+        info("Created topic %s." format topicName)
+        attempt.done
+      },
+
+      (failure, attempt) => failure match {
+        case _: TopicExistsException =>
+          // Somebody else already created it; nothing left to do.
+          info("Topic %s already exists." format topicName)
+          attempt.done
+        case other: Exception =>
+          warn("Failed to create topic %s: %s. Retrying." format(topicName, other))
+          debug("Exception detail:", other)
+      }
+    )
+  }
+
+  /**
+   * Common code to validate the partition count of a topic.
+   *
+   * Each attempt fetches the topic's metadata and compares the number of partitions
+   * against the expected value. A mismatch raises a KafkaUtilException, which is
+   * rethrown immediately instead of being retried; any other exception is logged and
+   * left to retryBackoff to try again.
+   *
+   * @param topicName Name of the topic to be validated
+   * @param systemName Kafka system to use
+   * @param metadataStore Topic Metadata store
+   * @param expectedPartitionCount Expected number of partitions
+   */
+  def validateTopicPartitionCount(topicName: String,
+                                  systemName: String,
+                                  metadataStore: TopicMetadataStore,
+                                  expectedPartitionCount: Int): Unit = {
+    info("Validating topic %s. Expecting partition count: %d" format (topicName, expectedPartitionCount))
+    retryBackoff.run(
+      attempt => {
+        val metadataByTopic = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo)
+        val metadata = metadataByTopic(topicName)
+        KafkaUtil.maybeThrowException(metadata.errorCode)
+
+        val actualPartitionCount = metadata.partitionsMetadata.length
+        if (actualPartitionCount != expectedPartitionCount) {
+          throw new KafkaUtilException(("Validation failed for topic %s because partition count %s did not " +
+            "match expected partition count of %d.").format(topicName, actualPartitionCount, expectedPartitionCount))
+        }
+
+        info("Successfully validated topic %s." format topicName)
+        attempt.done
+      },
+
+      (failure, attempt) => failure match {
+        // A partition count mismatch will not fix itself, so surface it right away.
+        case hardFailure: KafkaUtilException => throw hardFailure
+        case other: Exception =>
+          warn("While trying to validate topic %s: %s. Retrying." format(topicName, other))
+          debug("Exception detail:", other)
+      }
+    )
+  }
+
+  /**
+   * Code to verify that a topic exists.
+   *
+   * @param topicName Name of the topic
+   * @return If exists, it returns true. Otherwise, false.
+   */
+  def topicExists(topicName: String): Boolean =
+    withZkClient(zk => AdminUtils.topicExists(zk, topicName))
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtilException.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtilException.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtilException.scala
new file mode 100644
index 0000000..b9c5dd7
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtilException.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util
+
+import org.apache.samza.SamzaException
+
+/**
+ * Signals a hard failure raised by the Kafka utility code. Callers that run
+ * retry loops, such as KafkaCheckpointManager, need two kinds of exceptions:
+ * one to signal a hard failure, and the other to retry. A KafkaUtilException
+ * is the former: it indicates a failure that retrying can't recover from.
+ *
+ * The single-argument constructor is a convenience for failures that have no
+ * underlying cause; it passes null as the cause.
+ *
+ * @param message description of the failure
+ * @param t underlying cause, or null when there is none
+ */
+class KafkaUtilException(var message: String, t: Throwable) extends SamzaException(message, t) {
+ def this(s: String) = this(s, null)
+}
\ No newline at end of file