You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/10/29 18:20:18 UTC
git commit: SAMZA-354; add migration tool to manually migrate samza 0.7.0 checkpoint topics to 0.8.0
Repository: incubator-samza
Updated Branches:
refs/heads/0.8.0 0bab52d82 -> 263d92178
SAMZA-354; add migration tool to manually migrate samza 0.7.0 checkpoint topics to 0.8.0
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/263d9217
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/263d9217
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/263d9217
Branch: refs/heads/0.8.0
Commit: 263d9217856ac092e6a79bdbc6ca4dd38d5c598e
Parents: 0bab52d
Author: Navina Ramesh <nr...@linkedin.com>
Authored: Wed Oct 29 08:40:56 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Oct 29 08:40:56 2014 -0700
----------------------------------------------------------------------
build.gradle | 10 +
.../kafka/KafkaCheckpointManagerFactory.scala | 5 +-
.../samza/util/CheckpointMigrationTool.scala | 352 +++++++++++++++++++
samza-shell/src/main/bash/run-migration-tool.sh | 23 ++
4 files changed, 387 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/263d9217/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 64c223a..94ad593 100644
--- a/build.gradle
+++ b/build.gradle
@@ -313,6 +313,16 @@ project(":samza-shell") {
if (project.hasProperty('configPath')) args += ['--config-path', configPath]
jvmArgs = ["-Dlog4j.configuration=file:src/main/resources/log4j-console.xml"]
}
+
+ // Runs org.apache.samza.util.CheckpointMigrationTool, which migrates a job's
+ // Samza 0.7.0 checkpoint topic to the 0.8.0 checkpoint topic format.
+ //
+ // Usage: ./gradlew samza-shell:checkpointMigrationTool \
+ // -PconfigPath=file:///path/to/job/config.properties \
+ // [-PconfigFactory=<config factory>]
+ //
+ // -PconfigPath is forwarded to the tool as --config-path, and the optional
+ // -PconfigFactory as --config-factory. Logging uses log4j-console.xml.
+ task checkpointMigrationTool(type:JavaExec) {
+ main = 'org.apache.samza.util.CheckpointMigrationTool'
+ classpath = configurations.gradleShell
+ if (project.hasProperty('configPath')) args += ['--config-path', configPath]
+ if (project.hasProperty('configFactory')) args += ['--config-factory', configFactory]
+ jvmArgs = ["-Dlog4j.configuration=file:src/main/resources/log4j-console.xml"]
+ }
}
project(":samza-kv_$scalaVersion") {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/263d9217/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index f7db2a1..42338f9 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -62,6 +62,8 @@ object KafkaCheckpointManagerFactory {
"cleanup.policy" -> "compact",
"segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
}
+  /**
+   * Returns the name of the job's checkpoint topic in the current checkpoint log format:
+   * `__samza_checkpoint_ver_<CHECKPOINT_LOG_VERSION_NUMBER>_for_<jobName>_<jobId>`.
+   * Underscores in the job name and job id are replaced with dashes (presumably so that
+   * the `_` separators in the topic name stay unambiguous).
+   *
+   * Public so that tools such as CheckpointMigrationTool compute exactly the same name
+   * as the checkpoint manager.
+   *
+   * @param jobName the job's name
+   * @param jobId   the job's id
+   * @return the checkpoint topic name
+   */
+  def getTopic(jobName: String, jobId: String): String =
+    "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
}
class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
@@ -114,7 +116,4 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
systemStreamPartitionGrouperFactoryString,
checkpointTopicProperties = getCheckpointTopicProperties(config))
}
-
- private def getTopic(jobName: String, jobId: String) =
- "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/263d9217/samza-kafka/src/main/scala/org/apache/samza/util/CheckpointMigrationTool.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/CheckpointMigrationTool.scala b/samza-kafka/src/main/scala/org/apache/samza/util/CheckpointMigrationTool.scala
new file mode 100644
index 0000000..5c4b3c4
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/CheckpointMigrationTool.scala
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import java.util.Properties
+import kafka.admin.AdminUtils
+import kafka.api.{FetchRequestBuilder, PartitionOffsetRequestInfo, OffsetRequest, TopicMetadata}
+import kafka.common.{ErrorMapping, TopicAndPartition, TopicExistsException}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.producer.{Producer, KeyedMessage}
+import kafka.utils.{Utils, ZKStringSerializer}
+import org.I0Itec.zkclient.ZkClient
+import org.apache.samza.checkpoint.kafka.{KafkaCheckpointManagerFactory, KafkaCheckpointLogKey, KafkaCheckpointException}
+import org.apache.samza.config.Config
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManagerFactory}
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.{KafkaConfig, Config, StreamConfig}
+import org.apache.samza.container.TaskName
+import org.apache.samza.container.grouper.stream.{GroupByPartitionFactory, SystemStreamPartitionGrouperFactory}
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.system.kafka.TopicMetadataCache
+import org.apache.samza.system.{SystemStream, SystemFactory, SystemStreamPartition}
+import org.apache.samza.{Partition, SamzaException}
+import org.codehaus.jackson.map.ObjectMapper
+import scala.collection.JavaConversions
+import scala.collection.JavaConversions._
+import java.util.regex.Pattern
+
+
+/**
+ * Command-line tool that migrates a job's checkpoint topic from the Samza 0.7.0 format
+ * to the Samza 0.8.0 format.
+ *
+ * The configuration of the job to migrate must be supplied on the command line
+ * (e.g. `--config-path`, optionally with `--config-factory`). When building Samza from
+ * source, the `checkpointMigrationTool` Gradle task in samza-shell is a shortcut for
+ * running this tool; the `run-migration-tool.sh` script also launches it.
+ */
+object CheckpointMigrationTool {
+  /**
+   * Grouper factory used to regroup the migrated offsets into tasks. Grouping by
+   * partition matches what Samza 0.7 jobs did.
+   */
+  val DEFAULT_SSP_GROUPER_FACTORY = classOf[GroupByPartitionFactory].getCanonicalName
+
+  /** Parses `args` into a job config and runs the migration for that job. */
+  def main(args: Array[String]): Unit = {
+    val commandLine = new CommandLine
+    val jobConfig = commandLine.loadConfig(commandLine.parser.parse(args: _*))
+    new CheckpointMigrationTool(jobConfig).run
+  }
+}
+
+/**
+ * Migrates the checkpoints of the job described by `config`.
+ *
+ * It reads the last checkpoint of every input partition from the Samza 0.7.0 checkpoint
+ * topic. It then regroups the offsets into per-task checkpoints (grouped by partition, as
+ * 0.7 did) and writes them to the 0.8.0 checkpoint topic.
+ */
+class CheckpointMigrationTool(config: Config) extends Logging {
+
+  /**
+   * Runs the migration.
+   *
+   * Does nothing if the 0.7.0 topic does not exist. If the 0.8.0 topic already exists, asks
+   * on the console for confirmation ("yes") before writing, since that may overwrite more
+   * recent offsets. Otherwise the 0.8.0 topic is created first.
+   */
+  def run: Unit = {
+    val kafkaHelper = new KafkaHelper(config)
+    val newCheckpointTopic = kafkaHelper.getNewCheckpointTopic
+    val oldCheckpointTopic = kafkaHelper.getOldCheckpointTopic
+    val newCheckpointTopicExists = kafkaHelper.checkTopicExists(newCheckpointTopic)
+    val oldCheckpointTopicExists = kafkaHelper.checkTopicExists(oldCheckpointTopic)
+
+    if (!oldCheckpointTopicExists) {
+      info("Checkpoint Topic in old format - %s does not exist. Nothing to do here. Quitting!" format oldCheckpointTopic)
+    } else if (newCheckpointTopicExists) {
+      info("Checkpoint Topic in new format - %s already exists!" format newCheckpointTopic)
+      info("Proceeding may overwrite more recent offsets in the checkpoint topic.")
+      info("Do you still wish to continue (yes/no)? ")
+      // readLine returns null at end of input (e.g. stdin closed). Treat that as "no"
+      // instead of failing with a NullPointerException.
+      val answer = Option(Console.readLine())
+      if (answer.exists(_.trim.equalsIgnoreCase("yes"))) {
+        writeNewCheckpoints(kafkaHelper, oldCheckpointTopic, newCheckpointTopic, createTopic = false)
+      } else {
+        info("Not migrating checkpoints to %s. Quitting!" format newCheckpointTopic)
+      }
+    } else {
+      writeNewCheckpoints(kafkaHelper, oldCheckpointTopic, newCheckpointTopic, createTopic = true)
+    }
+  }
+
+  /**
+   * Copies the latest 0.7.0 offsets of every input partition into per-task 0.8.0 checkpoints.
+   *
+   * @param kafkaHelper        Kafka access for the job
+   * @param oldCheckpointTopic 0.7.0 checkpoint topic to read from
+   * @param newCheckpointTopic 0.8.0 checkpoint topic to write to
+   * @param createTopic        whether to create `newCheckpointTopic` before writing
+   * @throws KafkaCheckpointException if an input partition has no readable 0.7.0 checkpoint
+   */
+  private def writeNewCheckpoints(kafkaHelper: KafkaHelper, oldCheckpointTopic: String, newCheckpointTopic: String, createTopic: Boolean): Unit = {
+    // Read the most recent offsets from the 0.7 checkpoint topic. That topic is
+    // partitioned like the job's inputs, so read one checkpoint per input partition.
+    val partitions = Util.getInputStreamPartitions(config).map(_.getPartition).toSet
+    val lastCheckpoint = partitions.flatMap(partition => {
+      val cp = kafkaHelper.readLastCheckpoint(oldCheckpointTopic, partition)
+      if (cp == null) {
+        throw new KafkaCheckpointException("Received null as offset for checkpoint topic %s partition %d" format(oldCheckpointTopic, partition.getPartitionId))
+      }
+      cp.getOffsets.map {
+        case (systemStream, offset) => new SystemStreamPartition(systemStream, partition) -> offset
+      }
+    }).toMap
+
+    // Group input streams by partition (the default for 0.7 implementations).
+    val factory = Util.getObj[SystemStreamPartitionGrouperFactory](CheckpointMigrationTool.DEFAULT_SSP_GROUPER_FACTORY)
+    val taskNameToSSP = factory.getSystemStreamPartitionGrouper(config).group(lastCheckpoint.keySet)
+
+    // Create the topic, unless it already exists.
+    if (createTopic) {
+      kafkaHelper.createTopic(newCheckpointTopic)
+    }
+
+    taskNameToSSP.foreach {
+      case (taskName, ssps) =>
+        val checkpoints = ssps.map { ssp =>
+          // The default argument is by-name, so the exception is only thrown if the offset is missing.
+          ssp -> lastCheckpoint.getOrElse(ssp, throw new KafkaCheckpointException(
+            "Did not find checkpoint for %s partition %d" format(ssp.getSystemStream.toString, ssp.getPartition.getPartitionId)))
+        }.toMap
+        kafkaHelper.writeCheckpoint(taskName, new Checkpoint(checkpoints), newCheckpointTopic)
+    }
+  }
+}
+
+/**
+ * Kafka and ZooKeeper access used by CheckpointMigrationTool to inspect, create, read and
+ * write the checkpoint topics of the job described by `config`.
+ *
+ * All connection settings come from the job's checkpoint system. Construction throws a
+ * SamzaException if any of the following is missing: the job name, the checkpoint system,
+ * the broker list, or zookeeper.connect.
+ */
+class KafkaHelper(config: Config) extends Logging {
+  val CHECKPOINT_LOG_VERSION_NUMBER = 1
+
+  // Settings forced on top of the system's producer config: synchronous, uncompressed
+  // sends with request.required.acks=-1 and effectively unbounded retries.
+  val INJECTED_PRODUCER_PROPERTIES = Map("request.required.acks" -> "-1",
+    "compression.codec" -> "none",
+    "producer.type" -> "sync",
+    "message.send.max.retries" -> (Integer.MAX_VALUE - 1).toString)
+
+  // Topic configs for a newly created checkpoint topic. Log compaction is enabled, and
+  // segment.bytes comes from the job config (default "26214400").
+  private val getCheckpointTopicProperties = {
+    val segmentBytes = config.getCheckpointSegmentBytes.getOrElse("26214400")
+    (new Properties /: Map("cleanup.policy" -> "compact", "segment.bytes" -> segmentBytes)) {
+      case (props, (k, v)) => props.put(k, v); props
+    }
+  }
+
+  val retryBackoff = new ExponentialSleepStrategy
+  val jsonMapper = new ObjectMapper()
+
+  // Parse config data. The job id defaults to "1" and the replication factor to 3.
+  val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
+  val jobId = config.getJobId.getOrElse("1")
+  val clientId = KafkaUtil.getClientId("samza-checkpoint-migration-tool", config)
+  val replicationFactor = config.getCheckpointReplicationFactor.getOrElse("3").toInt
+
+  val systemName = config.getCheckpointSystem.getOrElse(
+    throw new SamzaException("no system defined for Kafka's checkpoint manager."))
+
+  // Kafka producer config
+  val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, INJECTED_PRODUCER_PROPERTIES)
+  val brokersListString = Option(producerConfig.brokerList).getOrElse(throw new SamzaException(
+    "No broker list defined in config for %s." format systemName))
+
+  private val connectProducer = () => { new Producer[Array[Byte], Array[Byte]](producerConfig) }
+
+  // Kafka consumer config
+  val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+  val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId, consumerConfig.socketTimeoutMs)
+  val zkConnect = Option(consumerConfig.zkConnect).getOrElse(throw new SamzaException(
+    "no zookeeper.connect defined in config"))
+
+  // Opens a new ZooKeeper client with 30000 ms timeouts. Callers must close it.
+  private val connectZk = () => { new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) }
+
+  /** Name of the job's checkpoint topic in the Samza 0.7.0 format. */
+  def getOldCheckpointTopic: String = {
+    "__samza_checkpoint_%s_%s" format(jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+  }
+
+  /** Name of the job's checkpoint topic in the Samza 0.8.0 format. */
+  def getNewCheckpointTopic: String = KafkaCheckpointManagerFactory.getTopic(jobName, jobId)
+
+  /** Looks up `topicName` through TopicMetadataCache, using the brokers as the metadata source. */
+  private def getTopicMetadata(topicName: String): TopicMetadata = {
+    val metadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName),
+      systemName,
+      (topics: Set[String]) => metadataStore.getTopicInfo(topics))
+    metadataMap(topicName)
+  }
+
+  /**
+   * Opens a SimpleConsumer against the current leader of `partition` in `metadata`'s topic.
+   * Throws if the partition is unknown or has no leader.
+   */
+  private def getKafkaConsumer(metadata: TopicMetadata, partition: Partition) = {
+    val partitionMetadata = metadata.partitionsMetadata
+      .find(_.partitionId == partition.getPartitionId)
+      .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition %d, but it didn't exist in Kafka." format partition.getPartitionId))
+    val leader = partitionMetadata.leader.getOrElse(throw new SamzaException("No leader available for topic %s" format metadata.topic))
+
+    info("Connecting to leader %s:%d for topic %s and partition %s to fetch last checkpoint message." format(leader.host, leader.port, metadata.topic, partition.getPartitionId))
+    new SimpleConsumer(leader.host,
+      leader.port,
+      consumerConfig.socketTimeoutMs,
+      consumerConfig.socketReceiveBufferBytes,
+      clientId)
+  }
+
+  /**
+   * Returns whether `topicName` exists according to ZooKeeper. This is best-effort: any
+   * failure is logged and reported as `false`.
+   */
+  def checkTopicExists(topicName: String): Boolean = {
+    val zkClient = connectZk()
+    try {
+      AdminUtils.topicExists(zkClient = zkClient, topic = topicName)
+    } catch {
+      case e: Exception =>
+        warn("Failed to check for existence of topic %s : %s" format(topicName, e))
+        debug("Exception detail:", e)
+        false
+    } finally {
+      zkClient.close
+    }
+  }
+
+  /**
+   * Creates `topicName` with one partition, `replicationFactor` replicas and the checkpoint
+   * topic configs. Failures are retried through `retryBackoff`. A TopicExistsException
+   * counts as success.
+   */
+  def createTopic(topicName: String) = {
+    info("Attempting to create checkpoint topic %s." format topicName)
+    retryBackoff.run(
+      loop => {
+        val zkClient = connectZk()
+        try {
+          AdminUtils.createTopic(zkClient, topicName, 1, replicationFactor, getCheckpointTopicProperties)
+        } finally {
+          zkClient.close
+        }
+        info("Created checkpoint topic %s." format topicName)
+        loop.done
+      },
+      (exception, loop) => {
+        exception match {
+          case e: TopicExistsException =>
+            info("Checkpoint topic %s already exists." format topicName)
+            loop.done
+          case e: Exception =>
+            warn("Failed to create topic %s: %s. Retrying." format(topicName, e))
+            debug("Exception detail:", e)
+        }
+      })
+  }
+
+  /**
+   * Reads the most recent message of `partition` in `topicName` and decodes it as a 0.7.0
+   * checkpoint, i.e. a JSON object of system name -> (stream name -> offset).
+   *
+   * Returns null if the partition has no messages or the fetch reports an out-of-range
+   * offset. KafkaCheckpointException is rethrown immediately; other exceptions are
+   * retried through `retryBackoff`.
+   */
+  def readLastCheckpoint(topicName: String, partition: Partition): Checkpoint = {
+    info("Reading checkpoint for partition %s." format partition.getPartitionId)
+    val metadata = getTopicMetadata(topicName)
+    val checkpoint = retryBackoff.run(
+      loop => {
+        val consumer = getKafkaConsumer(metadata, partition)
+        val partitionId = partition.getPartitionId
+        try {
+          val topicAndPartition = new TopicAndPartition(topicName, partitionId)
+          val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(
+            Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
+            .partitionErrorAndOffsets.get(topicAndPartition)
+            .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format(topicName, partitionId)))
+
+          // Fail or retry if there was an an issue with the offset request.
+          ErrorMapping.maybeThrowException(offsetResponse.error)
+
+          // Latest offset of the partition. The last message is at offset - 1, which is fetched below.
+          val offset = offsetResponse.offsets.headOption.getOrElse(
+            throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format(topicName, partitionId)))
+
+          info("Got offset %s for topic %s and partition %s. Attempting to fetch message." format(offset, topicName, partitionId))
+
+          if (offset <= 0) {
+            info("Got offset 0 (no messages in checkpoint topic) for topic %s and partition %s, so returning null. If you expected the checkpoint topic to have messages, you're probably going to lose data." format(topicName, partition))
+            // Non-local return out of readLastCheckpoint; the finally block still closes the consumer.
+            return null
+          }
+
+          val request = new FetchRequestBuilder()
+            .addFetch(topicName,
+              partitionId,
+              offset - 1,
+              consumerConfig.fetchMessageMaxBytes).maxWait(500).minBytes(1).clientId(clientId).build
+          val messageSet = consumer.fetch(request)
+          if (messageSet.hasError) {
+            val errorCode = messageSet.errorCode(topicName, partitionId)
+            warn("Got error code from broker for %s: %s" format(topicName, errorCode))
+            if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
+              warn("Got an offset out of range exception while getting last checkpoint for topic %s and partition %s, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format(topicName, partitionId))
+              return null
+            }
+            ErrorMapping.maybeThrowException(errorCode)
+          }
+          val messages = messageSet.messageSet(topicName, partitionId).toList
+
+          if (messages.length != 1) {
+            throw new KafkaCheckpointException("Something really unexpected happened. Got %s "
+              + "messages back when fetching from checkpoint topic %s and partition %s. "
+              + "Expected one message. It would be unsafe to go on without the latest checkpoint, "
+              + "so failing." format(messages.length, topicName, partition))
+          }
+
+          // Decode {system -> {stream -> offset}} into SSP -> offset for this partition.
+          val checkpoint = jsonMapper.readValue(Utils.readBytes(messages(0).message.payload),
+            classOf[java.util.Map[String, java.util.Map[String, String]]]).flatMap {
+            case (systemName, streamToOffsetMap) =>
+              streamToOffsetMap.map {
+                case (streamName, offset) => (new SystemStreamPartition(systemName, streamName, partition), offset)
+              }
+          }
+          val results = JavaConversions.mapAsJavaMap(checkpoint)
+          loop.done
+          return new Checkpoint(results)
+        } finally {
+          consumer.close
+        }
+      },
+      (exception, loop) => {
+        exception match {
+          case e: KafkaCheckpointException => throw e
+          case e: Exception =>
+            warn("While trying to read last checkpoint for topic %s and partition %s: %s. Retrying." format(topicName, partition, e))
+            debug("Exception detail:", e)
+        }
+      }).getOrElse(throw new SamzaException("Failed to get checkpoint for partition %s" format partition.getPartitionId))
+
+    info("Got checkpoint state for partition %s: %s" format(partition.getPartitionId, checkpoint))
+    checkpoint
+  }
+
+  /**
+   * Writes `checkpoint` for `taskName` to `topicName`, keyed by the task's checkpoint log key.
+   *
+   * A failed send closes the producer and is retried through `retryBackoff` with a new
+   * producer. The producer in use is always closed before returning; previously it
+   * leaked on every successful write.
+   */
+  def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint, topicName: String) = {
+    // Migrated checkpoints are grouped by partition; register that grouper factory
+    // before building the key.
+    KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(CheckpointMigrationTool.DEFAULT_SSP_GROUPER_FACTORY)
+    val serde = new CheckpointSerde
+    val key = KafkaCheckpointLogKey.getCheckpointKey(taskName)
+    val keyBytes = key.toBytes()
+    val msgBytes = serde.toBytes(checkpoint)
+
+    var producer = connectProducer()
+    try {
+      retryBackoff.run(
+        loop => {
+          if (producer == null) {
+            producer = connectProducer()
+          }
+
+          producer.send(new KeyedMessage(topicName, keyBytes, 0, msgBytes))
+          loop.done
+        },
+        (exception, loop) => {
+          warn("Failed to write partition entry %s: %s. Retrying." format(key, exception))
+          debug("Exception detail:", exception)
+          if (producer != null) {
+            producer.close
+          }
+          producer = null
+        })
+    } finally {
+      // Release the producer's broker connections whether the write succeeded or not.
+      if (producer != null) {
+        producer.close
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/263d9217/samza-shell/src/main/bash/run-migration-tool.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-migration-tool.sh b/samza-shell/src/main/bash/run-migration-tool.sh
new file mode 100644
index 0000000..7ac7f27
--- /dev/null
+++ b/samza-shell/src/main/bash/run-migration-tool.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Use the console log4j config that sits next to this script unless the caller has
+# already set SAMZA_LOG4J_CONFIG.
+if [ -z "$SAMZA_LOG4J_CONFIG" ]; then
+  export SAMZA_LOG4J_CONFIG="file:$(dirname "$0")/log4j-console.xml"
+fi
+
+# Launch CheckpointMigrationTool through run-class.sh. The script directory and "$@" are
+# quoted so install paths and arguments containing spaces (e.g. --config-path values)
+# are passed through unchanged.
+exec "$(dirname "$0")"/run-class.sh org.apache.samza.util.CheckpointMigrationTool "$@"